Batch messaging

Batch messaging creates a publisher client with custom batching settings and uses it to publish some messages.

This document provides information about using batch messaging with messages published to a topic.

Before you begin

Before configuring the publish workflow, ensure you have completed the following tasks:

Required roles

To get the permissions that you need to publish messages to a topic, ask your administrator to grant you the Pub/Sub Publisher ( roles/pubsub.publisher ) IAM role on the topic. For more information about granting roles, see Manage access to projects, folders, and organizations .

You might also be able to get the required permissions through custom roles or other predefined roles .

You need additional permissions to create or update topics and subscriptions.

Use batch messaging

See the following code samples to learn how to configure batch messaging settings for your publisher.

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C++ API reference documentation .

  namespace 
  
 pubsub 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 future 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 Options 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 StatusOr 
 ; 
 []( 
 std 
 :: 
 string 
  
 project_id 
 , 
  
 std 
 :: 
 string 
  
 topic_id 
 ) 
  
 { 
  
 auto 
  
 topic 
  
 = 
  
 pubsub 
 :: 
 Topic 
 ( 
 std 
 :: 
 move 
 ( 
 project_id 
 ), 
  
 std 
 :: 
 move 
 ( 
 topic_id 
 )); 
  
 // By default, the publisher will flush a batch after 10ms, after it 
  
 // contains more than 100 message, or after it contains more than 1MiB of 
  
 // data, whichever comes first. This changes those defaults. 
  
 auto 
  
 publisher 
  
 = 
  
 pubsub 
 :: 
 Publisher 
 ( 
 pubsub 
 :: 
 MakePublisherConnection 
 ( 
  
 std 
 :: 
 move 
 ( 
 topic 
 ), 
  
 Options 
 {} 
  
 . 
 set<pubsub 
 :: 
 MaxHoldTimeOption 
> ( 
 std 
 :: 
 chrono 
 :: 
 milliseconds 
 ( 
 20 
 )) 
  
 . 
 set<pubsub 
 :: 
 MaxBatchBytesOption 
> ( 
 4 
  
 * 
  
 1024 
  
 * 
  
 1024L 
 ) 
  
 . 
 set<pubsub 
 :: 
 MaxBatchMessagesOption 
> ( 
 200 
 ))); 
  
 std 
 :: 
 vector<future<void> 
>  
 ids 
 ; 
  
 for 
  
 ( 
 char 
  
 const 
 * 
  
 data 
  
 : 
  
 { 
 "1" 
 , 
  
 "2" 
 , 
  
 "3" 
 , 
  
 "go!" 
 }) 
  
 { 
  
 ids 
 . 
 push_back 
 ( 
  
 publisher 
 . 
 Publish 
 ( 
 pubsub 
 :: 
 MessageBuilder 
 (). 
 SetData 
 ( 
 data 
 ). 
 Build 
 ()) 
  
 . 
 then 
 ([ 
 data 
 ]( 
 future<StatusOr<std 
 :: 
 string 
>>  
 f 
 ) 
  
 { 
  
 auto 
  
 s 
  
 = 
  
 f 
 . 
 get 
 (); 
  
 if 
  
 ( 
 ! 
 s 
 ) 
  
 return 
 ; 
  
 std 
 :: 
 cout 
 << 
 "Sent '" 
 << 
 data 
 << 
 "' (" 
 << 
 * 
 s 
 << 
 ") 
 \n 
 " 
 ; 
  
 })); 
  
 } 
  
 publisher 
 . 
 Flush 
 (); 
  
 // Block until they are actually sent. 
  
 for 
  
 ( 
 auto 
&  
 id 
  
 : 
  
 ids 
 ) 
  
 id 
 . 
 get 
 (); 
 } 
 

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C# API reference documentation .

  using 
  
  Google.Api.Gax 
 
 ; 
 using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
 System 
 ; 
 using 
  
 System.Collections.Generic 
 ; 
 using 
  
 System.Linq 
 ; 
 using 
  
 System.Threading 
 ; 
 using 
  
 System.Threading.Tasks 
 ; 
 public 
  
 class 
  
 PublishBatchedMessagesAsyncSample 
 { 
  
 public 
  
 async 
  
 Task<int> 
  
 PublishBatchMessagesAsync 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 topicId 
 , 
  
 IEnumerable<string> 
  
 messageTexts 
 ) 
  
 { 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
  FromProjectTopic 
 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
 // Default Settings: 
  
 // byteCountThreshold: 1000000 
  
 // elementCountThreshold: 100 
  
 // delayThreshold: 10 milliseconds 
  
 var 
  
 customSettings 
  
 = 
  
 new 
  
 PublisherClient 
 . 
 Settings 
  
 { 
  
 BatchingSettings 
  
 = 
  
 new 
  
  BatchingSettings 
 
 ( 
  
 elementCountThreshold 
 : 
  
 50 
 , 
  
 byteCountThreshold 
 : 
  
 10240 
 , 
  
 delayThreshold 
 : 
  
 TimeSpan 
 . 
 FromMilliseconds 
 ( 
 500 
 )) 
  
 }; 
  
  PublisherClient 
 
  
 publisher 
  
 = 
  
 await 
  
 new 
  
  PublisherClientBuilder 
 
  
 { 
  
 TopicName 
  
 = 
  
 topicName 
 , 
  
 Settings 
  
 = 
  
 customSettings 
  
 }. 
 BuildAsync 
 (); 
  
 int 
  
 publishedMessageCount 
  
 = 
  
 0 
 ; 
  
 var 
  
 publishTasks 
  
 = 
  
 messageTexts 
 . 
 Select 
 ( 
 async 
  
 text 
  
 = 
>  
 { 
  
 try 
  
 { 
  
 string 
  
 message 
  
 = 
  
 await 
  
 publisher 
 . 
 PublishAsync 
 ( 
 text 
 ); 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Published message {message}" 
 ); 
  
 Interlocked 
 . 
 Increment 
 ( 
 ref 
  
 publishedMessageCount 
 ); 
  
 } 
  
 catch 
  
 ( 
 Exception 
  
 exception 
 ) 
  
 { 
  
 Console 
 . 
 WriteLine 
 ( 
 $"An error occurred when publishing message {text}: {exception.Message}" 
 ); 
  
 } 
  
 }); 
  
 await 
  
 Task 
 . 
 WhenAll 
 ( 
 publishTasks 
 ); 
  
 return 
  
 publishedMessageCount 
 ; 
  
 } 
 } 
 

Go

The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2 . To see a list of v1 code samples, see the deprecated code samples .

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Go API reference documentation .

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "strconv" 
  
 "time" 
  
 "cloud.google.com/go/pubsub/v2" 
 ) 
 func 
  
 publishWithSettings 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 topicID 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topicID := "my-topic" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 // client.Publisher can be passed a topic ID (e.g. "my-topic") or 
  
 // a fully qualified name (e.g. "projects/my-project/topics/my-topic"). 
  
 // If a topic ID is provided, the project ID from the client is used. 
  
 publisher 
  
 := 
  
 client 
 . 
 Publisher 
 ( 
 topicID 
 ) 
  
 publisher 
 . 
 PublishSettings 
 . 
 ByteThreshold 
  
 = 
  
 5000 
  
 publisher 
 . 
 PublishSettings 
 . 
 CountThreshold 
  
 = 
  
 10 
  
 publisher 
 . 
 PublishSettings 
 . 
 DelayThreshold 
  
 = 
  
 100 
  
 * 
  
 time 
 . 
 Millisecond 
  
 var 
  
 results 
  
 [] 
 * 
 pubsub 
 . 
 PublishResult 
  
 var 
  
 resultErrors 
  
 [] 
 error 
  
 for 
  
 i 
  
 := 
  
 0 
 ; 
  
 i 
 < 
 10 
 ; 
  
 i 
 ++ 
  
 { 
  
 result 
  
 := 
  
 publisher 
 . 
 Publish 
 ( 
 ctx 
 , 
  
& pubsub 
 . 
 Message 
 { 
  
 Data 
 : 
  
 [] 
 byte 
 ( 
 "Message " 
  
 + 
  
 strconv 
 . 
 Itoa 
 ( 
 i 
 )), 
  
 }) 
  
 results 
  
 = 
  
 append 
 ( 
 results 
 , 
  
 result 
 ) 
  
 } 
  
 // The Get method blocks until a server-generated ID or 
  
 // an error is returned for the published message. 
  
 for 
  
 i 
 , 
  
 res 
  
 := 
  
 range 
  
 results 
  
 { 
  
 id 
 , 
  
 err 
  
 := 
  
 res 
 . 
 Get 
 ( 
 ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 resultErrors 
  
 = 
  
 append 
 ( 
 resultErrors 
 , 
  
 err 
 ) 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Failed to publish: %v" 
 , 
  
 err 
 ) 
  
 continue 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Published message %d; msg ID: %v\n" 
 , 
  
 i 
 , 
  
 id 
 ) 
  
 } 
  
 if 
  
 len 
 ( 
 resultErrors 
 ) 
  
 != 
  
 0 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Get: %v" 
 , 
  
 resultErrors 
 [ 
 len 
 ( 
 resultErrors 
 ) 
 - 
 1 
 ]) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Published messages with batch settings." 
 ) 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Java API reference documentation .

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.api.core. ApiFutures 
 
 ; 
 import 
  
 com.google.api.gax.batching. BatchingSettings 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Publisher 
 
 ; 
 import 
  
 com.google.protobuf. ByteString 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 com.google.pubsub.v1. TopicName 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.util.ArrayList 
 ; 
 import 
  
 java.util.List 
 ; 
 import 
  
 java.util.concurrent.ExecutionException 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 import 
  
 org.threeten.bp. Duration 
 
 ; 
 public 
  
 class 
 PublishWithBatchSettingsExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 topicId 
  
 = 
  
 "your-topic-id" 
 ; 
  
 publishWithBatchSettingsExample 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 publishWithBatchSettingsExample 
 ( 
 String 
  
 projectId 
 , 
  
 String 
  
 topicId 
 ) 
  
 throws 
  
 IOException 
 , 
  
 ExecutionException 
 , 
  
 InterruptedException 
  
 { 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
  Publisher 
 
  
 publisher 
  
 = 
  
 null 
 ; 
  
 List<ApiFuture<String> 
>  
 messageIdFutures 
  
 = 
  
 new 
  
 ArrayList 
<> (); 
  
 try 
  
 { 
  
 // Batch settings control how the publisher batches messages 
  
 long 
  
 requestBytesThreshold 
  
 = 
  
 5000L 
 ; 
  
 // default : 1000 bytes 
  
 long 
  
 messageCountBatchSize 
  
 = 
  
 100L 
 ; 
  
 // default : 100 message 
  
  Duration 
 
  
 publishDelayThreshold 
  
 = 
  
  Duration 
 
 . 
 ofMillis 
 ( 
 100 
 ); 
  
 // default : 1 ms 
  
 // Publish request get triggered based on request size, messages count & time since last 
  
 // publish, whichever condition is met first. 
  
  BatchingSettings 
 
  
 batchingSettings 
  
 = 
  
  BatchingSettings 
 
 . 
 newBuilder 
 () 
  
 . 
  setElementCountThreshold 
 
 ( 
 messageCountBatchSize 
 ) 
  
 . 
  setRequestByteThreshold 
 
 ( 
 requestBytesThreshold 
 ) 
  
 . 
  setDelayThreshold 
 
 ( 
 publishDelayThreshold 
 ) 
  
 . 
 build 
 (); 
  
 // Create a publisher instance with default settings bound to the topic 
  
 publisher 
  
 = 
  
  Publisher 
 
 . 
 newBuilder 
 ( 
 topicName 
 ). 
 setBatchingSettings 
 ( 
 batchingSettings 
 ). 
 build 
 (); 
  
 // schedule publishing one message at a time : messages get automatically batched 
  
 for 
  
 ( 
 int 
  
 i 
  
 = 
  
 0 
 ; 
  
 i 
 < 
 100 
 ; 
  
 i 
 ++ 
 ) 
  
 { 
  
 String 
  
 message 
  
 = 
  
 "message " 
  
 + 
  
 i 
 ; 
  
  ByteString 
 
  
 data 
  
 = 
  
  ByteString 
 
 . 
  copyFromUtf8 
 
 ( 
 message 
 ); 
  
  PubsubMessage 
 
  
 pubsubMessage 
  
 = 
  
  PubsubMessage 
 
 . 
 newBuilder 
 (). 
  setData 
 
 ( 
 data 
 ). 
 build 
 (); 
  
 // Once published, returns a server-assigned message id (unique within the topic) 
  
 ApiFuture<String> 
  
 messageIdFuture 
  
 = 
  
  publish 
 
er . 
  publish 
 
 ( 
 pubsubMessage 
 ); 
  
 messageIdFutures 
 . 
 add 
 ( 
 messageIdFuture 
 ); 
  
 } 
  
 } 
  
 finally 
  
 { 
  
 // Wait on any pending publish requests. 
  
 List<String> 
  
 messageIds 
  
 = 
  
  ApiFutures 
 
 . 
  allAsList 
 
 ( 
 messageIdFutures 
 ). 
 get 
 (); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Published " 
  
 + 
  
 messageIds 
 . 
 size 
 () 
  
 + 
  
 " messages with batch settings." 
 ); 
  
 if 
  
 ( 
 publisher 
  
 != 
  
 null 
 ) 
  
 { 
  
 // When finished with the publisher, shutdown to free up resources. 
  
 publisher 
 . 
  shutdown 
 
 (); 
  
 publisher 
 . 
  awaitTermination 
 
 ( 
 1 
 , 
  
 TimeUnit 
 . 
 MINUTES 
 ); 
  
 } 
  
 } 
  
 } 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .

  /** 
 * TODO(developer): Uncomment these variables before running the sample. 
 */ 
 // const topicName = 'YOUR_TOPIC_NAME'; 
 // const data = JSON.stringify({foo: 'bar'}); 
 // const maxMessages = 10; 
 // const maxWaitTime = 10; 
 // Imports the Google Cloud client library 
 const 
  
 { 
 PubSub 
 } 
  
 = 
  
 require 
 ( 
 ' @google-cloud/pubsub 
' 
 ); 
 // Creates a client; cache this for further use 
 const 
  
 pubSubClient 
  
 = 
  
 new 
  
  PubSub 
 
 (); 
 async 
  
 function 
  
 publishBatchedMessages 
 ( 
  
 topicNameOrId 
 , 
  
 data 
 , 
  
 maxMessages 
 , 
  
 maxWaitTime 
 , 
 ) 
  
 { 
  
 // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) 
  
 const 
  
 dataBuffer 
  
 = 
  
 Buffer 
 . 
  from 
 
 ( 
 data 
 ); 
  
 // Cache topic objects (publishers) and reuse them. 
  
 const 
  
 publishOptions 
  
 = 
  
 { 
  
 batching 
 : 
  
 { 
  
 maxMessages 
 : 
  
 maxMessages 
 , 
  
 maxMilliseconds 
 : 
  
 maxWaitTime 
  
 * 
  
 1000 
 , 
  
 }, 
  
 }; 
  
 const 
  
 batchPublisher 
  
 = 
  
 pubSubClient 
 . 
 topic 
 ( 
 topicNameOrId 
 , 
  
 publishOptions 
 ); 
  
 const 
  
 promises 
  
 = 
  
 []; 
  
 for 
  
 ( 
 let 
  
 i 
  
 = 
  
 0 
 ; 
  
 i 
 < 
 10 
 ; 
  
 i 
 ++ 
 ) 
  
 { 
  
 promises 
 . 
 push 
 ( 
  
 ( 
 async 
  
 () 
  
 = 
>  
 { 
  
 const 
  
 messageId 
  
 = 
  
 await 
  
 batchPublisher 
 . 
 publishMessage 
 ({ 
  
 data 
 : 
  
 dataBuffer 
 , 
  
 }); 
  
 console 
 . 
 log 
 ( 
 `Message 
 ${ 
 messageId 
 } 
 published.` 
 ); 
  
 })(), 
  
 ); 
  
 } 
  
 await 
  
  Promise 
 
 . 
 all 
 ( 
 promises 
 ); 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .

  /** 
 * TODO(developer): Uncomment these variables before running the sample. 
 */ 
 // const topicName = 'YOUR_TOPIC_NAME'; 
 // const data = JSON.stringify({foo: 'bar'}); 
 // const maxMessages = 10; 
 // const maxWaitTime = 10; 
 // Imports the Google Cloud client library 
 import 
  
 { 
 PublishOptions 
 , 
  
 PubSub 
 } 
  
 from 
  
 '@google-cloud/pubsub' 
 ; 
 // Creates a client; cache this for further use 
 const 
  
 pubSubClient 
  
 = 
  
 new 
  
 PubSub 
 (); 
 async 
  
 function 
  
 publishBatchedMessages 
 ( 
  
 topicNameOrId 
 : 
  
 string 
 , 
  
 data 
 : 
  
 string 
 , 
  
 maxMessages 
 : 
  
 number 
 , 
  
 maxWaitTime 
 : 
  
 number 
 , 
 ) 
  
 { 
  
 // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) 
  
 const 
  
 dataBuffer 
  
 = 
  
 Buffer 
 . 
 from 
 ( 
 data 
 ); 
  
 // Cache topic objects (publishers) and reuse them. 
  
 const 
  
 publishOptions 
 : 
  
 PublishOptions 
  
 = 
  
 { 
  
 batching 
 : 
  
 { 
  
 maxMessages 
 : 
  
 maxMessages 
 , 
  
 maxMilliseconds 
 : 
  
 maxWaitTime 
  
 * 
  
 1000 
 , 
  
 }, 
  
 }; 
  
 const 
  
 batchPublisher 
  
 = 
  
 pubSubClient 
 . 
 topic 
 ( 
 topicNameOrId 
 , 
  
 publishOptions 
 ); 
  
 const 
  
 promises 
 : 
  
 Promise<void> 
 [] 
  
 = 
  
 []; 
  
 for 
  
 ( 
 let 
  
 i 
  
 = 
  
 0 
 ; 
  
 i 
 < 
 10 
 ; 
  
 i 
 ++ 
 ) 
  
 { 
  
 promises 
 . 
 push 
 ( 
  
 ( 
 async 
  
 () 
  
 = 
>  
 { 
  
 const 
  
 messageId 
  
 = 
  
 await 
  
 batchPublisher 
 . 
 publishMessage 
 ({ 
  
 data 
 : 
  
 dataBuffer 
 , 
  
 }); 
  
 console 
 . 
 log 
 ( 
 `Message 
 ${ 
 messageId 
 } 
 published.` 
 ); 
  
 })(), 
  
 ); 
  
 } 
  
 await 
  
 Promise 
 . 
 all 
 ( 
 promises 
 ); 
 } 
 

PHP

Before trying this sample, follow the PHP setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub PHP API reference documentation .

  use Google\Cloud\PubSub\PubSubClient; 
 /** 
 * Publishes a message for a Pub/Sub topic. 
 * 
 * The publisher should be used in conjunction with the `google-cloud-batch` 
 * daemon, which should be running in the background. 
 * 
 * To start the daemon, from your project root call `vendor/bin/google-cloud-batch daemon`. 
 * 
 * @param string $projectId  The Google project ID. 
 * @param string $topicName  The Pub/Sub topic name. 
 * @param string $message    The message to publish. 
 */ 
 function publish_message_batch($projectId, $topicName, $message) 
 { 
 // Check if the batch daemon is running. 
 if (getenv('IS_BATCH_DAEMON_RUNNING') !== 'true') { 
 trigger_error( 
 'The batch daemon is not running. Call ' . 
 '`vendor/bin/google-cloud-batch daemon` from ' . 
 'your project root to start the daemon.', 
 E_USER_NOTICE 
 ); 
 } 
 $batchOptions = [ 
 'batchSize' => 100, // Max messages for each batch. 
 'callPeriod' => 0.01, // Max time in seconds between each batch publish. 
 ]; 
 $pubsub = new PubSubClient([ 
 'projectId' => $projectId, 
 ]); 
 $topic = $pubsub->topic($topicName); 
 $publisher = $topic->batchPublisher([ 
 'batchOptions' => $batchOptions 
 ]); 
 for ($i = 0; $i < 10; $i++) { 
 $publisher->publish(['data' => $message]); 
 } 
 print('Messages enqueued for publication.' . PHP_EOL); 
 } 
 

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Python API reference documentation .

  from 
  
 concurrent 
  
 import 
 futures 
 from 
  
 google.cloud 
  
 import 
 pubsub_v1 
 # TODO(developer) 
 # project_id = "your-project-id" 
 # topic_id = "your-topic-id" 
 # Configure the batch to publish as soon as there are 10 messages 
 # or 1 KiB of data, or 1 second has passed. 
 batch_settings 
 = 
 pubsub_v1 
 . 
 types 
 . 
  BatchSettings 
 
 ( 
 max_messages 
 = 
 10 
 , 
 # default 100 
 max_bytes 
 = 
 1024 
 , 
 # default 1 MB 
 max_latency 
 = 
 1 
 , 
 # default 10 ms 
 ) 
 publisher 
 = 
 pubsub_v1 
 . 
  PublisherClient 
 
 ( 
 batch_settings 
 ) 
 topic_path 
 = 
 publisher 
 . 
 topic_path 
 ( 
 project_id 
 , 
 topic_id 
 ) 
 publish_futures 
 = 
 [] 
 # Resolve the publish future in a separate thread. 
 def 
  
 callback 
 ( 
 future 
 : 
 pubsub_v1 
 . 
 publisher 
 . 
 futures 
 . 
 Future 
 ) 
 - 
> None 
 : 
 message_id 
 = 
 future 
 . 
  result 
 
 () 
 print 
 ( 
 message_id 
 ) 
 for 
 n 
 in 
 range 
 ( 
 1 
 , 
 10 
 ): 
 data_str 
 = 
 f 
 "Message number 
 { 
 n 
 } 
 " 
 # Data must be a bytestring 
 data 
 = 
 data_str 
 . 
 encode 
 ( 
 "utf-8" 
 ) 
 publish_future 
 = 
  publish 
 
er . 
  publish 
 
 ( 
 topic_path 
 , 
 data 
 ) 
 # Non-blocking. Allow the publisher client to batch multiple messages. 
 publish_future 
 . 
 add_done_callback 
 ( 
 callback 
 ) 
 publish_futures 
 . 
 append 
 ( 
 publish_future 
 ) 
 futures 
 . 
 wait 
 ( 
 publish_futures 
 , 
 return_when 
 = 
 futures 
 . 
 ALL_COMPLETED 
 ) 
 print 
 ( 
 f 
 "Published messages with batch settings to 
 { 
 topic_path 
 } 
 ." 
 ) 
 

Ruby

The following sample uses Ruby Pub/Sub client library v3. If you are still using the v2 library, see the migration guide to v3 . To see a list of Ruby v2 code samples, see the deprecated code samples .

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Ruby API reference documentation .

  # topic_id = "your-topic-id" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  PubSub 
 
 . 
  new 
 
 # Start sending messages in one request once the size of all queued messages 
 # reaches 1 MB or the number of queued messages reaches 20 
 publisher 
  
 = 
  
 pubsub 
 . 
  publisher 
 
  
 topic_id 
 , 
  
 async 
 : 
  
 { 
  
 max_bytes 
 : 
  
 1_000_000 
 , 
  
 max_messages 
 : 
  
 20 
 } 
 10 
 . 
 times 
  
 do 
  
 | 
 i 
 | 
  
 publisher 
 . 
  publish_async 
 
  
 "This is message # 
 #{ 
 i 
 } 
 ." 
 end 
 # Stop the async_publisher to send all queued messages immediately. 
 publisher 
 . 
  async_publisher 
 
 . 
 stop 
 . 
 wait! 
 puts 
  
 "Messages published asynchronously in batch." 
 

Disable batch messaging

To turn off batching in your client library, set the value of max_messages to 1.

Batch messaging and ordered delivery

With ordered delivery, failing to acknowledge any message in the batch means that all the messages in the batch, including the ones sent before the message that was not acknowledged, are all redelivered.

Quotas and limits on batch messaging

Before configuring batch messaging, consider the effect of factors such as publish throughput quota and the maximum size of a batch. The high-level client libraries ensure that batch requests are kept within the specified limits.

  • 1000 bytes is the minimum request size considered for cost purposes, even if the actual message size might be smaller than 1000 bytes.
  • Pub/Sub has a limit of 10-MB size or 1000 number of messages for a single batch publish request.

For more information, see Pub/Sub quotas and limits .

What's next

To learn how to configure advanced publishing options, see the following:

Create a Mobile Website
View Site in Mobile | Classic
Share by: