Stream messages from Pub/Sub by using Dataflow and Cloud Storage

Dataflow is a fully-managed service for transforming and enriching data in stream (real-time) and batch modes with equal reliability and expressiveness. It provides a simplified pipeline development environment using the Apache Beam SDK, which has a rich set of windowing and session analysis primitives as well as an ecosystem of source and sink connectors. This quickstart shows you how to use Dataflow to:

  • Read messages published to a Pub/Sub topic
  • Window (or group) the messages by timestamp
  • Write the messages to Cloud Storage

This quickstart introduces you to using Dataflow in Java and Python. SQL is also supported. This quickstart is also available as a Google Cloud Skills Boost tutorial, which provides temporary credentials to help you get started.

You can also start by using UI-based Dataflow templates if you do not intend to do custom data processing.
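
For example, the prebuilt "Pub/Sub Topic to Text Files on Cloud Storage" template covers the same read-window-write flow without any pipeline code. The following sketch is illustrative only: the job name pubsub-to-gcs-sample is hypothetical, and the template path and the inputTopic, outputDirectory, and outputFilenamePrefix parameters are assumptions based on the classic template reference, so verify them in the Dataflow templates documentation before running the command.

gcloud dataflow jobs run pubsub-to-gcs-sample \
  --gcs-location=gs://dataflow-templates-DATAFLOW_REGION/latest/Cloud_PubSub_to_GCS_Text \
  --region=DATAFLOW_REGION \
  --parameters=inputTopic=projects/PROJECT_ID/topics/TOPIC_ID,outputDirectory=gs://BUCKET_NAME/samples/,outputFilenamePrefix=output-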

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID 
      

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID 
      

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:

    gcloud services enable dataflow.googleapis.com \
      compute.googleapis.com \
      logging.googleapis.com \
      storage-component.googleapis.com \
      storage-api.googleapis.com \
      pubsub.googleapis.com \
      cloudresourcemanager.googleapis.com \
      cloudscheduler.googleapis.com
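
    Optionally, verify that the APIs are enabled before continuing; piping the enabled-services list through grep is one quick way to check:

    gcloud services list --enabled | grep -E "dataflow|pubsub|cloudscheduler"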
  8. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, and roles/pubsub.admin (a loop sketch for running it once per role follows these steps):

      gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" \
        --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
        --member="user:USER_EMAIL" \
        --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
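
    One way to run the role-granting command from step 2 once per role is a short shell loop. A minimal sketch using the same placeholders as above:

      for ROLE in roles/dataflow.worker roles/storage.objectAdmin roles/pubsub.admin; do
        gcloud projects add-iam-policy-binding PROJECT_ID \
          --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" \
          --role=$ROLE
      done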
  9. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Set up your Pub/Sub project

  1. Create variables for your bucket, project, topic, region, and service account. Cloud Storage bucket names must be globally unique. Select a Dataflow region close to where you run the commands in this quickstart. The value of the REGION variable must be a valid region name. For more information about regions and locations, see Dataflow locations.

    BUCKET_NAME=BUCKET_NAME
    PROJECT_ID=$(gcloud config get-value project)
    TOPIC_ID=TOPIC_ID
    REGION=DATAFLOW_REGION
    SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
  2. Create a Cloud Storage bucket owned by this project:

    gcloud storage buckets create gs://$BUCKET_NAME
  3. Create a Pub/Sub topic in this project:

    gcloud pubsub topics create $TOPIC_ID
  4. Create a Cloud Scheduler job in this project. The job publishes a message to a Pub/Sub topic at one-minute intervals.

    If an App Engine app does not exist for the project, this step will create one.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
      --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION

    Start the job.

    gcloud scheduler jobs run publisher-job --location=$REGION
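
    To confirm that the job was created and is publishing on schedule, you can describe it; a quick check using the same job name and location:

    gcloud scheduler jobs describe publisher-job --location=$REGION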
  5. Use the following commands to clone the quickstart repository and navigate to the sample code directory:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies

Stream messages from Pub/Sub to Cloud Storage

Code sample

This sample code uses Dataflow to:

  • Read Pub/Sub messages.
  • Window (or group) messages into fixed-size intervals by publish timestamps.
  • Write the messages in each window to files in Cloud Storage.

Java

import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
from datetime import datetime
import logging
import random

from apache_beam import (
    DoFn,
    GroupByKey,
    io,
    ParDo,
    Pipeline,
    PTransform,
    WindowInto,
    WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals" >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )


class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime("%Y-%m-%d %H:%M:%S.%f"),
        )


class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode())


def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

Start the pipeline

To start the pipeline, run the following command:

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2 \
    --serviceAccount=$SERVICE_ACCOUNT"

Python

python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp \
  --service_account_email=$SERVICE_ACCOUNT

The preceding command runs locally and launches a Dataflow job that runs in the cloud. When the command returns JOB_MESSAGE_DETAILED: Workers have started successfully, exit the local program using Ctrl+C.
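
You can also confirm from the command line that the job launched. A quick check, assuming the REGION variable set earlier (the --status=active filter limits the listing to running jobs):

gcloud dataflow jobs list --region=$REGION --status=active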

Observe job and pipeline progress

You can observe the job's progress in the Dataflow console.

Go to the Dataflow console

Open the job details view to see:

  • Job structure
  • Job logs
  • Stage metrics

You may have to wait a few minutes to see the output files in Cloud Storage.

Alternatively, use the following command to check which files have been written out.

gcloud storage ls gs://${BUCKET_NAME}/samples/

The output should look like the following:

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
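
To spot-check what the pipeline wrote, you can print the output files back from the bucket. A minimal sketch using a wildcard over the output prefix; gcloud storage cat streams the matching objects to stdout:

gcloud storage cat "gs://${BUCKET_NAME}/samples/output*"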

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.

  1. Delete the Cloud Scheduler job.

    gcloud scheduler jobs delete publisher-job --location=$REGION
  2. In the Dataflow console, stop the job. Cancel the pipeline without draining it.
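
    If you prefer the command line, you can cancel the job there instead. A sketch, assuming you first look up the job ID in the active-jobs listing and substitute it for the JOB_ID placeholder:

    gcloud dataflow jobs list --region=$REGION --status=active
    gcloud dataflow jobs cancel JOB_ID --region=$REGION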

  3. Delete the topic.

    gcloud pubsub topics delete $TOPIC_ID
  4. Delete the files created by the pipeline.

    gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error
    gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
  5. Remove the Cloud Storage bucket.

    gcloud storage rm gs://${BUCKET_NAME} --recursive
  6. Delete the service account:

    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

What's next
