Cloud Pub/Sub to Cloud Storage template
Use the Serverless for Apache Spark Cloud Pub/Sub to Cloud Storage template to extract data from Pub/Sub to Cloud Storage.
Use the template
Run the template using the gcloud CLI or Dataproc API.
gcloud
Before using any of the command data below, make the following replacements:
- PROJECT_ID: Required. Your Google Cloud project ID listed in the IAM Settings.
- REGION: Required. Compute Engine region.
- SUBNET: Optional. If a subnet is not specified, the subnet in the specified REGION in the default network is selected.
  Example: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION: Required. Specify latest for the latest template version, or the date of a specific version, for example, 2023-03-17_v0.1.0-beta (visit gs://dataproc-templates-binaries or run gcloud storage ls gs://dataproc-templates-binaries to list available template versions).
- PUBSUB_SUBSCRIPTION_PROJECT_ID: Required. The Google Cloud project ID listed in the IAM Settings that contains the input Pub/Sub subscription to be read.
- SUBSCRIPTION: Required. Pub/Sub subscription name.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME: Required. Name of the Cloud Storage bucket where output will be stored.
  Note: The output files are stored in the output/ folder inside the bucket.
- FORMAT: Required. Output data format. Options: avro or json.
  Note: If avro, you must add "file:///usr/lib/spark/connector/spark-avro.jar" to the jars gcloud CLI flag or API field.
  Example (the file:// prefix references a Serverless for Apache Spark jar file): --jars=file:///usr/lib/spark/connector/spark-avro.jar,[ ... other jars]
- TIMEOUT: Optional. Time in milliseconds before termination of the stream. Defaults to 60000.
- DURATION: Optional. Frequency in seconds of writes to Cloud Storage. Defaults to 15 seconds.
- NUM_RECEIVERS: Optional. Number of streams read from a Pub/Sub subscription in parallel. Defaults to 5.
- BATCHSIZE: Optional. Number of records to insert in one round trip into Cloud Storage. Defaults to 1000.
- SERVICE_ACCOUNT: Optional. If not provided, the default Compute Engine service account is used.
- PROPERTY and PROPERTY_VALUE: Optional. Comma-separated list of Spark property=value pairs (see the formatting sketch after this list).
- LABEL and LABEL_VALUE: Optional. Comma-separated list of label=value pairs.
- LOG_LEVEL: Optional. Level of logging. Can be one of ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, or WARN. Default: INFO.
- KMS_KEY: Optional. The Cloud Key Management Service key to use for encryption. If a key is not specified, data is encrypted at rest using a Google-owned and Google-managed encryption key.
  Example: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
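As a formatting sketch for the PROPERTY=PROPERTY_VALUE and LABEL=LABEL_VALUE pairs: the Spark property names below are ordinary Spark settings chosen for illustration, and the label names are hypothetical. These slot into the command that follows in place of the placeholder flags.

--properties="spark.executor.cores=4,spark.executor.instances=2"
--labels="team=data-eng,env=dev"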
Execute the following command:
Linux, macOS, or Cloud Shell
gcloud dataproc batches submit spark \
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \
    --version="1.2" \
    --project="PROJECT_ID" \
    --region="REGION" \
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" \
    --subnet="SUBNET" \
    --kms-key="KMS_KEY" \
    --service-account="SERVICE_ACCOUNT" \
    --properties="PROPERTY=PROPERTY_VALUE" \
    --labels="LABEL=LABEL_VALUE" \
    -- --template=PUBSUBTOGCS \
    --templateProperty log.level="LOG_LEVEL" \
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" \
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" \
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" \
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" \
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" \
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" \
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" \
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (PowerShell)
gcloud dataproc batches submit spark `
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate `
    --version="1.2" `
    --project="PROJECT_ID" `
    --region="REGION" `
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" `
    --subnet="SUBNET" `
    --kms-key="KMS_KEY" `
    --service-account="SERVICE_ACCOUNT" `
    --properties="PROPERTY=PROPERTY_VALUE" `
    --labels="LABEL=LABEL_VALUE" `
    -- --template=PUBSUBTOGCS `
    --templateProperty log.level="LOG_LEVEL" `
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" `
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" `
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" `
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" `
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" `
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" `
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" `
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (cmd.exe)
gcloud dataproc batches submit spark ^
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ^
    --version="1.2" ^
    --project="PROJECT_ID" ^
    --region="REGION" ^
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ^
    --subnet="SUBNET" ^
    --kms-key="KMS_KEY" ^
    --service-account="SERVICE_ACCOUNT" ^
    --properties="PROPERTY=PROPERTY_VALUE" ^
    --labels="LABEL=LABEL_VALUE" ^
    -- --template=PUBSUBTOGCS ^
    --templateProperty log.level="LOG_LEVEL" ^
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ^
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ^
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ^
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ^
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ^
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ^
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ^
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
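For orientation, here is a filled-in sketch of the Linux form with the optional flags omitted. Every value (project, region, subscription, bucket) is hypothetical, and json output avoids the extra Avro jar:

gcloud dataproc batches submit spark \
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \
    --version="1.2" \
    --project="my-project" \
    --region="us-central1" \
    --jars="gs://dataproc-templates-binaries/latest/java/dataproc-templates.jar" \
    -- --template=PUBSUBTOGCS \
    --templateProperty pubsubtogcs.input.project.id="my-project" \
    --templateProperty pubsubtogcs.input.subscription="my-subscription" \
    --templateProperty pubsubtogcs.gcs.bucket.name="my-output-bucket" \
    --templateProperty pubsubtogcs.gcs.output.data.format="json"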
REST
Before using any of the request data, make the following replacements:
- PROJECT_ID: Required. Your Google Cloud project ID listed in the IAM Settings.
- REGION: Required. Compute Engine region.
- SUBNET: Optional. If a subnet is not specified, the subnet in the specified REGION in the default network is selected.
  Example: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION: Required. Specify latest for the latest template version, or the date of a specific version, for example, 2023-03-17_v0.1.0-beta (visit gs://dataproc-templates-binaries or run gcloud storage ls gs://dataproc-templates-binaries to list available template versions).
- PUBSUB_SUBSCRIPTION_PROJECT_ID: Required. The Google Cloud project ID listed in the IAM Settings that contains the input Pub/Sub subscription to be read.
- SUBSCRIPTION: Required. Pub/Sub subscription name.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME: Required. Name of the Cloud Storage bucket where output will be stored.
  Note: The output files are stored in the output/ folder inside the bucket.
- FORMAT: Required. Output data format. Options: avro or json.
  Note: If avro, you must add "file:///usr/lib/spark/connector/spark-avro.jar" to the jars gcloud CLI flag or API field.
  Example (the file:// prefix references a Serverless for Apache Spark jar file): --jars=file:///usr/lib/spark/connector/spark-avro.jar,[ ... other jars]
- TIMEOUT: Optional. Time in milliseconds before termination of the stream. Defaults to 60000.
- DURATION: Optional. Frequency in seconds of writes to Cloud Storage. Defaults to 15 seconds.
- NUM_RECEIVERS: Optional. Number of streams read from a Pub/Sub subscription in parallel. Defaults to 5.
- BATCHSIZE: Optional. Number of records to insert in one round trip into Cloud Storage. Defaults to 1000.
- SERVICE_ACCOUNT: Optional. If not provided, the default Compute Engine service account is used.
- PROPERTY and PROPERTY_VALUE: Optional. Comma-separated list of Spark property=value pairs.
- LABEL and LABEL_VALUE: Optional. Comma-separated list of label=value pairs.
- LOG_LEVEL: Optional. Level of logging. Can be one of ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, or WARN. Default: INFO.
- KMS_KEY: Optional. The Cloud Key Management Service key to use for encryption. If a key is not specified, data is encrypted at rest using a Google-owned and Google-managed encryption key.
  Example: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
HTTP method and URL:
POST https://dataproc.googleapis.com/v1/projects/ PROJECT_ID /locations/ REGION /batches
Request JSON body:
{ "environmentConfig":{ "executionConfig":{ "subnetworkUri":" SUBNET ", "kmsKey": " KMS_KEY ", "serviceAccount": " SERVICE_ACCOUNT " } }, "labels": { " LABEL ": " LABEL_VALUE " }, "runtimeConfig": { "version": "1.2", "properties": { " PROPERTY ": " PROPERTY_VALUE " } }, "sparkBatch":{ "mainClass":"com.google.cloud.dataproc.templates.main.DataProcTemplate", "args":[ "--template","PUBSUBTOGCS", "--templateProperty","log.level= LOG_LEVEL ", "--templateProperty","pubsubtogcs.input.project.id= PUBSUB_SUBSCRIPTION_PROJECT_ID ", "--templateProperty","pubsubtogcs.input.subscription= SUBSCRIPTION ", "--templateProperty","pubsubtogcs.gcs.bucket.name= CLOUD_STORAGE_OUTPUT_BUCKET_NAME ", "--templateProperty","pubsubtogcs.gcs.output.data.format= FORMAT ", "--templateProperty","pubsubtogcs.timeout.ms= TIMEOUT ", "--templateProperty","pubsubtogcs.streaming.duration.seconds= DURATION ", "--templateProperty","pubsubtogcs.total.receivers= NUM_RECEIVERS ", "--templateProperty","pubsubtogcs.batch.size= BATCHSIZE " ], "jarFileUris":[ "file:///usr/lib/spark/connector/spark-avro.jar", "gs://dataproc-templates-binaries/ TEMPLATE_VERSION /java/dataproc-templates.jar" ] } }
To send your request, save the request JSON body to a file (for example, request.json) and POST it to the URL above.
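One way to send it is with curl, using gcloud to mint an access token; this sketch assumes the body above was saved as request.json:

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json; charset=utf-8" \
    -d @request.json \
    "https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches"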
You should receive a JSON response similar to the following:
{ "name": "projects/ PROJECT_ID /regions/ REGION /operations/ OPERATION_ID ", "metadata": { "@type": "type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata", "batch": "projects/ PROJECT_ID /locations/ REGION /batches/ BATCH_ID ", "batchUuid": "de8af8d4-3599-4a7c-915c-798201ed1583", "createTime": "2023-02-24T03:31:03.440329Z", "operationType": "BATCH", "description": "Batch" } }

