Publish messages with flow control settings

Creates a publisher client with custom flow control settings and uses it to publish some messages.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

C++

Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  namespace 
  
 pubsub 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 future 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 Options 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 StatusOr 
 ; 
 [](std::string project_id, std::string topic_id) {
   // Publishes the messages "a", "b" and "c" to the given topic through a
   // publisher that applies flow control, then waits for every publish to
   // finish. Successes are reported on stdout, failures on stderr.
   auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));

   // Configure the publisher to block if either (1) 100 or more messages, or
   // (2) messages with 100MiB worth of data have not been acknowledged by the
   // service. By default the publisher never blocks, and its capacity is only
   // limited by the system's memory.
   auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
       std::move(topic),
       Options{}
           .set<pubsub::MaxPendingMessagesOption>(100)
           .set<pubsub::MaxPendingBytesOption>(100 * 1024 * 1024L)
           .set<pubsub::FullPublisherActionOption>(
               pubsub::FullPublisherAction::kBlocks)));

   std::vector<future<void>> ids;
   for (char const* data : {"a", "b", "c"}) {
     ids.push_back(
         publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
             .then([data](future<StatusOr<std::string>> f) {
               auto s = f.get();
               if (!s) {
                 // Report the failure instead of dropping it silently, so a
                 // rejected message is visible to whoever runs the sample.
                 std::cerr << "Error publishing '" << data
                           << "': " << s.status() << "\n";
                 return;
               }
               std::cout << "Sent '" << data << "' (" << *s << ")\n";
             }));
   }
   // Push out anything still held in the publisher's buffers.
   publisher.Flush();
   // Block until they are actually sent.
   for (auto& id : ids) id.get();
 }
 

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "strconv" 
  
 "sync" 
  
 "sync/atomic" 
  
 "cloud.google.com/go/pubsub/v2" 
 ) 
 func 
  
 publishWithFlowControlSettings 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 topicID 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topicID := "my-topic" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 // client.Publisher can be passed a topic ID (e.g. "my-topic") or 
  
 // a fully qualified name (e.g. "projects/my-project/topics/my-topic"). 
  
 // If a topic ID is provided, the project ID from the client is used. 
  
 // Reuse this publisher for all publish calls to send messages in batches. 
  
 publisher 
  
 := 
  
 client 
 . 
 Publisher 
 ( 
 topicID 
 ) 
  
 publisher 
 . 
 PublishSettings 
 . 
 FlowControlSettings 
  
 = 
  
 pubsub 
 . 
 FlowControlSettings 
 { 
  
 MaxOutstandingMessages 
 : 
  
 100 
 , 
  
 // default 1000 
  
 MaxOutstandingBytes 
 : 
  
 10 
  
 * 
  
 1024 
  
 * 
  
 1024 
 , 
  
 // default 0 (unlimited) 
  
 LimitExceededBehavior 
 : 
  
 pubsub 
 . 
 FlowControlBlock 
 , 
  
 // default Ignore, other options: Block and SignalError 
  
 } 
  
 var 
  
 wg 
  
 sync 
 . 
 WaitGroup 
  
 var 
  
 totalErrors 
  
 uint64 
  
 numMsgs 
  
 := 
  
 1000 
  
 // Rapidly publishing 1000 messages in a loop may be constrained by flow control. 
  
 for 
  
 i 
  
 := 
  
 0 
 ; 
  
 i 
 < 
 numMsgs 
 ; 
  
 i 
 ++ 
  
 { 
  
 wg 
 . 
 Add 
 ( 
 1 
 ) 
  
 result 
  
 := 
  
 publisher 
 . 
 Publish 
 ( 
 ctx 
 , 
  
& pubsub 
 . 
 Message 
 { 
  
 Data 
 : 
  
 [] 
 byte 
 ( 
 "message #" 
  
 + 
  
 strconv 
 . 
 Itoa 
 ( 
 i 
 )), 
  
 }) 
  
 go 
  
 func 
 ( 
 i 
  
 int 
 , 
  
 res 
  
 * 
 pubsub 
 . 
 PublishResult 
 ) 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Publishing message %d\n" 
 , 
  
 i 
 ) 
  
 defer 
  
 wg 
 . 
 Done 
 () 
  
 // The Get method blocks until a server-generated ID or 
  
 // an error is returned for the published message. 
  
 _ 
 , 
  
 err 
  
 := 
  
 res 
 . 
 Get 
 ( 
 ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 // Error handling code can be added here. 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Failed to publish: %v" 
 , 
  
 err 
 ) 
  
 atomic 
 . 
 AddUint64 
 ( 
& totalErrors 
 , 
  
 1 
 ) 
  
 return 
  
 } 
  
 }( 
 i 
 , 
  
 result 
 ) 
  
 } 
  
 wg 
 . 
 Wait 
 () 
  
 if 
  
 totalErrors 
 > 
 0 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "%d of %d messages did not publish successfully" 
 , 
  
 totalErrors 
 , 
  
 numMsgs 
 ) 
  
 } 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.api.core. ApiFutures 
 
 ; 
 import 
  
 com.google.api.gax.batching. BatchingSettings 
 
 ; 
 import 
  
 com.google.api.gax.batching. FlowControlSettings 
 
 ; 
 import 
  
 com.google.api.gax.batching. FlowController 
. LimitExceededBehavior 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Publisher 
 
 ; 
 import 
  
 com.google.protobuf. ByteString 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 com.google.pubsub.v1. TopicName 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.util.ArrayList 
 ; 
 import 
  
 java.util.List 
 ; 
 import 
  
 java.util.concurrent.ExecutionException 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 public 
  
 class 
 PublishWithFlowControlExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 topicId 
  
 = 
  
 "your-topic-id" 
 ; 
  
 publishWithFlowControlExample 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 publishWithFlowControlExample 
 ( 
 String 
  
 projectId 
 , 
  
 String 
  
 topicId 
 ) 
  
 throws 
  
 IOException 
 , 
  
 ExecutionException 
 , 
  
 InterruptedException 
  
 { 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
  Publisher 
 
  
 publisher 
  
 = 
  
 null 
 ; 
  
 List<ApiFuture<String> 
>  
 messageIdFutures 
  
 = 
  
 new 
  
 ArrayList 
<> (); 
  
 try 
  
 { 
  
 // Configure how many messages the publisher client can hold in memory 
  
 // and what to do when messages exceed the limit. 
  
  FlowControlSettings 
 
  
 flowControlSettings 
  
 = 
  
  FlowControlSettings 
 
 . 
 newBuilder 
 () 
  
 // Block more messages from being published when the limit is reached. The other 
  
 // options are Ignore (or continue publishing) and ThrowException (or error out). 
  
 . 
 setLimitExceededBehavior 
 ( 
  LimitExceededBehavior 
 
 . 
 Block 
 ) 
  
 . 
 setMaxOutstandingRequestBytes 
 ( 
 10 
  
 * 
  
 1024 
  
 * 
  
 1024L 
 ) 
  
 // 10 MiB 
  
 . 
 setMaxOutstandingElementCount 
 ( 
 100L 
 ) 
  
 // 100 messages 
  
 . 
 build 
 (); 
  
 // By default, messages are not batched. 
  
  BatchingSettings 
 
  
 batchingSettings 
  
 = 
  
  BatchingSettings 
 
 . 
 newBuilder 
 (). 
 setFlowControlSettings 
 ( 
 flowControlSettings 
 ). 
 build 
 (); 
  
 publisher 
  
 = 
  
  Publisher 
 
 . 
 newBuilder 
 ( 
 topicName 
 ). 
 setBatchingSettings 
 ( 
 batchingSettings 
 ). 
 build 
 (); 
  
 // Publish 1000 messages in quick succession may be constrained by publisher flow control. 
  
 for 
  
 ( 
 int 
  
 i 
  
 = 
  
 0 
 ; 
  
 i 
 < 
 1000 
 ; 
  
 i 
 ++ 
 ) 
  
 { 
  
 String 
  
 message 
  
 = 
  
 "message " 
  
 + 
  
 i 
 ; 
  
  ByteString 
 
  
 data 
  
 = 
  
  ByteString 
 
 . 
  copyFromUtf8 
 
 ( 
 message 
 ); 
  
  PubsubMessage 
 
  
 pubsubMessage 
  
 = 
  
  PubsubMessage 
 
 . 
 newBuilder 
 (). 
  setData 
 
 ( 
 data 
 ). 
 build 
 (); 
  
 // Once published, returns a server-assigned message id (unique within the topic) 
  
 ApiFuture<String> 
  
 messageIdFuture 
  
 = 
  
  publish 
 
er . 
  publish 
 
 ( 
 pubsubMessage 
 ); 
  
 messageIdFutures 
 . 
 add 
 ( 
 messageIdFuture 
 ); 
  
 } 
  
 } 
  
 finally 
  
 { 
  
 // Wait on any pending publish requests. 
  
 List<String> 
  
 messageIds 
  
 = 
  
  ApiFutures 
 
 . 
  allAsList 
 
 ( 
 messageIdFutures 
 ). 
 get 
 (); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
  
 "Published " 
  
 + 
  
 messageIds 
 . 
 size 
 () 
  
 + 
  
 " messages with flow control settings." 
 ); 
  
 if 
  
 ( 
 publisher 
  
 != 
  
 null 
 ) 
  
 { 
  
 // When finished with the publisher, shut down to free up resources. 
  
 publisher 
 . 
  shutdown 
 
 (); 
  
 publisher 
 . 
  awaitTermination 
 
 ( 
 1 
 , 
  
 TimeUnit 
 . 
 MINUTES 
 ); 
  
 } 
  
 } 
  
 } 
 } 
 

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  # topic_id = "your-topic-id"

  # Flow control limits: how many messages (and bytes) the publisher client may
  # hold in memory, and what happens once a limit is hit.
  flow_control_settings = {
    message_limit: 100,
    byte_limit:    10 * 1024 * 1024, # 10 MiB
    # :block pauses further publishing when a limit is reached. The other
    # options are :ignore and :error.
    limit_exceeded_behavior: :block
  }

  pubsub    = Google::Cloud::PubSub.new
  publisher = pubsub.publisher(topic_id, async: { flow_control: flow_control_settings })

  # Rapidly publishing 1000 messages in a loop may be constrained by flow
  # control.
  1000.times do |index|
    publisher.publish_async("message #{index}") do |outcome|
      next if outcome.succeeded?

      raise "Failed to publish the message."
    end
  end

  # Stop the async_publisher to send all queued messages immediately, then
  # wait for it to finish.
  stopping_publisher = publisher.async_publisher.stop
  stopping_publisher.wait!

  puts "Published messages with flow control settings to #{topic_id}."
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.

Create a Mobile Website
View Site in Mobile | Classic
Share by: