Pub/Sub to BigQuery with Python UDF template

The Pub/Sub to BigQuery with Python UDF template is a streaming pipeline that reads JSON-formatted messages from Pub/Sub and writes them to a BigQuery table. Optionally, you can provide a user-defined function (UDF) written in Python to process the incoming messages.

Pipeline requirements

  • The BigQuery table must exist and have a schema.
  • The Pub/Sub message data must use JSON format, or you must provide a UDF that converts the message data to JSON. The JSON data must match the BigQuery table schema. For example, if the JSON payloads are formatted as {"k1":"v1", "k2":"v2"}, the BigQuery table must have two string columns named k1 and k2, as shown in the example after this list.
  • Specify the inputSubscription or inputTopic parameter, but not both.
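
For example, a destination table that matches the payload above could be created with the bq command-line tool. The dataset and table names here are placeholders:

    # Creates a table with two STRING columns, k1 and k2, matching the sample payload.
    bq mk --table YOUR_DATASET.YOUR_TABLE k1:STRING,k2:STRING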

Template parameters

Required parameters

  • outputTableSpec: The BigQuery table to write to, formatted as PROJECT_ID:DATASET_NAME.TABLE_NAME.

Optional parameters

  • inputTopic: The Pub/Sub topic to read from, formatted as projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • inputSubscription: The Pub/Sub subscription to read from, formatted as projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • outputDeadletterTable: The BigQuery table to use for messages that failed to reach the output table, formatted as PROJECT_ID:DATASET_NAME.TABLE_NAME. If the table doesn't exist, it is created when the pipeline runs. If this parameter is not specified, the value OUTPUT_TABLE_SPEC_error_records is used instead.
  • useStorageWriteApiAtLeastOnce: When using the Storage Write API, specifies the write semantics. To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true. To use exactly-once semantics, set the parameter to false. This parameter applies only when useStorageWriteApi is true. The default value is false.
  • useStorageWriteApi: If true, the pipeline uses the BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). The default value is false. For more information, see Using the Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: When using the Storage Write API, specifies the number of write streams. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter. The default value is 0.
  • storageWriteApiTriggeringFrequencySec: When using the Storage Write API, specifies the triggering frequency, in seconds. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter. The example after this list shows these Storage Write API parameters set together.
  • pythonExternalTextTransformGcsPath: The Cloud Storage path pattern for the Python code containing your user-defined functions. For example, gs://your-bucket/your-function.py.
  • pythonExternalTextTransformFunctionName: The name of the function to call from your Python file. Use only letters, digits, and underscores. For example, 'transform' or 'transform_udf1'.
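
As a sketch, the following --parameters fragment shows one way to set the Storage Write API options together for exactly-once writes. The stream count and triggering frequency values are hypothetical and should be tuned for your workload:

    # Hypothetical Storage Write API settings for exactly-once writes.
    --parameters useStorageWriteApi=true,useStorageWriteApiAtLeastOnce=false,numStorageWriteApiStreams=3,storageWriteApiTriggeringFrequencySec=5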

User-defined function

Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: the Pub/Sub message data field, serialized as a JSON string.
  • Output: a JSON string that matches the schema of the BigQuery destination table.
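
For example, a minimal UDF that follows this specification might look like the sketch below. The function name matches the 'transform' example above, and the fields k1 and k2 are the hypothetical columns from the pipeline requirements:

    import json

    def transform(element):
        # 'element' is the Pub/Sub message data field, serialized as a JSON string.
        record = json.loads(element)
        # Hypothetical transformation: normalize the k1 value.
        record['k1'] = record.get('k1', '').strip().lower()
        # Return a JSON string that matches the BigQuery table schema (columns k1 and k2).
        return json.dumps(record)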

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

     For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub to BigQuery with Python UDF template.
  5. In the provided parameter fields, enter your parameter values.
  6. Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Xlang \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template
    • a version name, to use a specific version of the template

  • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • TOPIC_NAME: your Pub/Sub topic name
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name
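
For example, a run of this template that also wires in the Python UDF parameters might look like the following sketch. The job name, project, bucket, topic, table, and function name are hypothetical placeholders:

    gcloud dataflow flex-template run pubsub-to-bq-udf-job \
        --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Xlang \
        --region us-central1 \
        --staging-location gs://your-bucket/staging \
        --parameters inputTopic=projects/your-project/topics/your-topic,outputTableSpec=your-project:your_dataset.your_table,pythonExternalTextTransformGcsPath=gs://your-bucket/your-function.py,pythonExternalTextTransformFunctionName=transform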

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
      "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
          "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
          "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Xlang"
      }
    }

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template
    • a version name, to use a specific version of the template

  • TOPIC_NAME: your Pub/Sub topic name
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name
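
One way to send this request, sketched here with a hypothetical file name, is to save the JSON body above to a file (for example, request.json) and post it with curl, using an access token from the gcloud CLI:

    # Post the launch request; request.json holds the JSON body shown above.
    curl -X POST \
        -H "Authorization: Bearer $(gcloud auth print-access-token)" \
        -H "Content-Type: application/json" \
        -d @request.json \
        "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"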
