Cloud Pub/Sub to Cloud Storage template

Use the Serverless for Apache Spark Cloud Pub/Sub to Cloud Storage template to extract data from a Pub/Sub subscription and write it to Cloud Storage.

Use the template

Run the template using the gcloud CLI or Dataproc API.

gcloud

Before using any of the command data below, make the following replacements:

  • PROJECT_ID: Required. Your Google Cloud project ID listed in the IAM Settings.
  • REGION: Required. Compute Engine region.
  • SUBNET: Optional. If a subnet is not specified, the subnet in the specified REGION in the default network is selected.

    Example: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

  • TEMPLATE_VERSION: Required. Specify latest for the latest template version, or the date of a specific version, for example, 2023-03-17_v0.1.0-beta (visit gs://dataproc-templates-binaries or run gcloud storage ls gs://dataproc-templates-binaries to list available template versions).
  • PUBSUB_SUBSCRIPTION_PROJECT_ID: Required. The ID of the Google Cloud project (listed in the IAM Settings) that contains the input Pub/Sub subscription to read.
  • SUBSCRIPTION: Required. Pub/Sub subscription name.
  • CLOUD_STORAGE_OUTPUT_BUCKET_NAME: Required. Cloud Storage bucket name where output will be stored.

    Note: The output files will be stored in the output/ folder inside the bucket.

  • FORMAT: Required. Output data format. Options: avro or json.

    Note: If you specify avro, you must add "file:///usr/lib/spark/connector/spark-avro.jar" to the jars gcloud CLI flag or API field.

    Example (the file:// prefix references a Serverless for Apache Spark jar file):

    --jars=file:///usr/lib/spark/connector/spark-avro.jar, [ ... other jars]
  • TIMEOUT: Optional. Time in milliseconds before the stream is terminated. Defaults to 60000.
  • DURATION: Optional. Interval in seconds between writes to Cloud Storage. Defaults to 15 seconds.
  • NUM_RECEIVERS: Optional. Number of streams read from a Pub/Sub subscription in parallel. Defaults to 5.
  • BATCHSIZE: Optional. Number of records to insert in one round trip into Cloud Storage. Defaults to 1000.
  • SERVICE_ACCOUNT: Optional. If not provided, the default Compute Engine service account is used.
  • PROPERTY and PROPERTY_VALUE: Optional. Comma-separated list of Spark property=value pairs.
  • LABEL and LABEL_VALUE: Optional. Comma-separated list of label=value pairs.
  • LOG_LEVEL: Optional. Level of logging. Can be one of ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, or WARN. Default: INFO.
  • KMS_KEY: Optional. The Cloud Key Management Service key to use for encryption. If a key is not specified, data is encrypted at rest using a Google-owned and Google-managed encryption key.

    Example: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
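Before you run the template, you can optionally confirm that the required resources exist and list the available template versions. A minimal sketch using the gcloud CLI (assuming it is installed and authenticated; these commands only read metadata):

# List available template versions (see TEMPLATE_VERSION above).
gcloud storage ls gs://dataproc-templates-binaries

# Confirm that the input Pub/Sub subscription exists.
gcloud pubsub subscriptions describe SUBSCRIPTION \
    --project=PUBSUB_SUBSCRIPTION_PROJECT_ID

# Confirm that the output bucket exists.
gcloud storage buckets describe gs://CLOUD_STORAGE_OUTPUT_BUCKET_NAME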

Execute the following command:

Linux, macOS, or Cloud Shell

gcloud dataproc batches submit spark \
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \
    --version="1.2" \
    --project="PROJECT_ID" \
    --region="REGION" \
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" \
    --subnet="SUBNET" \
    --kms-key="KMS_KEY" \
    --service-account="SERVICE_ACCOUNT" \
    --properties="PROPERTY=PROPERTY_VALUE" \
    --labels="LABEL=LABEL_VALUE" \
    -- --template=PUBSUBTOGCS \
    --templateProperty log.level="LOG_LEVEL" \
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" \
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" \
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" \
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" \
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" \
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" \
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" \
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

Windows (PowerShell)

gcloud dataproc batches submit spark `
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate `
    --version="1.2" `
    --project="PROJECT_ID" `
    --region="REGION" `
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" `
    --subnet="SUBNET" `
    --kms-key="KMS_KEY" `
    --service-account="SERVICE_ACCOUNT" `
    --properties="PROPERTY=PROPERTY_VALUE" `
    --labels="LABEL=LABEL_VALUE" `
    -- --template=PUBSUBTOGCS `
    --templateProperty log.level="LOG_LEVEL" `
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" `
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" `
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" `
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" `
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" `
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" `
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" `
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

Windows (cmd.exe)

gcloud dataproc batches submit spark ^
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ^
    --version="1.2" ^
    --project="PROJECT_ID" ^
    --region="REGION" ^
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ^
    --subnet="SUBNET" ^
    --kms-key="KMS_KEY" ^
    --service-account="SERVICE_ACCOUNT" ^
    --properties="PROPERTY=PROPERTY_VALUE" ^
    --labels="LABEL=LABEL_VALUE" ^
    -- --template=PUBSUBTOGCS ^
    --templateProperty log.level="LOG_LEVEL" ^
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ^
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ^
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ^
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ^
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ^
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ^
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ^
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
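
For example, the following minimal invocation streams the hypothetical subscription example-subscription in project example-project to the bucket example-bucket in JSON format, using the latest template version and the default timeout, duration, receiver, and batch size values (all names are illustrative; replace them with your own resources):

gcloud dataproc batches submit spark \
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \
    --version="1.2" \
    --project="example-project" \
    --region="us-central1" \
    --jars="gs://dataproc-templates-binaries/latest/java/dataproc-templates.jar" \
    -- --template=PUBSUBTOGCS \
    --templateProperty pubsubtogcs.input.project.id="example-project" \
    --templateProperty pubsubtogcs.input.subscription="example-subscription" \
    --templateProperty pubsubtogcs.gcs.bucket.name="example-bucket" \
    --templateProperty pubsubtogcs.gcs.output.data.format="json"

After submission, you can monitor the batch with gcloud dataproc batches list --region=us-central1 or gcloud dataproc batches describe BATCH_ID --region=us-central1.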

REST

Before using any of the request data, make the following replacements:

  • PROJECT_ID: Required. Your Google Cloud project ID listed in the IAM Settings.
  • REGION: Required. Compute Engine region.
  • SUBNET: Optional. If a subnet is not specified, the subnet in the specified REGION in the default network is selected.

    Example: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

  • TEMPLATE_VERSION: Required. Specify latest for the latest template version, or the date of a specific version, for example, 2023-03-17_v0.1.0-beta (visit gs://dataproc-templates-binaries or run gcloud storage ls gs://dataproc-templates-binaries to list available template versions).
  • PUBSUB_SUBSCRIPTION_PROJECT_ID: Required. The ID of the Google Cloud project (listed in the IAM Settings) that contains the input Pub/Sub subscription to read.
  • SUBSCRIPTION: Required. Pub/Sub subscription name.
  • CLOUD_STORAGE_OUTPUT_BUCKET_NAME: Required. Cloud Storage bucket name where output will be stored.

    Note: The output files will be stored in the output/ folder inside the bucket.

  • FORMAT: Required. Output data format. Options: avro or json.

    Note: If you specify avro, you must add "file:///usr/lib/spark/connector/spark-avro.jar" to the jars gcloud CLI flag or API field.

    Example (the file:// prefix references a Serverless for Apache Spark jar file):

    --jars=file:///usr/lib/spark/connector/spark-avro.jar, [ ... other jars]
  • TIMEOUT: Optional. Time in milliseconds before the stream is terminated. Defaults to 60000.
  • DURATION: Optional. Interval in seconds between writes to Cloud Storage. Defaults to 15 seconds.
  • NUM_RECEIVERS: Optional. Number of streams read from a Pub/Sub subscription in parallel. Defaults to 5.
  • BATCHSIZE: Optional. Number of records to insert in one round trip into Cloud Storage. Defaults to 1000.
  • SERVICE_ACCOUNT: Optional. If not provided, the default Compute Engine service account is used.
  • PROPERTY and PROPERTY_VALUE: Optional. Comma-separated list of Spark property=value pairs.
  • LABEL and LABEL_VALUE: Optional. Comma-separated list of label=value pairs.
  • LOG_LEVEL: Optional. Level of logging. Can be one of ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, or WARN. Default: INFO.
  • KMS_KEY: Optional. The Cloud Key Management Service key to use for encryption. If a key is not specified, data is encrypted at rest using a Google-owned and Google-managed encryption key.

    Example: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME

HTTP method and URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches

Request JSON body:

 { "environmentConfig":{ "executionConfig":{ "subnetworkUri":"SUBNET", "kmsKey": "KMS_KEY", "serviceAccount": "SERVICE_ACCOUNT" } }, "labels": { "LABEL": "LABEL_VALUE" }, "runtimeConfig": { "version": "1.2", "properties": { "PROPERTY": "PROPERTY_VALUE" } }, "sparkBatch":{ "mainClass":"com.google.cloud.dataproc.templates.main.DataProcTemplate", "args":[ "--template","PUBSUBTOGCS", "--templateProperty","log.level=LOG_LEVEL", "--templateProperty","pubsubtogcs.input.project.id=PUBSUB_SUBSCRIPTION_PROJECT_ID", "--templateProperty","pubsubtogcs.input.subscription=SUBSCRIPTION", "--templateProperty","pubsubtogcs.gcs.bucket.name=CLOUD_STORAGE_OUTPUT_BUCKET_NAME", "--templateProperty","pubsubtogcs.gcs.output.data.format=FORMAT", "--templateProperty","pubsubtogcs.timeout.ms=TIMEOUT", "--templateProperty","pubsubtogcs.streaming.duration.seconds=DURATION", "--templateProperty","pubsubtogcs.total.receivers=NUM_RECEIVERS", "--templateProperty","pubsubtogcs.batch.size=BATCHSIZE" ], "jarFileUris":[ "file:///usr/lib/spark/connector/spark-avro.jar", "gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ] } } 

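To send your request, save the request body to a file such as request.json, then POST it with an authenticated HTTP client. A typical sketch using curl (it assumes the gcloud CLI is available to mint an access token):

curl -X POST \
     -H "Authorization: Bearer $(gcloud auth print-access-token)" \
     -H "Content-Type: application/json; charset=utf-8" \
     -d @request.json \
     "https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches"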

You should receive a JSON response similar to the following:

 { "name": "projects/PROJECT_ID/regions/REGION/operations/OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata", "batch": "projects/PROJECT_ID/locations/REGION/batches/BATCH_ID", "batchUuid": "de8af8d4-3599-4a7c-915c-798201ed1583", "createTime": "2023-02-24T03:31:03.440329Z", "operationType": "BATCH", "description": "Batch" } }