Flow control

This document provides information about using flow control with messages published to a topic.

About flow control

A publisher client may attempt to publish messages faster than that client is capable of sending data to the Pub/Sub service. Clients are limited by many factors, including:

  • Machine CPU, RAM, and network capacity
  • Network settings, such as the number of outstanding requests and available bandwidth
  • The latency of each publish request, largely determined by the network connections between the Pub/Sub service, the client, and Google Cloud

If the publish request rate exceeds these limits, requests accumulate in memory until they fail with a DEADLINE_EXCEEDED error. This is especially likely when tens of thousands of messages are published in a loop, generating thousands of requests in milliseconds.
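To get a feel for the scale of the problem, the following sketch works through the arithmetic under a deliberately simple model: every request in a burst is created at the same instant and the client drains them in order at a constant rate. The function name, the burst size, the send rate, and the 60-second deadline are all illustrative assumptions, not values taken from Pub/Sub.

```python
def requests_past_deadline(burst_size: int, send_rate_per_s: float, deadline_s: float) -> int:
    """Count requests in a burst that cannot be sent before their deadline.

    Toy model: all requests are created at t=0 and drained in FIFO order at a
    constant rate, so request i (0-based) completes at (i + 1) / send_rate_per_s.
    """
    # Number of requests that complete at or before the deadline.
    completed_in_time = min(burst_size, int(send_rate_per_s * deadline_s))
    return burst_size - completed_in_time


if __name__ == "__main__":
    # Hypothetical figures, chosen only for illustration.
    late = requests_past_deadline(burst_size=50_000, send_rate_per_s=500, deadline_s=60)
    print(late)  # 20000
```

In this model, a client that can send 500 requests per second clears only the first 30,000 requests of a 50,000-request burst within 60 seconds; the remaining 20,000 sit in memory until they time out.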

You can diagnose this issue by checking the server-side metrics in Monitoring. You won't be able to see the requests that have failed with DEADLINE_EXCEEDED, only the successful requests. The rate of successful requests tells you the throughput capacity of your client machines, providing a baseline for configuring flow control.

Go to the Monitoring page
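One possible way to turn an observed success rate into starting values is Little's law (items in flight = throughput x time each item spends in flight). The sketch below is a rule of thumb rather than Pub/Sub guidance; the function name and the sample readings (2,000 messages per second, 50 ms per publish request, 1 KiB messages) are hypothetical.

```python
import math
from typing import Tuple


def suggest_flow_control_limits(
    successful_msgs_per_s: int,
    publish_latency_ms: int,
    avg_message_bytes: int,
) -> Tuple[int, int]:
    """Return (max_outstanding_messages, max_outstanding_bytes) as a starting point.

    Uses Little's law: items in flight = throughput x time each item spends in flight.
    """
    in_flight = math.ceil(successful_msgs_per_s * publish_latency_ms / 1000)
    max_messages = max(1, in_flight)
    max_bytes = max_messages * avg_message_bytes
    return max_messages, max_bytes


if __name__ == "__main__":
    # Hypothetical readings: 2,000 successful messages/s, 50 ms per publish
    # request, 1 KiB average message size.
    print(suggest_flow_control_limits(2000, 50, 1024))  # (100, 102400)
```

Treat the result as a first guess to refine against the metrics, since real latency and message sizes vary.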

To mitigate flow rate issues, configure your publisher client with flow control to limit the rate of publish requests. You can configure the maximum number of bytes allocated for outstanding requests, and the maximum number of outstanding messages permitted. Set these limits according to the throughput capacity of your client machines.
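Conceptually, a blocking flow controller is a counter of outstanding messages and bytes that makes the publishing loop wait whenever either limit would be exceeded. The following standard-library sketch illustrates that idea only; it is not how any Pub/Sub client library is implemented, and `BlockingFlowController`, `run_demo`, and the simulated 1 ms send are hypothetical names and values.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class BlockingFlowController:
    """Caps outstanding messages and bytes; acquire() blocks when a cap is hit."""

    def __init__(self, max_messages: int, max_bytes: int) -> None:
        self._max_messages = max_messages
        self._max_bytes = max_bytes
        self._messages = 0
        self._bytes = 0
        self._cond = threading.Condition()

    def acquire(self, size: int) -> None:
        """Reserve capacity for one message of `size` bytes, waiting if needed."""
        if size > self._max_bytes:
            raise ValueError("message is larger than the byte limit")
        with self._cond:
            while (self._messages + 1 > self._max_messages
                   or self._bytes + size > self._max_bytes):
                self._cond.wait()
            self._messages += 1
            self._bytes += size

    def release(self, size: int) -> None:
        """Return capacity once the publish request has completed."""
        with self._cond:
            self._messages -= 1
            self._bytes -= size
            self._cond.notify_all()


def run_demo(total: int = 200, max_messages: int = 10) -> int:
    """Publish `total` fake messages and return the peak number outstanding."""
    controller = BlockingFlowController(max_messages=max_messages, max_bytes=1024)
    lock = threading.Lock()
    outstanding = 0
    peak = 0

    def fake_send(size: int) -> None:
        nonlocal outstanding
        time.sleep(0.001)  # stand-in for network latency
        with lock:
            outstanding -= 1
        controller.release(size)

    with ThreadPoolExecutor(max_workers=32) as pool:
        for _ in range(total):
            controller.acquire(8)  # blocks the loop while a cap is reached
            with lock:
                outstanding += 1
                peak = max(peak, outstanding)
            pool.submit(fake_send, 8)
    return peak


if __name__ == "__main__":
    print("peak outstanding messages:", run_demo())
```

However fast the loop runs, the peak number of outstanding messages stays at or below `max_messages`, which is the property the client library settings shown later on this page are meant to provide.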

Before you begin

Before configuring the publish workflow, ensure you have completed the following tasks:

Required roles

To get the permissions that you need to use flow control, ask your administrator to grant you the Pub/Sub Publisher (roles/pubsub.publisher) IAM role on your topic. For more information about granting roles, see Manage access to projects, folders, and organizations.

You might also be able to get the required permissions through custom roles or other predefined roles.

You need additional permissions to create or update topics and subscriptions.

Use flow control with messages

Publisher flow control is available using the Pub/Sub client libraries in the following languages:

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Configure the publisher to block if either (1) 100 or more messages, or
  // (2) messages with 100MiB worth of data have not been acknowledged by the
  // service. By default the publisher never blocks, and its capacity is only
  // limited by the system's memory.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::MaxPendingMessagesOption>(100)
          .set<pubsub::MaxPendingBytesOption>(100 * 1024 * 1024L)
          .set<pubsub::FullPublisherActionOption>(
              pubsub::FullPublisherAction::kBlocks)));

  std::vector<future<void>> ids;
  for (char const* data : {"a", "b", "c"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2. To see a list of v1 code samples, see the deprecated code samples.

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub/v2"
)

func publishWithFlowControlSettings(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	// Reuse this publisher for all publish calls to send messages in batches.
	publisher := client.Publisher(topicID)
	publisher.PublishSettings.FlowControlSettings = pubsub.FlowControlSettings{
		MaxOutstandingMessages: 100,                     // default 1000
		MaxOutstandingBytes:    10 * 1024 * 1024,        // default 0 (unlimited)
		LimitExceededBehavior:  pubsub.FlowControlBlock, // default Ignore, other options: Block and SignalError
	}

	var wg sync.WaitGroup
	var totalErrors uint64

	numMsgs := 1000
	// Rapidly publishing 1000 messages in a loop may be constrained by flow control.
	for i := 0; i < numMsgs; i++ {
		wg.Add(1)
		result := publisher.Publish(ctx, &pubsub.Message{
			Data: []byte("message #" + strconv.Itoa(i)),
		})

		go func(i int, res *pubsub.PublishResult) {
			fmt.Fprintf(w, "Publishing message %d\n", i)
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			_, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Fprintf(w, "Failed to publish: %v", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
		}(i, result)
	}

	wg.Wait()

	if totalErrors > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, numMsgs)
	}
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithFlowControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithFlowControlExample(projectId, topicId);
  }

  public static void publishWithFlowControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Configure how many messages the publisher client can hold in memory
      // and what to do when messages exceed the limit.
      FlowControlSettings flowControlSettings =
          FlowControlSettings.newBuilder()
              // Block more messages from being published when the limit is reached. The other
              // options are Ignore (or continue publishing) and ThrowException (or error out).
              .setLimitExceededBehavior(LimitExceededBehavior.Block)
              .setMaxOutstandingRequestBytes(10 * 1024 * 1024L) // 10 MiB
              .setMaxOutstandingElementCount(100L) // 100 messages
              .build();

      // By default, messages are not batched.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder().setFlowControlSettings(flowControlSettings).build();

      publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

      // Publishing 1000 messages in quick succession may be constrained by publisher flow control.
      for (int i = 0; i < 1000; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println(
          "Published " + messageIds.size() + " messages with flow control settings.");

      if (publisher != null) {
        // When finished with the publisher, shut down to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishWithFlowControl(topicNameOrId) {
  // Create publisher options
  const options = {
    flowControlOptions: {
      maxOutstandingMessages: 50,
      maxOutstandingBytes: 10 * 1024 * 1024, // 10 MB
    },
  };

  // Get a publisher. Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId, options);

  // For flow controlled publishing, we'll use a publisher flow controller
  // instead of `topic.publish()`.
  const flow = topic.flowControlled();

  // Publish messages in a fast loop.
  const testMessage = {data: Buffer.from('test!')};
  for (let i = 0; i < 1000; i++) {
    // You can also just `await` on `publish()` unconditionally, but if
    // you want to avoid pausing to the event loop on each iteration,
    // you can manually check the return value before doing so.
    const wait = flow.publish(testMessage);
    if (wait) {
      await wait;
    }
  }

  // Wait on any pending publish requests. Note that you can call `all()`
  // earlier if you like, and it will return a Promise for all messages
  // that have been sent to `flowController.publish()` so far.
  const messageIds = await flow.all();
  console.log(`Published ${messageIds.length} with flow control settings.`);
}

Node.js (TypeScript)

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, PublishOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishWithFlowControl(topicNameOrId: string) {
  // Create publisher options
  const options: PublishOptions = {
    flowControlOptions: {
      maxOutstandingMessages: 50,
      maxOutstandingBytes: 10 * 1024 * 1024, // 10 MB
    },
  };

  // Get a publisher. Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId, options);

  // For flow controlled publishing, we'll use a publisher flow controller
  // instead of `topic.publish()`.
  const flow = topic.flowControlled();

  // Publish messages in a fast loop.
  const testMessage = {data: Buffer.from('test!')};
  for (let i = 0; i < 1000; i++) {
    // You can also just `await` on `publish()` unconditionally, but if
    // you want to avoid pausing to the event loop on each iteration,
    // you can manually check the return value before doing so.
    const wait = flow.publish(testMessage);
    if (wait) {
      await wait;
    }
  }

  // Wait on any pending publish requests. Note that you can call `all()`
  // earlier if you like, and it will return a Promise for all messages
  // that have been sent to `flowController.publish()` so far.
  const messageIds = await flow.all();
  console.log(`Published ${messageIds.length} with flow control settings.`);
}

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.

from concurrent import futures
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import (
    LimitExceededBehavior,
    PublisherOptions,
    PublishFlowControl,
)

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure how many messages the publisher client can hold in memory
# and what to do when messages exceed the limit.
flow_control_settings = PublishFlowControl(
    message_limit=100,  # 100 messages
    byte_limit=10 * 1024 * 1024,  # 10 MiB
    limit_exceeded_behavior=LimitExceededBehavior.BLOCK,
)
publisher = pubsub_v1.PublisherClient(
    publisher_options=PublisherOptions(flow_control=flow_control_settings)
)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

# Resolve the publish future in a separate thread.
def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
    message_id = publish_future.result()
    print(message_id)

# Publishing 1000 messages in quick succession may be constrained by
# publisher flow control.
for n in range(1, 1001):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    publish_future = publisher.publish(topic_path, data)
    # Non-blocking. Allow the publisher client to batch messages.
    publish_future.add_done_callback(callback)
    publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with flow control settings to {topic_path}.")

Ruby

The following sample uses Ruby Pub/Sub client library v3. If you are still using the v2 library, see the migration guide to v3. To see a list of Ruby v2 code samples, see the deprecated code samples.

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new

publisher = pubsub.publisher topic_id, async: {
  # Configure how many messages the publisher client can hold in memory
  # and what to do when messages exceed the limit.
  flow_control: {
    message_limit: 100,
    byte_limit: 10 * 1024 * 1024, # 10 MiB
    # Block more messages from being published when the limit is reached. The
    # other options are :ignore and :error.
    limit_exceeded_behavior: :block
  }
}

# Rapidly publishing 1000 messages in a loop may be constrained by flow
# control.
1000.times do |i|
  publisher.publish_async "message #{i}" do |result|
    raise "Failed to publish the message." unless result.succeeded?
  end
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop.wait!
puts "Published messages with flow control settings to #{topic_id}."

What's next
