Skip to content

prefect_gcp.workers.cloud_run_v2

CloudRunWorkerJobV2Configuration

Bases: BaseJobConfiguration

The configuration for the Cloud Run worker V2.

The schema for this class is used to populate the `job_body` section of the default base job template.

Source code in prefect_gcp/workers/cloud_run_v2.py
Lines 87–305
class CloudRunWorkerJobV2Configuration(BaseJobConfiguration):
    """
    The configuration for the Cloud Run worker V2.

    The schema for this class is used to populate the `job_body` section of the
    default base job template.
    """

    credentials: GcpCredentials = Field(
        title="GCP Credentials",
        default_factory=GcpCredentials,
        description=(
            "The GCP Credentials used to connect to Cloud Run. "
            "If not provided credentials will be inferred from "
            "the local environment."
        ),
    )
    env_from_secrets: Dict[str, SecretKeySelector] = Field(
        default_factory=dict,
        title="Environment Variables from Secrets",
        description="Environment variables to set from GCP secrets when starting a flow run.",
    )
    job_body: Dict[str, Any] = Field(
        json_schema_extra=dict(template=_get_default_job_body_template()),
    )
    keep_job: bool = Field(
        default=False,
        title="Keep Job After Completion",
        description="Keep the completed Cloud run job on Google Cloud Platform.",
    )
    region: str = Field(
        default="us-central1",
        description="The region in which to run the Cloud Run job",
    )
    timeout: int = Field(
        default=600,
        gt=0,
        le=86400,
        description=(
            "Max allowed duration the Job may be active before Cloud Run will "
            "actively try to mark it failed and kill associated containers (maximum of 86400 seconds, 1 day)."
        ),
    )

    # Generated lazily by the `job_name` property; None until first access.
    _job_name: Optional[str] = PrivateAttr(default=None)

    @property
    def project(self) -> str:
        """
        Returns the GCP project associated with the credentials.

        Returns:
            str: The GCP project associated with the credentials.
        """
        return self.credentials.project

    @property
    def job_name(self) -> str:
        """
        Returns the name of the job.

        On first access the name is built from the slugified configuration
        `name` plus a random hex suffix. It is then cached, so later calls
        return the same value.

        Returns:
            str: The name of the job.
        """
        if self._job_name is None:
            base_job_name = slugify_name(self.name)
            self._job_name = f"{base_job_name}-{uuid4().hex}"
        return self._job_name

    def prepare_for_flow_run(
        self,
        flow_run: "FlowRun",
        deployment: Optional["DeploymentResponse"] = None,
        flow: Optional["Flow"] = None,
    ):
        """
        Prepares the job configuration for a flow run.

        Ensures that necessary values are present in the job body and that the
        job body is valid.

        Args:
            flow_run: The flow run to prepare the job configuration for
            deployment: The deployment associated with the flow run used for
                preparation.
            flow: The flow associated with the flow run used for preparation.
        """
        super().prepare_for_flow_run(
            flow_run=flow_run,
            deployment=deployment,
            flow=flow,
        )

        # Each step mutates the first container / template of `job_body`.
        self._populate_env()
        self._populate_or_format_command()
        self._format_args_if_present()
        self._populate_image_if_not_present()
        self._populate_timeout()
        self._remove_vpc_access_if_unset()

    def _populate_timeout(self):
        """
        Populates the job body with the timeout, formatted as "<seconds>s".
        """
        self.job_body["template"]["template"]["timeout"] = f"{self.timeout}s"

    def _populate_env(self):
        """
        Populates the first container's `env` list.

        Plain variables from `self.env` come first. They are followed by
        variables that reference GCP secrets (`env_from_secrets`). Any `env`
        already present on the first container is replaced.
        """
        envs = [{"name": k, "value": v} for k, v in self.env.items()]
        envs_from_secrets = [
            {
                "name": k,
                "valueSource": {"secretKeyRef": v.model_dump()},
            }
            for k, v in self.env_from_secrets.items()
        ]
        envs.extend(envs_from_secrets)

        self.job_body["template"]["template"]["containers"][0]["env"] = envs

    def _populate_image_if_not_present(self):
        """
        Populates the job body with the default Prefect image if no image is set.

        A missing `image` key, or an `image` whose value is None or empty,
        falls back to the default Prefect image on Docker Hub. Such a value
        would otherwise be sent to Cloud Run unchanged.
        """
        container = self.job_body["template"]["template"]["containers"][0]
        if not container.get("image"):
            container["image"] = f"docker.io/{get_prefect_image_name()}"

    def _populate_or_format_command(self):
        """
        Populates the job body with the command if not present.

        A missing command is replaced with the base flow-run command. A command
        given as a single string is split into an argument list with
        `shlex.split`. A command that is already a list is left untouched.
        """
        container = self.job_body["template"]["template"]["containers"][0]
        command = container.get("command")
        if command is None:
            container["command"] = shlex.split(self._base_flow_run_command())
        elif isinstance(command, str):
            container["command"] = shlex.split(command)

    def _format_args_if_present(self):
        """
        Formats the job body args if present.

        Args given as a single string are split into a list with `shlex.split`.
        Any other value, including a missing key, is left untouched.
        """
        container = self.job_body["template"]["template"]["containers"][0]
        args = container.get("args")
        if isinstance(args, str):
            container["args"] = shlex.split(args)

    def _remove_vpc_access_if_unset(self):
        """
        Removes vpcAccess if unset.

        The block is dropped when it is empty/falsy, or when its only key is
        `connector` with a value of None. Any other user-provided value is kept.
        """
        if "vpcAccess" not in self.job_body["template"]["template"]:
            return

        vpc_access = self.job_body["template"]["template"]["vpcAccess"]

        # if vpcAccess is unset or connector is unset, remove the entire vpcAccess block
        # otherwise leave the user provided value.
        if not vpc_access or (
            len(vpc_access) == 1
            and "connector" in vpc_access
            and vpc_access["connector"] is None
        ):
            self.job_body["template"]["template"].pop("vpcAccess")

    # noinspection PyMethodParameters
    @field_validator("job_body")
    @classmethod
    def _ensure_job_includes_all_required_components(cls, value: Dict[str, Any]):
        """
        Ensures that the job body includes all required components.

        A JSON Patch from `value` to the base job body is computed. Every "add"
        operation is a path that exists in the base body but is missing from
        `value`.

        Args:
            value: The job body to validate.

        Returns:
            The validated job body.

        Raises:
            ValueError: If any required path is missing.
        """
        patch = JsonPatch.from_diff(value, _get_base_job_body())

        missing_paths = sorted([op["path"] for op in patch if op["op"] == "add"])

        if missing_paths:
            raise ValueError(
                f"Job body is missing required components: {', '.join(missing_paths)}"
            )

        return value

    # noinspection PyMethodParameters
    @field_validator("job_body")
    @classmethod
    def _ensure_job_has_compatible_values(cls, value: Dict[str, Any]):
        """
        Ensure that the job body has compatible values.

        Every "replace" operation in the JSON Patch from `value` to the base
        job body is a path whose value differs from the one the base requires.

        Raises:
            ValueError: If any such incompatible value is found.
        """
        patch = JsonPatch.from_diff(value, _get_base_job_body())
        incompatible = sorted(
            [
                f"{op['path']} must have value {op['value']!r}"
                for op in patch
                if op["op"] == "replace"
            ]
        )
        if incompatible:
            raise ValueError(
                "Job has incompatible values for the following attributes: "
                f"{', '.join(incompatible)}"
            )
        return value

job_name: str property

Returns the name of the job.

Returns:

| Name | Type | Description |
|------|------|-------------|
| `str` | `str` | The name of the job. |

project: str property

Returns the GCP project associated with the credentials.

Returns:

| Name | Type | Description |
|------|------|-------------|
| `str` | `str` | The GCP project associated with the credentials. |

prepare_for_flow_run(flow_run, deployment=None, flow=None)

Prepares the job configuration for a flow run.

Ensures that necessary values are present in the job body and that the job body is valid.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `flow_run` | `FlowRun` | The flow run to prepare the job configuration for. | *required* |
| `deployment` | `Optional[DeploymentResponse]` | The deployment associated with the flow run used for preparation. | `None` |
| `flow` | `Optional[Flow]` | The flow associated with the flow run used for preparation. | `None` |
Source code in prefect_gcp/workers/cloud_run_v2.py
Lines 157–186
def prepare_for_flow_run(
    self,
    flow_run: "FlowRun",
    deployment: Optional["DeploymentResponse"] = None,
    flow: Optional["Flow"] = None,
):
    """
    Prepare this job configuration to execute a particular flow run.

    The base-class preparation runs first. Afterwards a fixed sequence of
    steps fills in and normalizes the Cloud Run job body.

    Args:
        flow_run: The flow run the configuration is being prepared for.
        deployment: Optional deployment tied to the flow run.
        flow: Optional flow tied to the flow run.
    """
    super().prepare_for_flow_run(flow_run=flow_run, deployment=deployment, flow=flow)

    # The steps run in this exact order. Each one is looked up on `self`
    # just before it is called.
    for step_name in (
        "_populate_env",
        "_populate_or_format_command",
        "_format_args_if_present",
        "_populate_image_if_not_present",
        "_populate_timeout",
        "_remove_vpc_access_if_unset",
    ):
        getattr(self, step_name)()

CloudRunWorkerV2

Bases: BaseWorker

The Cloud Run worker V2.

Source code in prefect_gcp/workers/cloud_run_v2.py
Lines 429–819
class CloudRunWorkerV2(BaseWorker):
    """
    The Cloud Run worker V2.
    """

    type = "cloud-run-v2"
    job_configuration = CloudRunWorkerJobV2Configuration
    job_configuration_variables = CloudRunWorkerV2Variables
    _description = "Execute flow runs within containers on Google Cloud Run (V2 API). Requires a Google Cloud Platform account."  # noqa
    _display_name = "Google Cloud Run V2"
    _documentation_url = "https://prefecthq.github.io/prefect-gcp/worker_v2/"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/4SpnOBvMYkHp6z939MDKP6/549a91bc1ce9afd4fb12c68db7b68106/social-icon-google-cloud-1200-630.png?h=250"  # noqa

    async def run(
        self,
        flow_run: "FlowRun",
        configuration: CloudRunWorkerJobV2Configuration,
        task_status: Optional[TaskStatus] = None,
    ) -> CloudRunWorkerV2Result:
        """
        Runs the flow run on Cloud Run and waits for it to complete.

        The blocking Cloud Run API calls run in worker threads via
        `run_sync_in_worker_thread`, so the event loop is not blocked.

        Args:
            flow_run: The flow run to run.
            configuration: The configuration for the job.
            task_status: The task status to update.

        Returns:
            The result of the job.
        """
        logger = self.get_flow_run_logger(flow_run)

        with self._get_client(configuration=configuration) as cr_client:
            await run_sync_in_worker_thread(
                self._create_job_and_wait_for_registration,
                configuration=configuration,
                cr_client=cr_client,
                logger=logger,
            )

            execution = await run_sync_in_worker_thread(
                self._begin_job_execution,
                configuration=configuration,
                cr_client=cr_client,
                logger=logger,
            )

            # The job name is reported as the run identifier once the
            # execution has been submitted.
            if task_status:
                task_status.started(configuration.job_name)

            result = await run_sync_in_worker_thread(
                self._watch_job_execution_and_get_result,
                configuration=configuration,
                cr_client=cr_client,
                execution=execution,
                logger=logger,
            )

            return result

    @staticmethod
    def _get_client(
        configuration: CloudRunWorkerJobV2Configuration,
    ) -> Resource:
        """
        Get the base client needed for interacting with GCP Cloud Run V2 API.

        Args:
            configuration: The job configuration whose credentials are used.

        Returns:
            Resource: The `projects().locations()` resource of the Cloud Run
                v2 discovery client.
        """
        api_endpoint = "https://run.googleapis.com"
        gcp_creds = configuration.credentials.get_credentials_from_service_account()

        options = ClientOptions(api_endpoint=api_endpoint)

        return (
            discovery.build(
                "run",
                "v2",
                client_options=options,
                credentials=gcp_creds,
                num_retries=3,  # Set to 3 in case of intermittent/connection issues
            )
            .projects()
            .locations()
        )

    def _create_job_and_wait_for_registration(
        self,
        configuration: CloudRunWorkerJobV2Configuration,
        cr_client: Resource,
        logger: PrefectLogAdapter,
    ):
        """
        Creates the Cloud Run job and waits for it to register.

        If waiting for the job fails, the job is deleted (unless `keep_job` is
        set) and the original exception is re-raised.

        Args:
            configuration: The configuration for the job.
            cr_client: The Cloud Run client.
            logger: The logger to use.
        """
        try:
            logger.info(f"Creating Cloud Run JobV2 {configuration.job_name}")

            JobV2.create(
                cr_client=cr_client,
                project=configuration.project,
                location=configuration.region,
                job_id=configuration.job_name,
                body=configuration.job_body,
            )
        except HttpError as exc:
            # Always raises: either a friendlier RuntimeError (404) or `exc`.
            self._create_job_error(
                exc=exc,
                configuration=configuration,
            )

        try:
            self._wait_for_job_creation(
                cr_client=cr_client,
                configuration=configuration,
                logger=logger,
            )
        except Exception as exc:
            logger.critical(
                f"Failed to create Cloud Run JobV2 {configuration.job_name}.\n{exc}"
            )

            if not configuration.keep_job:
                try:
                    JobV2.delete(
                        cr_client=cr_client,
                        project=configuration.project,
                        location=configuration.region,
                        job_name=configuration.job_name,
                    )
                except Exception as exc2:
                    # Best effort: cleanup failures are logged, and the
                    # original creation error is the one re-raised.
                    logger.critical(
                        f"Failed to delete Cloud Run JobV2 {configuration.job_name}."
                        f"\n{exc2}"
                    )

            raise

    @staticmethod
    def _wait_for_job_creation(
        cr_client: Resource,
        configuration: CloudRunWorkerJobV2Configuration,
        logger: PrefectLogAdapter,
        poll_interval: int = 5,
    ):
        """
        Waits for the Cloud Run job to be created.

        NOTE(review): there is no upper bound on how long this polls; it
        returns only once `job.is_ready()` is true.

        Args:
            cr_client: The Cloud Run client.
            configuration: The configuration for the job.
            logger: The logger to use.
            poll_interval: The interval to poll the Cloud Run job, defaults to 5
                seconds.
        """
        job = JobV2.get(
            cr_client=cr_client,
            project=configuration.project,
            location=configuration.region,
            job_name=configuration.job_name,
        )

        while not job.is_ready():
            if not (ready_condition := job.get_ready_condition()):
                ready_condition = "waiting for condition update"

            logger.info(f"Current Job Condition: {ready_condition}")

            job = JobV2.get(
                cr_client=cr_client,
                project=configuration.project,
                location=configuration.region,
                job_name=configuration.job_name,
            )

            time.sleep(poll_interval)

    @staticmethod
    def _create_job_error(
        exc: HttpError,
        configuration: CloudRunWorkerJobV2Configuration,
    ):
        """
        Creates a formatted error message for the Cloud Run V2 API errors.

        Raises:
            RuntimeError: For a 404, with hints about region/project.
            HttpError: `exc` itself for any other status.
        """
        # noinspection PyUnresolvedReferences
        if exc.status_code == 404:
            raise RuntimeError(
                f"Failed to find resources at {exc.uri}. Confirm that region"
                f" '{configuration.region}' is the correct region for your Cloud"
                f" Run Job and that {configuration.project} is the correct GCP "
                f"project. If your project ID is not correct, you are using a "
                f"Credentials block with permissions for the wrong project."
            ) from exc

        raise exc

    def _begin_job_execution(
        self,
        cr_client: Resource,
        configuration: CloudRunWorkerJobV2Configuration,
        logger: PrefectLogAdapter,
    ) -> ExecutionV2:
        """
        Begins the Cloud Run job execution.

        Args:
            cr_client: The Cloud Run client.
            configuration: The configuration for the job.
            logger: The logger to use.

        Returns:
            The Cloud Run job execution.
        """
        try:
            logger.info(
                f"Submitting Cloud Run Job V2 {configuration.job_name} for execution..."
            )

            submission = JobV2.run(
                cr_client=cr_client,
                project=configuration.project,
                location=configuration.region,
                job_name=configuration.job_name,
            )

            job_execution = ExecutionV2.get(
                cr_client=cr_client,
                execution_id=submission["metadata"]["name"],
            )

            # `configuration.command` is presumably a plain string (TODO
            # confirm against BaseJobConfiguration). Joining a string would put
            # a space between every character, so only sequences are joined.
            command = configuration.command
            if isinstance(command, (list, tuple)):
                command = " ".join(command)
            if not command:
                command = "default container command"

            logger.info(
                f"Cloud Run Job V2 {configuration.job_name} submitted for execution "
                f"with command: {command}"
            )

            return job_execution
        except Exception as exc:
            # Raises a friendlier error for recognised 404s or re-raises other
            # errors; for an unrecognised 404 it returns and we re-raise here.
            self._job_run_submission_error(
                exc=exc,
                configuration=configuration,
            )
            raise

    def _watch_job_execution_and_get_result(
        self,
        cr_client: Resource,
        configuration: CloudRunWorkerJobV2Configuration,
        execution: ExecutionV2,
        logger: PrefectLogAdapter,
        poll_interval: int = 5,
    ) -> CloudRunWorkerV2Result:
        """
        Watch the job execution and get the result.

        The job is deleted afterwards unless `keep_job` is set; a failure to
        delete is logged but does not change the result.

        Args:
            cr_client (Resource): The base client needed for interacting with GCP
                Cloud Run V2 API.
            configuration (CloudRunWorkerJobV2Configuration): The configuration for
                the job.
            execution (ExecutionV2): The execution to watch.
            logger (PrefectLogAdapter): The logger to use.
            poll_interval (int): The number of seconds to wait between polls.
                Defaults to 5 seconds.

        Returns:
            The result of the job, with status code 0 on success and 1 otherwise.
        """
        try:
            execution = self._watch_job_execution(
                cr_client=cr_client,
                configuration=configuration,
                execution=execution,
                poll_interval=poll_interval,
            )
        except Exception as exc:
            logger.critical(
                f"Encountered an exception while waiting for job run completion - "
                f"{exc}"
            )
            raise

        if execution.succeeded():
            status_code = 0
            logger.info(f"Cloud Run Job V2 {configuration.job_name} succeeded")
        else:
            status_code = 1
            # Guard against a missing completion condition so that reporting
            # the failure cannot itself raise.
            completion_condition = execution.condition_after_completion() or {}
            error_msg = completion_condition.get("message")
            logger.error(
                f"Cloud Run Job V2 {configuration.job_name} failed - {error_msg}"
            )

        logger.info(f"Job run logs can be found on GCP at: {execution.logUri}")

        if not configuration.keep_job:
            logger.info(
                f"Deleting completed Cloud Run Job {configuration.job_name!r} from "
                "Google Cloud Run..."
            )

            try:
                JobV2.delete(
                    cr_client=cr_client,
                    project=configuration.project,
                    location=configuration.region,
                    job_name=configuration.job_name,
                )
            except Exception as exc:
                logger.critical(
                    "Received an exception while deleting the Cloud Run Job V2 "
                    f"- {configuration.job_name} - {exc}"
                )

        return CloudRunWorkerV2Result(
            identifier=configuration.job_name,
            status_code=status_code,
        )

    # noinspection DuplicatedCode
    @staticmethod
    def _watch_job_execution(
        cr_client: Resource,
        configuration: CloudRunWorkerJobV2Configuration,
        execution: ExecutionV2,
        poll_interval: int,
    ) -> ExecutionV2:
        """
        Update execution status until it is no longer running.

        Args:
            cr_client (Resource): The base client needed for interacting with GCP
                Cloud Run V2 API.
            configuration (CloudRunWorkerJobV2Configuration): The configuration for
                the job.
            execution (ExecutionV2): The execution to watch.
            poll_interval (int): The number of seconds to wait between polls.

        Returns:
            The execution.
        """
        while execution.is_running():
            execution = ExecutionV2.get(
                cr_client=cr_client,
                execution_id=execution.name,
            )

            time.sleep(poll_interval)

        return execution

    @staticmethod
    def _job_run_submission_error(
        exc: Exception,
        configuration: CloudRunWorkerJobV2Configuration,
    ):
        """
        Creates a formatted error message for the Cloud Run V2 API errors.

        Args:
            exc: The exception to format.
            configuration: The configuration for the job.

        Raises:
            RuntimeError: For a 404 whose message matches the "URL not found"
                pattern.
            Exception: `exc` itself when it is not a 404. This includes
                exceptions without an HTTP status code.

        Returns:
            None for a 404 that does not match the pattern; the caller is
            expected to re-raise.
        """
        # Not every exception here is an HttpError (e.g. a KeyError reading the
        # submission response). Reading `.status_code` directly would mask the
        # real error with an AttributeError.
        if getattr(exc, "status_code", None) != 404:
            raise exc

        pat1 = r"The requested URL [^ ]+ was not found on this server"
        if re.search(pat1, str(exc)):
            # noinspection PyUnresolvedReferences
            raise RuntimeError(
                f"Failed to find resources at {exc.uri}. "
                f"Confirm that region '{configuration.region}' is "
                f"the correct region for your Cloud Run Job "
                f"and that '{configuration.project}' is the "
                f"correct GCP project. If your project ID is not "
                f"correct, you are using a Credentials "
                f"block with permissions for the wrong project."
            ) from exc

run(flow_run, configuration, task_status=None) async

Runs the flow run on Cloud Run and waits for it to complete.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `flow_run` | `FlowRun` | The flow run to run. | *required* |
| `configuration` | `CloudRunWorkerJobV2Configuration` | The configuration for the job. | *required* |
| `task_status` | `Optional[TaskStatus]` | The task status to update. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `CloudRunWorkerV2Result` | The result of the job. |

Source code in prefect_gcp/workers/cloud_run_v2.py
Lines 442–487
async def run(
    self,
    flow_run: "FlowRun",
    configuration: CloudRunWorkerJobV2Configuration,
    task_status: Optional[TaskStatus] = None,
) -> CloudRunWorkerV2Result:
    """
    Execute a flow run as a Cloud Run V2 job and block until it finishes.

    Three phases run one after another in worker threads: create the job and
    wait for it to register, start an execution, then watch the execution
    and collect its result.

    Args:
        flow_run: The flow run to execute.
        configuration: Job configuration used for every Cloud Run call.
        task_status: If given, marked as started with the job name once the
            execution has been submitted.

    Returns:
        The result produced by watching the execution.
    """
    logger = self.get_flow_run_logger(flow_run)

    with self._get_client(configuration=configuration) as cr_client:
        # Keyword arguments shared by every phase.
        common = {
            "configuration": configuration,
            "cr_client": cr_client,
            "logger": logger,
        }

        await run_sync_in_worker_thread(
            self._create_job_and_wait_for_registration, **common
        )
        execution = await run_sync_in_worker_thread(
            self._begin_job_execution, **common
        )

        if task_status:
            task_status.started(configuration.job_name)

        return await run_sync_in_worker_thread(
            self._watch_job_execution_and_get_result,
            execution=execution,
            **common,
        )

CloudRunWorkerV2Result

Bases: BaseWorkerResult

The result of a Cloud Run worker V2 job.

Source code in prefect_gcp/workers/cloud_run_v2.py
Lines 423–426
class CloudRunWorkerV2Result(BaseWorkerResult):
    """
    Result type returned by the Cloud Run V2 worker after a job completes.

    Adds no fields or behavior on top of `BaseWorkerResult`.
    """

CloudRunWorkerV2Variables

Bases: BaseVariables

Default variables for the v2 Cloud Run worker.

The schema for this class is used to populate the `variables` section of the default base job template.

Source code in prefect_gcp/workers/cloud_run_v2.py
Lines 308–420
class CloudRunWorkerV2Variables(BaseVariables):
    """
    Default variables for the v2 Cloud Run worker.

    The schema for this class is used to populate the `variables` section of the
    default base job template.
    """

    credentials: GcpCredentials = Field(
        title="GCP Credentials",
        default_factory=GcpCredentials,
        description=(
            "The GCP Credentials used to connect to Cloud Run. "
            "If not provided credentials will be inferred from "
            "the local environment."
        ),
    )
    region: str = Field(
        default="us-central1",
        description="The region in which to run the Cloud Run job",
    )
    image: Optional[str] = Field(
        default="prefecthq/prefect:3-latest",
        title="Image Name",
        description=(
            "The image to use for the Cloud Run job. "
            "If not provided the default Prefect image will be used."
        ),
    )
    args: List[str] = Field(
        default_factory=list,
        description=(
            "The arguments to pass to the Cloud Run Job V2's entrypoint command."
        ),
    )
    env_from_secrets: Dict[str, SecretKeySelector] = Field(
        default_factory=dict,
        title="Environment Variables from Secrets",
        description="Environment variables to set from GCP secrets when starting a flow run.",
        # `examples` (a list) replaces the deprecated extra kwarg `example`,
        # matching the other fields in this class.
        examples=[
            {
                "ENV_VAR_NAME": {
                    "secret": "SECRET_NAME",
                    "version": "latest",
                }
            }
        ],
    )
    keep_job: bool = Field(
        default=False,
        title="Keep Job After Completion",
        description="Keep the completed Cloud run job on Google Cloud Platform.",
    )
    launch_stage: Literal[
        "ALPHA",
        "BETA",
        "GA",
        "DEPRECATED",
        "EARLY_ACCESS",
        "PRELAUNCH",
        "UNIMPLEMENTED",
        "LAUNCH_TAG_UNSPECIFIED",
    ] = Field(
        "BETA",
        description=(
            "The launch stage of the Cloud Run Job V2. "
            "See https://cloud.google.com/run/docs/about-features-categories "
            "for additional details."
        ),
    )
    max_retries: int = Field(
        default=0,
        title="Max Retries",
        description="The number of times to retry the Cloud Run job.",
    )
    cpu: str = Field(
        default="1000m",
        title="CPU",
        description="The CPU to allocate to the Cloud Run job.",
    )
    memory: str = Field(
        default="512Mi",
        title="Memory",
        description=(
            "The memory to allocate to the Cloud Run job along with the units, which "
            "could be: G, Gi, M, Mi."
        ),
        examples=["512Mi"],
        # An integer amount followed by exactly one of the supported units.
        pattern=r"^\d+(?:G|Gi|M|Mi)$",
    )
    timeout: int = Field(
        default=600,
        gt=0,
        le=86400,
        title="Job Timeout",
        description=(
            "Max allowed time duration the Job may be active before Cloud Run will "
            "actively try to mark it failed and kill associated containers (maximum of 86400 seconds, 1 day)."
        ),
    )
    vpc_connector_name: Optional[str] = Field(
        default=None,
        title="VPC Connector Name",
        description="The name of the VPC connector to use for the Cloud Run job.",
    )
    service_account_name: Optional[str] = Field(
        default=None,
        title="Service Account Name",
        description=(
            "The name of the service account to use for the task execution "
            "of Cloud Run Job. By default Cloud Run jobs run as the default "
            "Compute Engine Service Account."
        ),
        examples=["service-account@example.iam.gserviceaccount.com"],
    )