Skip to content

Commit 267b53d

Browse files
vertex-sdk-botcopybara-github
authored andcommitted
feat: Add PSC interface config support for Custom Training Jobs
PiperOrigin-RevId: 770333532
1 parent 84895b6 commit 267b53d

File tree

5 files changed

+360
-0
lines changed

5 files changed

+360
-0
lines changed

google/cloud/aiplatform/jobs.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
model_deployment_monitoring_job as gca_model_deployment_monitoring_job_compat,
4646
job_state_v1beta1 as gca_job_state_v1beta1,
4747
model_monitoring_v1beta1 as gca_model_monitoring_v1beta1,
48+
service_networking as gca_service_networking,
4849
) # TODO(b/242108750): remove temporary logic once model monitoring for batch prediction is GA
4950

5051
from google.cloud.aiplatform.constants import base as constants
@@ -2236,6 +2237,9 @@ def run(
22362237
persistent_resource_id: Optional[str] = None,
22372238
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
22382239
max_wait_duration: Optional[int] = None,
2240+
psc_interface_config: Optional[
2241+
gca_service_networking.PscInterfaceConfig
2242+
] = None,
22392243
) -> None:
22402244
"""Run this configured CustomJob.
22412245
@@ -2310,6 +2314,9 @@ def run(
23102314
This is the maximum duration that a job will wait for the
23112315
requested resources to be provisioned in seconds. If set to 0,
23122316
the job will wait indefinitely. The default is 1 day.
2317+
psc_interface_config (gca_service_networking.PscInterfaceConfig):
2318+
Optional. Configuration for Private Service Connect interface
2319+
used for training.
23132320
"""
23142321
network = network or initializer.global_config.network
23152322
service_account = service_account or initializer.global_config.service_account
@@ -2329,6 +2336,7 @@ def run(
23292336
persistent_resource_id=persistent_resource_id,
23302337
scheduling_strategy=scheduling_strategy,
23312338
max_wait_duration=max_wait_duration,
2339+
psc_interface_config=psc_interface_config,
23322340
)
23332341

23342342
@base.optional_sync()
@@ -2348,6 +2356,9 @@ def _run(
23482356
persistent_resource_id: Optional[str] = None,
23492357
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
23502358
max_wait_duration: Optional[int] = None,
2359+
psc_interface_config: Optional[
2360+
gca_service_networking.PscInterfaceConfig
2361+
] = None,
23512362
) -> None:
23522363
"""Helper method to ensure network synchronization and to run the configured CustomJob.
23532364
@@ -2420,6 +2431,9 @@ def _run(
24202431
This is the maximum duration that a job will wait for the
24212432
requested resources to be provisioned in seconds. If set to 0,
24222433
the job will wait indefinitely. The default is 1 day.
2434+
psc_interface_config (gca_service_networking.PscInterfaceConfig):
2435+
Optional. Configuration for Private Service Connect interface
2436+
used for training.
24232437
"""
24242438
self.submit(
24252439
service_account=service_account,
@@ -2435,6 +2449,7 @@ def _run(
24352449
persistent_resource_id=persistent_resource_id,
24362450
scheduling_strategy=scheduling_strategy,
24372451
max_wait_duration=max_wait_duration,
2452+
psc_interface_config=psc_interface_config,
24382453
)
24392454

24402455
self._block_until_complete()
@@ -2455,6 +2470,9 @@ def submit(
24552470
persistent_resource_id: Optional[str] = None,
24562471
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
24572472
max_wait_duration: Optional[int] = None,
2473+
psc_interface_config: Optional[
2474+
gca_service_networking.PscInterfaceConfig
2475+
] = None,
24582476
) -> None:
24592477
"""Submit the configured CustomJob.
24602478
@@ -2524,6 +2542,9 @@ def submit(
25242542
This is the maximum duration that a job will wait for the
25252543
requested resources to be provisioned in seconds. If set to 0,
25262544
the job will wait indefinitely. The default is 1 day.
2545+
psc_interface_config (gca_service_networking.PscInterfaceConfig):
2546+
Optional. Configuration for Private Service Connect interface
2547+
used for training.
25272548
25282549
Raises:
25292550
ValueError:
@@ -2546,6 +2567,9 @@ def submit(
25462567
if network:
25472568
self._gca_resource.job_spec.network = network
25482569

2570+
if psc_interface_config:
2571+
self._gca_resource.job_spec.psc_interface_config = psc_interface_config
2572+
25492573
if (
25502574
timeout
25512575
or restart_job_on_worker_restart

google/cloud/aiplatform/training_jobs.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
training_pipeline as gca_training_pipeline,
4646
study as gca_study_compat,
4747
custom_job as gca_custom_job_compat,
48+
service_networking as gca_service_networking,
4849
)
4950

5051
from google.cloud.aiplatform.utils import _timestamped_gcs_dir
@@ -1553,6 +1554,9 @@ def _prepare_training_task_inputs_and_output_dir(
15531554
persistent_resource_id: Optional[str] = None,
15541555
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
15551556
max_wait_duration: Optional[int] = None,
1557+
psc_interface_config: Optional[
1558+
gca_service_networking.PscInterfaceConfig
1559+
] = None,
15561560
) -> Tuple[Dict, str]:
15571561
"""Prepares training task inputs and output directory for custom job.
15581562
@@ -1617,6 +1621,8 @@ def _prepare_training_task_inputs_and_output_dir(
16171621
This is the maximum duration that a job will wait for the
16181622
requested resources to be provisioned in seconds. If set to 0,
16191623
the job will wait indefinitely. The default is 30 minutes.
1624+
psc_interface_config (gca_service_networking.PscInterfaceConfig):
1625+
Optional. The PSC interface config for the job.
16201626
Returns:
16211627
Training task inputs and Output directory for custom job.
16221628
"""
@@ -1645,6 +1651,8 @@ def _prepare_training_task_inputs_and_output_dir(
16451651
training_task_inputs["enable_dashboard_access"] = enable_dashboard_access
16461652
if persistent_resource_id:
16471653
training_task_inputs["persistent_resource_id"] = persistent_resource_id
1654+
if psc_interface_config:
1655+
training_task_inputs["psc_interface_config"] = psc_interface_config
16481656

16491657
if (
16501658
timeout
@@ -3055,6 +3063,9 @@ def run(
30553063
reservation_affinity_key: Optional[str] = None,
30563064
reservation_affinity_values: Optional[List[str]] = None,
30573065
max_wait_duration: Optional[int] = None,
3066+
psc_interface_config: Optional[
3067+
gca_service_networking.PscInterfaceConfig
3068+
] = None,
30583069
) -> Optional[models.Model]:
30593070
"""Runs the custom training job.
30603071
@@ -3433,6 +3444,9 @@ def run(
34333444
This is the maximum duration that a job will wait for the
34343445
requested resources to be provisioned in seconds. If set to 0,
34353446
the job will wait indefinitely. The default is 30 minutes.
3447+
psc_interface_config (gca_service_networking.PscInterfaceConfig):
3448+
Optional. Configuration for Private Service Connect interface
3449+
used for training.
34363450
34373451
Returns:
34383452
The trained Vertex AI model resource or None if the training
@@ -3504,6 +3518,7 @@ def run(
35043518
persistent_resource_id=persistent_resource_id,
35053519
scheduling_strategy=scheduling_strategy,
35063520
max_wait_duration=max_wait_duration,
3521+
psc_interface_config=psc_interface_config,
35073522
)
35083523

35093524
def submit(
@@ -3564,6 +3579,9 @@ def submit(
35643579
reservation_affinity_key: Optional[str] = None,
35653580
reservation_affinity_values: Optional[List[str]] = None,
35663581
max_wait_duration: Optional[int] = None,
3582+
psc_interface_config: Optional[
3583+
gca_service_networking.PscInterfaceConfig
3584+
] = None,
35673585
) -> Optional[models.Model]:
35683586
"""Submits the custom training job without blocking until completion.
35693587
@@ -3887,6 +3905,9 @@ def submit(
38873905
This is the maximum duration that a job will wait for the
38883906
requested resources to be provisioned in seconds. If set to 0,
38893907
the job will wait indefinitely. The default is 30 minutes.
3908+
psc_interface_config (gca_service_networking.PscInterfaceConfig):
3909+
Optional. Configuration for Private Service Connect interface
3910+
used for training.
38903911
38913912
Returns:
38923913
model: The trained Vertex AI Model resource or None if training did not
@@ -3958,6 +3979,7 @@ def submit(
39583979
persistent_resource_id=persistent_resource_id,
39593980
scheduling_strategy=scheduling_strategy,
39603981
max_wait_duration=max_wait_duration,
3982+
psc_interface_config=psc_interface_config,
39613983
)
39623984

39633985
@base.optional_sync(construct_object_on_arg="managed_model")
@@ -4007,6 +4029,9 @@ def _run(
40074029
persistent_resource_id: Optional[str] = None,
40084030
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
40094031
max_wait_duration: Optional[int] = None,
4032+
psc_interface_config: Optional[
4033+
gca_service_networking.PscInterfaceConfig
4034+
] = None,
40104035
) -> Optional[models.Model]:
40114036
"""Packages local script and launches training_job.
40124037
@@ -4209,6 +4234,8 @@ def _run(
42094234
This is the maximum duration that a job will wait for the
42104235
requested resources to be provisioned in seconds. If set to 0,
42114236
the job will wait indefinitely. The default is 30 minutes.
4237+
psc_interface_config (gca_service_networking.PscInterfaceConfig):
4238+
Optional. The PSC interface config for the job.
42124239
42134240
Returns:
42144241
model: The trained Vertex AI Model resource or None if training did not
@@ -4265,6 +4292,7 @@ def _run(
42654292
persistent_resource_id=persistent_resource_id,
42664293
scheduling_strategy=scheduling_strategy,
42674294
max_wait_duration=max_wait_duration,
4295+
psc_interface_config=psc_interface_config,
42684296
)
42694297

42704298
model = self._run_job(
@@ -4596,6 +4624,9 @@ def run(
45964624
reservation_affinity_key: Optional[str] = None,
45974625
reservation_affinity_values: Optional[List[str]] = None,
45984626
max_wait_duration: Optional[int] = None,
4627+
psc_interface_config: Optional[
4628+
gca_service_networking.PscInterfaceConfig
4629+
] = None,
45994630
) -> Optional[models.Model]:
46004631
"""Runs the custom training job.
46014632
@@ -4912,6 +4943,9 @@ def run(
49124943
This is the maximum duration that a job will wait for the
49134944
requested resources to be provisioned in seconds. If set to 0,
49144945
the job will wait indefinitely. The default is 30 minutes.
4946+
psc_interface_config (gca_service_networking.PscInterfaceConfig):
4947+
Optional. Configuration for Private Service Connect interface
4948+
used for training.
49154949
49164950
Returns:
49174951
model: The trained Vertex AI Model resource or None if training did not
@@ -4982,6 +5016,7 @@ def run(
49825016
persistent_resource_id=persistent_resource_id,
49835017
scheduling_strategy=scheduling_strategy,
49845018
max_wait_duration=max_wait_duration,
5019+
psc_interface_config=psc_interface_config,
49855020
)
49865021

49875022
def submit(
@@ -5042,6 +5077,9 @@ def submit(
50425077
reservation_affinity_key: Optional[str] = None,
50435078
reservation_affinity_values: Optional[List[str]] = None,
50445079
max_wait_duration: Optional[int] = None,
5080+
psc_interface_config: Optional[
5081+
gca_service_networking.PscInterfaceConfig
5082+
] = None,
50455083
) -> Optional[models.Model]:
50465084
"""Submits the custom training job without blocking until completion.
50475085
@@ -5358,6 +5396,9 @@ def submit(
53585396
This is the maximum duration that a job will wait for the
53595397
requested resources to be provisioned in seconds. If set to 0,
53605398
the job will wait indefinitely. The default is 30 minutes.
5399+
psc_interface_config (gca_service_networking.PscInterfaceConfig):
5400+
Optional. Configuration for Private Service Connect interface
5401+
used for training.
53615402
53625403
Returns:
53635404
model: The trained Vertex AI Model resource or None if training did not
@@ -5428,6 +5469,7 @@ def submit(
54285469
persistent_resource_id=persistent_resource_id,
54295470
scheduling_strategy=scheduling_strategy,
54305471
max_wait_duration=max_wait_duration,
5472+
psc_interface_config=psc_interface_config,
54315473
)
54325474

54335475
@base.optional_sync(construct_object_on_arg="managed_model")
@@ -5476,6 +5518,9 @@ def _run(
54765518
persistent_resource_id: Optional[str] = None,
54775519
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
54785520
max_wait_duration: Optional[int] = None,
5521+
psc_interface_config: Optional[
5522+
gca_service_networking.PscInterfaceConfig
5523+
] = None,
54795524
) -> Optional[models.Model]:
54805525
"""Packages local script and launches training_job.
54815526
Args:
@@ -5674,6 +5719,9 @@ def _run(
56745719
This is the maximum duration that a job will wait for the
56755720
requested resources to be provisioned in seconds. If set to 0,
56765721
the job will wait indefinitely. The default is 30 minutes.
5722+
psc_interface_config (gca_service_networking.PscInterfaceConfig):
5723+
Optional. Configuration for Private Service Connect interface
5724+
used for training.
56775725
56785726
Returns:
56795727
model: The trained Vertex AI Model resource or None if training did not
@@ -5724,6 +5772,7 @@ def _run(
57245772
persistent_resource_id=persistent_resource_id,
57255773
scheduling_strategy=scheduling_strategy,
57265774
max_wait_duration=max_wait_duration,
5775+
psc_interface_config=psc_interface_config,
57275776
)
57285777

57295778
model = self._run_job(
@@ -7755,6 +7804,9 @@ def run(
77557804
reservation_affinity_key: Optional[str] = None,
77567805
reservation_affinity_values: Optional[List[str]] = None,
77577806
max_wait_duration: Optional[int] = None,
7807+
psc_interface_config: Optional[
7808+
gca_service_networking.PscInterfaceConfig
7809+
] = None,
77587810
) -> Optional[models.Model]:
77597811
"""Runs the custom training job.
77607812
@@ -8072,6 +8124,9 @@ def run(
80728124
This is the maximum duration that a job will wait for the
80738125
requested resources to be provisioned in seconds. If set to 0,
80748126
the job will wait indefinitely. The default is 30 minutes.
8127+
psc_interface_config (gca_service_networking.PscInterfaceConfig):
8128+
Optional. Configuration for Private Service Connect interface
8129+
used for training.
80758130
80768131
Returns:
80778132
model: The trained Vertex AI Model resource or None if training did not
@@ -8137,6 +8192,7 @@ def run(
81378192
persistent_resource_id=persistent_resource_id,
81388193
scheduling_strategy=scheduling_strategy,
81398194
max_wait_duration=max_wait_duration,
8195+
psc_interface_config=psc_interface_config,
81408196
)
81418197

81428198
@base.optional_sync(construct_object_on_arg="managed_model")
@@ -8184,6 +8240,9 @@ def _run(
81848240
persistent_resource_id: Optional[str] = None,
81858241
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
81868242
max_wait_duration: Optional[int] = None,
8243+
psc_interface_config: Optional[
8244+
gca_service_networking.PscInterfaceConfig
8245+
] = None,
81878246
) -> Optional[models.Model]:
81888247
"""Packages local script and launches training_job.
81898248
@@ -8367,6 +8426,9 @@ def _run(
83678426
This is the maximum duration that a job will wait for the
83688427
requested resources to be provisioned in seconds. If set to 0,
83698428
the job will wait indefinitely. The default is 30 minutes.
8429+
psc_interface_config (gca_service_networking.PscInterfaceConfig):
8430+
Optional. Configuration for Private Service Connect interface
8431+
used for training.
83708432
83718433
Returns:
83728434
model: The trained Vertex AI Model resource or None if training did not
@@ -8417,6 +8479,7 @@ def _run(
84178479
persistent_resource_id=persistent_resource_id,
84188480
scheduling_strategy=scheduling_strategy,
84198481
max_wait_duration=max_wait_duration,
8482+
psc_interface_config=psc_interface_config,
84208483
)
84218484

84228485
model = self._run_job(

tests/unit/aiplatform/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ class TrainingJobConstants:
222222
)
223223
_TEST_DEFAULT_ENCRYPTION_KEY_NAME = "key_default"
224224
_TEST_SPOT_STRATEGY = custom_job.Scheduling.Strategy.SPOT
225+
_TEST_PSC_INTERFACE_CONFIG = {"network_attachment": "network_attachment_value"}
225226

226227
def create_tpu_job_proto(tpu_version):
227228
worker_pool_spec = (

0 commit comments

Comments
 (0)