Cloud Pub/Sub to Cloud Storage template

Use the Serverless for Apache Spark Cloud Pub/Sub to Cloud Storage template to extract data from Pub/Sub to Cloud Storage.

Use the template

Run the template using the gcloud CLI or Dataproc API.

gcloud

Before using any of the command data below, make the following replacements:

  • PROJECT_ID: Required. Your Google Cloud project ID listed in the IAM Settings.
  • REGION: Required. Compute Engine region.
  • SUBNET: Optional. If a subnet is not specified, the subnet in the specified REGION in the default network is selected.

    Example: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

  • TEMPLATE_VERSION: Required. Specify latest for the latest template version, or the date of a specific version, for example, 2023-03-17_v0.1.0-beta (visit gs://dataproc-templates-binaries or run gcloud storage ls gs://dataproc-templates-binaries to list available template versions).
  • PUBSUB_SUBSCRIPTION_PROJECT_ID: Required. The Google Cloud project ID listed in the IAM Settings that contains the input Pub/Sub subscription to read.
  • SUBSCRIPTION: Required. Pub/Sub subscription name.
  • CLOUD_STORAGE_OUTPUT_BUCKET_NAME: Required. Name of the Cloud Storage bucket where output is stored.

    Note: The output files are stored in the output/ folder inside the bucket.

  • FORMAT: Required. Output data format. Options: avro or json.

    Note: If avro, you must add file:///usr/lib/spark/connector/spark-avro.jar to the jars gcloud CLI flag or API field.

    Example (the file:// prefix references a Serverless for Apache Spark jar file):

    --jars=file:///usr/lib/spark/connector/spark-avro.jar, [ ... other jars]
  • TIMEOUT: Optional. Time in milliseconds before the stream terminates. Defaults to 60000.
  • DURATION: Optional. Frequency in seconds of writes to Cloud Storage. Defaults to 15 seconds.
  • NUM_RECEIVERS: Optional. Number of streams that read from the Pub/Sub subscription in parallel. Defaults to 5.
  • BATCHSIZE: Optional. Number of records to insert in one round trip into Cloud Storage. Defaults to 1000.
  • SERVICE_ACCOUNT: Optional. If a service account is not provided, the default Compute Engine service account is used.
  • PROPERTY and PROPERTY_VALUE: Optional. Comma-separated list of Spark property=value pairs.
  • LABEL and LABEL_VALUE: Optional. Comma-separated list of label=value pairs.
  • LOG_LEVEL: Optional. Level of logging. Can be one of ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, or WARN. Default: INFO.
  • KMS_KEY: Optional. The Cloud Key Management Service key to use for encryption. If a key is not specified, data is encrypted at rest using a Google-owned and Google-managed encryption key.

    Example: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME

Execute the following command:

Linux, macOS, or Cloud Shell

gcloud dataproc batches submit spark \
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \
    --version="1.2" \
    --project="PROJECT_ID" \
    --region="REGION" \
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" \
    --subnet="SUBNET" \
    --kms-key="KMS_KEY" \
    --service-account="SERVICE_ACCOUNT" \
    --properties="PROPERTY=PROPERTY_VALUE" \
    --labels="LABEL=LABEL_VALUE" \
    -- --template=PUBSUBTOGCS \
    --templateProperty log.level="LOG_LEVEL" \
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" \
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" \
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" \
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" \
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" \
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" \
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" \
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

Windows (PowerShell)

gcloud dataproc batches submit spark `
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate `
    --version="1.2" `
    --project="PROJECT_ID" `
    --region="REGION" `
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" `
    --subnet="SUBNET" `
    --kms-key="KMS_KEY" `
    --service-account="SERVICE_ACCOUNT" `
    --properties="PROPERTY=PROPERTY_VALUE" `
    --labels="LABEL=LABEL_VALUE" `
    -- --template=PUBSUBTOGCS `
    --templateProperty log.level="LOG_LEVEL" `
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" `
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" `
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" `
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" `
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" `
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" `
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" `
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

Windows (cmd.exe)

gcloud dataproc batches submit spark ^
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ^
    --version="1.2" ^
    --project="PROJECT_ID" ^
    --region="REGION" ^
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ^
    --subnet="SUBNET" ^
    --kms-key="KMS_KEY" ^
    --service-account="SERVICE_ACCOUNT" ^
    --properties="PROPERTY=PROPERTY_VALUE" ^
    --labels="LABEL=LABEL_VALUE" ^
    -- --template=PUBSUBTOGCS ^
    --templateProperty log.level="LOG_LEVEL" ^
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ^
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ^
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ^
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ^
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ^
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ^
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ^
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

REST

Before using any of the request data, make the following replacements:

  • PROJECT_ID: Required. Your Google Cloud project ID listed in the IAM Settings.
  • REGION: Required. Compute Engine region.
  • SUBNET: Optional. If a subnet is not specified, the subnet in the specified REGION in the default network is selected.

    Example: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

  • TEMPLATE_VERSION: Required. Specify latest for the latest template version, or the date of a specific version, for example, 2023-03-17_v0.1.0-beta (visit gs://dataproc-templates-binaries or run gcloud storage ls gs://dataproc-templates-binaries to list available template versions).
  • PUBSUB_SUBSCRIPTION_PROJECT_ID: Required. The Google Cloud project ID listed in the IAM Settings that contains the input Pub/Sub subscription to read.
  • SUBSCRIPTION: Required. Pub/Sub subscription name.
  • CLOUD_STORAGE_OUTPUT_BUCKET_NAME: Required. Name of the Cloud Storage bucket where output is stored.

    Note: The output files are stored in the output/ folder inside the bucket.

  • FORMAT: Required. Output data format. Options: avro or json.

    Note: If avro, you must add file:///usr/lib/spark/connector/spark-avro.jar to the jars gcloud CLI flag or API field.

    Example (the file:// prefix references a Serverless for Apache Spark jar file):

    --jars=file:///usr/lib/spark/connector/spark-avro.jar, [ ... other jars]
  • TIMEOUT: Optional. Time in milliseconds before the stream terminates. Defaults to 60000.
  • DURATION: Optional. Frequency in seconds of writes to Cloud Storage. Defaults to 15 seconds.
  • NUM_RECEIVERS: Optional. Number of streams that read from the Pub/Sub subscription in parallel. Defaults to 5.
  • BATCHSIZE: Optional. Number of records to insert in one round trip into Cloud Storage. Defaults to 1000.
  • SERVICE_ACCOUNT: Optional. If a service account is not provided, the default Compute Engine service account is used.
  • PROPERTY and PROPERTY_VALUE: Optional. Comma-separated list of Spark property=value pairs.
  • LABEL and LABEL_VALUE: Optional. Comma-separated list of label=value pairs.
  • LOG_LEVEL: Optional. Level of logging. Can be one of ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, or WARN. Default: INFO.
  • KMS_KEY: Optional. The Cloud Key Management Service key to use for encryption. If a key is not specified, data is encrypted at rest using a Google-owned and Google-managed encryption key.

    Example: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME

HTTP method and URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches

Request JSON body:

{
  "environmentConfig": {
    "executionConfig": {
      "subnetworkUri": "SUBNET",
      "kmsKey": "KMS_KEY",
      "serviceAccount": "SERVICE_ACCOUNT"
    }
  },
  "labels": {
    "LABEL": "LABEL_VALUE"
  },
  "runtimeConfig": {
    "version": "1.2",
    "properties": {
      "PROPERTY": "PROPERTY_VALUE"
    }
  },
  "sparkBatch": {
    "mainClass": "com.google.cloud.dataproc.templates.main.DataProcTemplate",
    "args": [
      "--template","PUBSUBTOGCS",
      "--templateProperty","log.level=LOG_LEVEL",
      "--templateProperty","pubsubtogcs.input.project.id=PUBSUB_SUBSCRIPTION_PROJECT_ID",
      "--templateProperty","pubsubtogcs.input.subscription=SUBSCRIPTION",
      "--templateProperty","pubsubtogcs.gcs.bucket.name=CLOUD_STORAGE_OUTPUT_BUCKET_NAME",
      "--templateProperty","pubsubtogcs.gcs.output.data.format=FORMAT",
      "--templateProperty","pubsubtogcs.timeout.ms=TIMEOUT",
      "--templateProperty","pubsubtogcs.streaming.duration.seconds=DURATION",
      "--templateProperty","pubsubtogcs.total.receivers=NUM_RECEIVERS",
      "--templateProperty","pubsubtogcs.batch.size=BATCHSIZE"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/connector/spark-avro.jar",
      "gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar"
    ]
  }
}

To send your request, use an HTTP client such as curl.
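
For example, a minimal sketch using curl, assuming the request JSON above is saved to a local file named request.json (a hypothetical file name) and that you are authenticated with the gcloud CLI:

curl -X POST \
     -H "Authorization: Bearer $(gcloud auth print-access-token)" \
     -H "Content-Type: application/json; charset=utf-8" \
     -d @request.json \
     "https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches"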

You should receive a JSON response similar to the following:

{
  "name": "projects/PROJECT_ID/regions/REGION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata",
    "batch": "projects/PROJECT_ID/locations/REGION/batches/BATCH_ID",
    "batchUuid": "de8af8d4-3599-4a7c-915c-798201ed1583",
    "createTime": "2023-02-24T03:31:03.440329Z",
    "operationType": "BATCH",
    "description": "Batch"
  }
}
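
As a follow-up, you can use the BATCH_ID returned in the batch field of the response to monitor the submitted batch with the gcloud CLI, for example:

gcloud dataproc batches describe BATCH_ID --region=REGION --project=PROJECT_ID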