Pub/Sub to Cloud Storage using Dataflow

Stream Pub/Sub messages to Cloud Storage using Dataflow.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 java.io.IOException 
 ; 
 import 
  
 org.apache.beam.examples.common.WriteOneFilePerWindow 
 ; 
 import 
  
 org.apache.beam.sdk.Pipeline 
 ; 
 import 
  
 org.apache.beam.sdk.io.gcp.pubsub.PubsubIO 
 ; 
 import 
  
 org.apache.beam.sdk.options.Default 
 ; 
 import 
  
 org.apache.beam.sdk.options.Description 
 ; 
 import 
  
 org.apache.beam.sdk.options.PipelineOptionsFactory 
 ; 
 import 
  
 org.apache.beam.sdk.options.StreamingOptions 
 ; 
 import 
  
 org.apache.beam.sdk.options.Validation.Required 
 ; 
 import 
  
 org.apache.beam.sdk.transforms.windowing.FixedWindows 
 ; 
 import 
  
 org.apache.beam.sdk.transforms.windowing.Window 
 ; 
 import 
  
 org.joda.time.Duration 
 ; 
 /**
  * Streaming pipeline that reads string messages from a Pub/Sub topic, groups them into
  * fixed windows, and writes one output file per window.
  */
 public class PubSubToGcs {

   /**
    * Pipeline-specific command-line options. Each getter/setter pair below becomes an
    * argument handled by the command-line parser, with the default value (if any) given by
    * its annotation.
    */
   public interface PubSubToGcsOptions extends StreamingOptions {

     @Description("The Cloud Pub/Sub topic to read from.")
     @Required
     String getInputTopic();

     void setInputTopic(String value);

     @Description("Output file's window size in number of minutes.")
     @Default.Integer(1)
     Integer getWindowSize();

     void setWindowSize(Integer value);

     @Description("Path of the output file including its filename prefix.")
     @Required
     String getOutput();

     void setOutput(String value);
   }

   /**
    * Parses and validates the options from {@code args}, forces streaming mode, builds the
    * read / window / write pipeline, and blocks until the pipeline finishes.
    *
    * @param args command-line arguments parsed into {@link PubSubToGcsOptions}
    * @throws IOException declared by the sample's signature
    */
   public static void main(String[] args) throws IOException {
     // Upper bound on the number of shards used when writing output.
     int maxOutputShards = 1;

     PubSubToGcsOptions pipelineOptions =
         PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);
     pipelineOptions.setStreaming(true);

     Pipeline streamingPipeline = Pipeline.create(pipelineOptions);

     String sourceTopic = pipelineOptions.getInputTopic();
     Duration windowLength = Duration.standardMinutes(pipelineOptions.getWindowSize());

     streamingPipeline
         // Step 1: pull string payloads from the Pub/Sub topic.
         .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(sourceTopic))
         // Step 2: bucket the messages into fixed windows of the configured length.
         .apply(Window.into(FixedWindows.of(windowLength)))
         // Step 3: emit one file to GCS for each window of messages.
         .apply(
             "Write Files to GCS",
             new WriteOneFilePerWindow(pipelineOptions.getOutput(), maxOutputShards));

     // Launch the pipeline and block until it terminates.
     streamingPipeline.run().waitUntilFinish();
   }
 }
 

Python

Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 argparse 
 from 
  
 datetime 
  
 import 
 datetime 
 import 
  
 logging 
 import 
  
 random 
 from 
  
 apache_beam 
  
 import 
 ( 
 DoFn 
 , 
 GroupByKey 
 , 
 io 
 , 
 ParDo 
 , 
 Pipeline 
 , 
 PTransform 
 , 
 WindowInto 
 , 
 WithKeys 
 , 
 ) 
 from 
  
 apache_beam.options.pipeline_options 
  
 import 
 PipelineOptions 
 from 
  
 apache_beam.transforms.window 
  
 import 
 FixedWindows 
 class GroupMessagesByFixedWindows(PTransform):
     """A composite transform that windows, timestamps, shards and groups messages.

     Each input element is placed into a fixed window based on its element
     timestamp (or publish time), converted by ``AddTimestamp`` into a tuple of
     the message and its publish time, tagged with a random shard key, and
     grouped by that key. The output elements are therefore
     ``(shard key, grouped (message, publish time) tuples)`` pairs.
     """

     def __init__(self, window_size, num_shards=5):
         """Store the window length and shard count.

         Args:
             window_size: Window length in minutes; may be fractional.
             num_shards: Number of distinct random shard keys (0 .. num_shards - 1)
                 assigned to the windowed elements. Must be at least 1.

         Raises:
             ValueError: If ``num_shards`` is smaller than 1.
         """
         super().__init__()
         # Fail at construction time rather than later inside `random.randint`,
         # which raises on an empty range when num_shards < 1.
         if num_shards < 1:
             raise ValueError(f"num_shards must be at least 1, got {num_shards!r}")
         # Convert the window size from minutes to whole seconds.
         self.window_size = int(window_size * 60)
         self.num_shards = num_shards

     def expand(self, pcoll):
         """Apply windowing, timestamp extraction, random keying and grouping.

         Args:
             pcoll: The input collection of messages.

         Returns:
             The collection produced by grouping the keyed, windowed elements.
         """
         return (
             pcoll
             # Bind window info to each element using element timestamp (or publish time).
             | "Window into fixed intervals" >> WindowInto(FixedWindows(self.window_size))
             | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
             # Assign a random key to each windowed element based on the number of shards.
             | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
             # Group windowed elements by key. All the elements in the same window must fit
             # memory for this. If not, you need to use `beam.util.BatchElements`.
             | "Group by key" >> GroupByKey()
         )
 class AddTimestamp(DoFn):
     """Pair each message body with its publish time rendered as text."""

     def process(self, element, publish_time=DoFn.TimestampParam):
         """Process one windowed element into a (message body, publish time) tuple.

         Args:
             element: The message payload; it is decoded as UTF-8.
             publish_time: The element's timestamp, injected through
                 ``DoFn.TimestampParam``.

         Yields:
             A tuple of the decoded message body and the publish time formatted
             as ``%Y-%m-%d %H:%M:%S.%f`` in UTC.
         """
         # Convert with the timestamp's own `to_utc_datetime()` (the same call
         # WriteToGCS uses for window bounds) instead of the deprecated
         # `datetime.utcfromtimestamp(float(...))` round-trip.
         yield (
             element.decode("utf-8"),
             publish_time.to_utc_datetime().strftime("%Y-%m-%d %H:%M:%S.%f"),
         )
 class WriteToGCS(DoFn):
     """Write each grouped batch of messages to its own Cloud Storage object."""

     def __init__(self, output_path):
         """Remember the output path used as the prefix of every file name.

         Args:
             output_path: Path of the output file including its filename prefix.
         """
         self.output_path = output_path

     def process(self, key_value, window=DoFn.WindowParam):
         """Write messages in a batch to Google Cloud Storage.

         Args:
             key_value: A ``(shard_id, batch)`` pair, where ``batch`` iterates
                 over ``(message_body, publish_time)`` tuples.
             window: The element's window, injected through ``DoFn.WindowParam``.
         """
         # File name layout: <output_path>-<window start>-<window end>-<shard id>.
         # NOTE(review): the bounds are rendered as hour:minute only, so windows at
         # the same time of day on different dates share a name -- confirm intended.
         start_label, end_label = (
             bound.to_utc_datetime().strftime("%H:%M")
             for bound in (window.start, window.end)
         )
         shard_id, batch = key_value
         name_parts = (self.output_path, start_label, end_label, str(shard_id))
         filename = "-".join(name_parts)

         with io.gcsio.GcsIO().open(filename=filename, mode="w") as gcs_file:
             # One "<body>,<publish time>" line per message, written as bytes.
             for message_body, publish_time in batch:
                 line = f"{message_body},{publish_time}\n"
                 gcs_file.write(line.encode())
 def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
     """Build and run the streaming Pub/Sub-to-Cloud-Storage pipeline.

     Args:
         input_topic: The Pub/Sub topic to read from.
         output_path: Path of the output GCS file including the prefix.
         window_size: Output file's window size in minutes.
         num_shards: Number of shards to use when writing windowed elements.
         pipeline_args: Extra arguments forwarded to ``PipelineOptions``.
     """
     # `save_main_session=True` lets DoFns access globally imported modules.
     options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)

     with Pipeline(options=options) as pipeline:
         # `timestamp_attribute` is not passed to `ReadFromPubSub`, so Beam binds
         # the publish time returned by the Pub/Sub server for each message to
         # the element's timestamp parameter (see `DoFn.TimestampParam`).
         # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
         messages = pipeline | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
         batches = messages | "Window into" >> GroupMessagesByFixedWindows(
             window_size, num_shards
         )
         _ = batches | "Write to GCS" >> ParDo(WriteToGCS(output_path))
 if __name__ == "__main__":
     # Script entry point: parse the sample's own flags, then hand any
     # unrecognized arguments to the pipeline as pipeline options.
     logging.getLogger().setLevel(logging.INFO)

     parser = argparse.ArgumentParser()
     parser.add_argument(
         "--input_topic",
         # The pipeline cannot run without a topic; fail at parse time
         # instead of passing None into the pipeline.
         required=True,
         help="The Cloud Pub/Sub topic to read from. "
         '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
     )
     parser.add_argument(
         "--window_size",
         type=float,
         default=1.0,
         help="Output file's window size in minutes.",
     )
     parser.add_argument(
         "--output_path",
         # Every output file name is built from this prefix, so it is mandatory.
         required=True,
         help="Path of the output GCS file including the prefix.",
     )
     parser.add_argument(
         "--num_shards",
         type=int,
         default=5,
         help="Number of shards to use when writing windowed elements to GCS.",
     )
     # `parse_known_args` keeps the flags defined above in `known_args` and
     # returns everything else in `pipeline_args`.
     known_args, pipeline_args = parser.parse_known_args()

     run(
         known_args.input_topic,
         known_args.output_path,
         known_args.window_size,
         known_args.num_shards,
         pipeline_args,
     )
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.

Create a Mobile Website
View Site in Mobile | Classic
Share by: