Pub/Sub Subscription to BigQuery template

The Pub/Sub Subscription to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub subscription and writes them to a BigQuery table. You can use the template as a quick solution to move Pub/Sub data to BigQuery. The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.

Before running a Dataflow pipeline for this scenario, consider whether a Pub/Sub BigQuery subscription with a UDF meets your requirements.
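
For instance, if you only need to stream JSON messages into an existing table without running a Dataflow job, you can create a BigQuery subscription directly. The following command is a minimal sketch without a UDF; the subscription and topic names are placeholders, not values required by this template.

  gcloud pubsub subscriptions create my-bq-subscription \
      --topic=my-topic \
      --bigquery-table=PROJECT_ID:DATASET.TABLE_NAME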

Pipeline requirements

  • The data field of Pub/Sub messages must use the JSON format, described in this JSON guide. For example, messages with values in the data field formatted as {"k1":"v1", "k2":"v2"} can be inserted into a BigQuery table with two columns, named k1 and k2, with a string data type (see the example commands after this list).
  • The output table must exist prior to running the pipeline. The table schema must match the input JSON objects.
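
For illustration, the following commands show one way to satisfy these requirements before launching the pipeline. They are a minimal sketch, assuming a table and a topic that match the {"k1":"v1", "k2":"v2"} example above; the resource names are placeholders.

  # Create the output table with a schema that matches the incoming JSON (two STRING columns).
  bq mk --table PROJECT_ID:DATASET.TABLE_NAME k1:STRING,k2:STRING

  # Publish a test message whose data field is a JSON object with matching keys.
  gcloud pubsub topics publish TOPIC_NAME --message='{"k1":"v1", "k2":"v2"}'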

Template parameters

Required parameters

  • outputTableSpec: The BigQuery output table location, in the format <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • inputSubscription: The Pub/Sub input subscription to read from, in the format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>.

Optional parameters

  • outputDeadletterTable: The BigQuery table to use for messages that fail to reach the output table, in the format <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. If the table doesn't exist, it is created during pipeline execution. If not specified, OUTPUT_TABLE_SPEC_error_records is used. You can inspect this table to debug failed inserts (see the example query after this list).
  • javascriptTextTransformGcsPath: The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. For example, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: The interval, in minutes, at which workers check for changes to the JavaScript UDF file and reload it. Defaults to: 0.
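
If messages fail to be written to the output table, you can inspect the dead-letter table for the failed records. This is a minimal sketch, assuming the default dead-letter table name derived from an output table named TABLE_NAME; adjust the table name if you set outputDeadletterTable explicitly.

  # Inspect recent failed records in the dead-letter table.
  bq query --use_legacy_sql=false \
    'SELECT * FROM `PROJECT_ID.DATASET.TABLE_NAME_error_records` LIMIT 10'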

User-defined function

Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: the Pub/Sub message data field, serialized as a JSON string.
  • Output: a JSON string that matches the schema of the BigQuery destination table.
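
As a concrete illustration, the following JavaScript UDF is a minimal sketch that matches this specification. The function name myTransform and the added field are assumptions for the example; the returned JSON string must match your destination table's schema.

  /**
   * Example UDF: parses the message payload, adds a field, and returns
   * a JSON string that matches the BigQuery table schema.
   * @param {string} inJson the Pub/Sub message data field as a JSON string
   * @return {string} a JSON string to insert into BigQuery
   */
  function myTransform(inJson) {
    var obj = JSON.parse(inJson);
    // Example transformation: add a processing timestamp.
    // The destination table must have a matching column (assumption).
    obj.processed_at = new Date().toISOString();
    return JSON.stringify(obj);
  }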

Run the template

Console

1. Go to the Dataflow Create job from template page.
2. In the Job name field, enter a unique job name.
3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

   For a list of regions where you can run a Dataflow job, see Dataflow locations.

4. From the Dataflow template drop-down menu, select the Pub/Sub Subscription to BigQuery template.
5. In the provided parameter fields, enter your parameter values.
6. Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values: latest to use the latest version of the template, or a version name to use a specific version of the template

  • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
  • SUBSCRIPTION_NAME: your Pub/Sub subscription name
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name
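
If you use the optional UDF parameters described earlier, pass them the same way. The following is a minimal sketch, assuming a UDF file at gs://my-bucket/my-udfs/my_file.js (the example path from the parameter description) with a function named myTransform; both are placeholders.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
javascriptTextTransformGcsPath=gs://my-bucket/my-udfs/my_file.js,\
javascriptTextTransformFunctionName=myTransform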

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values: latest to use the latest version of the template, or a version name to use a specific version of the template

  • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
  • SUBSCRIPTION_NAME: your Pub/Sub subscription name
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name
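
For example, you can send this request with curl, using your gcloud credentials for authorization. This is a minimal sketch; the placeholders are the same ones listed above.

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d '{
        "jobName": "JOB_NAME",
        "parameters": {
          "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
          "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
        }
      }' \
  "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/PubSub_Subscription_to_BigQuery"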
