Cloud Pub/Sub to Cloud Storage template
Use the Serverless for Apache Spark Cloud Pub/Sub to Cloud Storage template to extract data from Pub/Sub to Cloud Storage.
Use the template
Run the template using the gcloud CLI or Dataproc API.
gcloud
Before using any of the command data below, make the following replacements:
- PROJECT_ID: Required. Your Google Cloud project ID listed in the IAM Settings.
- REGION: Required. Compute Engine region.
- SUBNET: Optional. If a subnet is not specified, the subnet in the specified REGION in the default network is selected. Example: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION: Required. Specify latest for the latest template version, or the date of a specific version, for example, 2023-03-17_v0.1.0-beta (visit gs://dataproc-templates-binaries or run gcloud storage ls gs://dataproc-templates-binaries to list available template versions).
- PUBSUB_SUBSCRIPTION_PROJECT_ID: Required. The Google Cloud project ID listed in the IAM Settings that contains the input Pub/Sub subscription to be read.
- SUBSCRIPTION: Required. Pub/Sub subscription name.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME: Required. Cloud Storage bucket name where output will be stored. Note: The output files are stored in the output/ folder inside the bucket.
- FORMAT: Required. Output data format. Options: avro or json. Note: If avro, you must add file:///usr/lib/spark/connector/spark-avro.jar to the jars gcloud CLI flag or API field. Example (the file:// prefix references a Serverless for Apache Spark jar file): --jars=file:///usr/lib/spark/connector/spark-avro.jar, [ ... other jars]
- TIMEOUT: Optional. Time in milliseconds before termination of the stream. Defaults to 60000.
- DURATION: Optional. Frequency in seconds of writes to Cloud Storage. Defaults to 15 seconds.
- NUM_RECEIVERS: Optional. Number of streams read from a Pub/Sub subscription in parallel. Defaults to 5.
- BATCHSIZE: Optional. Number of records to insert in one round trip into Cloud Storage. Defaults to 1000.
- SERVICE_ACCOUNT: Optional. If not provided, the default Compute Engine service account is used.
- PROPERTY and PROPERTY_VALUE: Optional. Comma-separated list of Spark property=value pairs (see the sketch after this list).
- LABEL and LABEL_VALUE: Optional. Comma-separated list of label=value pairs.
- LOG_LEVEL: Optional. Level of logging. Can be one of ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, or WARN. Default: INFO.
- KMS_KEY: Optional. The Cloud Key Management Service key to use for encryption. If a key is not specified, data is encrypted at rest using a Google-owned and Google-managed encryption key. Example: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
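As a minimal sketch of the PROPERTY and LABEL formats (the property names and label keys below are illustrative examples, not values required by the template), the two flags might be filled in as follows:

--properties="spark.executor.instances=2,spark.driver.memory=4g"
--labels="env=dev,team=streaming"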
Execute the following command:
Linux, macOS, or Cloud Shell
gcloud dataproc batches submit spark \
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \
    --version="1.2" \
    --project="PROJECT_ID" \
    --region="REGION" \
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" \
    --subnet="SUBNET" \
    --kms-key="KMS_KEY" \
    --service-account="SERVICE_ACCOUNT" \
    --properties="PROPERTY=PROPERTY_VALUE" \
    --labels="LABEL=LABEL_VALUE" \
    -- --template=PUBSUBTOGCS \
    --templateProperty log.level="LOG_LEVEL" \
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" \
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" \
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" \
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" \
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" \
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" \
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" \
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (PowerShell)
gcloud dataproc batches submit spark `
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate `
    --version="1.2" `
    --project="PROJECT_ID" `
    --region="REGION" `
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" `
    --subnet="SUBNET" `
    --kms-key="KMS_KEY" `
    --service-account="SERVICE_ACCOUNT" `
    --properties="PROPERTY=PROPERTY_VALUE" `
    --labels="LABEL=LABEL_VALUE" `
    -- --template=PUBSUBTOGCS `
    --templateProperty log.level="LOG_LEVEL" `
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" `
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" `
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" `
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" `
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" `
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" `
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" `
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (cmd.exe)
gcloud dataproc batches submit spark ^
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ^
    --version="1.2" ^
    --project="PROJECT_ID" ^
    --region="REGION" ^
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ^
    --subnet="SUBNET" ^
    --kms-key="KMS_KEY" ^
    --service-account="SERVICE_ACCOUNT" ^
    --properties="PROPERTY=PROPERTY_VALUE" ^
    --labels="LABEL=LABEL_VALUE" ^
    -- --template=PUBSUBTOGCS ^
    --templateProperty log.level="LOG_LEVEL" ^
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ^
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ^
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ^
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ^
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ^
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ^
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ^
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
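For a concrete sketch, the following submission uses placeholder values (my-project, us-central1, my-subscription, and my-output-bucket are examples only) and selects avro output, which requires the extra spark-avro jar described in the FORMAT note above; optional flags are omitted so their defaults apply:

gcloud dataproc batches submit spark \
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \
    --version="1.2" \
    --project="my-project" \
    --region="us-central1" \
    --jars="gs://dataproc-templates-binaries/latest/java/dataproc-templates.jar,file:///usr/lib/spark/connector/spark-avro.jar" \
    -- --template=PUBSUBTOGCS \
    --templateProperty pubsubtogcs.input.project.id="my-project" \
    --templateProperty pubsubtogcs.input.subscription="my-subscription" \
    --templateProperty pubsubtogcs.gcs.bucket.name="my-output-bucket" \
    --templateProperty pubsubtogcs.gcs.output.data.format="avro"

After submission, you can list or inspect batches with gcloud dataproc batches list --region=us-central1 and gcloud dataproc batches describe.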
REST
Before using any of the request data, make the following replacements:
- PROJECT_ID: Required. Your Google Cloud project ID listed in the IAM Settings.
- REGION: Required. Compute Engine region.
- SUBNET: Optional. If a subnet is not specified, the subnet in the specified REGION in the default network is selected. Example: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION: Required. Specify latest for the latest template version, or the date of a specific version, for example, 2023-03-17_v0.1.0-beta (visit gs://dataproc-templates-binaries or run gcloud storage ls gs://dataproc-templates-binaries to list available template versions).
- PUBSUB_SUBSCRIPTION_PROJECT_ID: Required. The Google Cloud project ID listed in the IAM Settings that contains the input Pub/Sub subscription to be read.
- SUBSCRIPTION: Required. Pub/Sub subscription name.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME: Required. Cloud Storage bucket name where output will be stored. Note: The output files are stored in the output/ folder inside the bucket.
- FORMAT: Required. Output data format. Options: avro or json. Note: If avro, you must add file:///usr/lib/spark/connector/spark-avro.jar to the jars gcloud CLI flag or API field. Example (the file:// prefix references a Serverless for Apache Spark jar file): --jars=file:///usr/lib/spark/connector/spark-avro.jar, [ ... other jars]
- TIMEOUT: Optional. Time in milliseconds before termination of the stream. Defaults to 60000.
- DURATION: Optional. Frequency in seconds of writes to Cloud Storage. Defaults to 15 seconds.
- NUM_RECEIVERS: Optional. Number of streams read from a Pub/Sub subscription in parallel. Defaults to 5.
- BATCHSIZE: Optional. Number of records to insert in one round trip into Cloud Storage. Defaults to 1000.
- SERVICE_ACCOUNT: Optional. If not provided, the default Compute Engine service account is used.
- PROPERTY and PROPERTY_VALUE: Optional. Comma-separated list of Spark property=value pairs.
- LABEL and LABEL_VALUE: Optional. Comma-separated list of label=value pairs.
- LOG_LEVEL: Optional. Level of logging. Can be one of ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, or WARN. Default: INFO.
- KMS_KEY: Optional. The Cloud Key Management Service key to use for encryption. If a key is not specified, data is encrypted at rest using a Google-owned and Google-managed encryption key. Example: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
HTTP method and URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches
Request JSON body:
{ "environmentConfig":{ "executionConfig":{ "subnetworkUri":" SUBNET ", "kmsKey": " KMS_KEY ", "serviceAccount": " SERVICE_ACCOUNT " } }, "labels": { " LABEL ": " LABEL_VALUE " }, "runtimeConfig": { "version": "1.2", "properties": { " PROPERTY ": " PROPERTY_VALUE " } }, "sparkBatch":{ "mainClass":"com.google.cloud.dataproc.templates.main.DataProcTemplate", "args":[ "--template","PUBSUBTOGCS", "--templateProperty","log.level= LOG_LEVEL ", "--templateProperty","pubsubtogcs.input.project.id= PUBSUB_SUBSCRIPTION_PROJECT_ID ", "--templateProperty","pubsubtogcs.input.subscription= SUBSCRIPTION ", "--templateProperty","pubsubtogcs.gcs.bucket.name= CLOUD_STORAGE_OUTPUT_BUCKET_NAME ", "--templateProperty","pubsubtogcs.gcs.output.data.format= FORMAT ", "--templateProperty","pubsubtogcs.timeout.ms= TIMEOUT ", "--templateProperty","pubsubtogcs.streaming.duration.seconds= DURATION ", "--templateProperty","pubsubtogcs.total.receivers= NUM_RECEIVERS ", "--templateProperty","pubsubtogcs.batch.size= BATCHSIZE " ], "jarFileUris":[ "file:///usr/lib/spark/connector/spark-avro.jar", "gs://dataproc-templates-binaries/ TEMPLATE_VERSION /java/dataproc-templates.jar" ] } }
To send your request, use an HTTP client of your choice.
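For example, you can send the request with curl: save the request body to a file (request.json is a name chosen for this sketch) and authenticate with an access token from the gcloud CLI:

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json; charset=utf-8" \
    -d @request.json \
    "https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches"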
You should receive a JSON response similar to the following:
{ "name": "projects/ PROJECT_ID /regions/ REGION /operations/ OPERATION_ID ", "metadata": { "@type": "type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata", "batch": "projects/ PROJECT_ID /locations/ REGION /batches/ BATCH_ID ", "batchUuid": "de8af8d4-3599-4a7c-915c-798201ed1583", "createTime": "2023-02-24T03:31:03.440329Z", "operationType": "BATCH", "description": "Batch" } }