Publish messages with flow control settings

Creates a publisher client with custom flow control settings and uses it to publish some messages.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

C++

Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  // Sample: publish three messages through a Publisher whose connection is
  // configured with flow control limits, then wait for every publish to finish.
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::Options;
  using ::google::cloud::StatusOr;
  [](std::string project_id, std::string topic_id) {
    auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
    // Configure the publisher to block if either (1) 100 or more messages, or
    // (2) messages with 100MiB worth of data have not been acknowledged by the
    // service. By default the publisher never blocks, and its capacity is only
    // limited by the system's memory.
    auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
        std::move(topic),
        Options{}
            .set<pubsub::MaxPendingMessagesOption>(100)
            .set<pubsub::MaxPendingBytesOption>(100 * 1024 * 1024L)
            .set<pubsub::FullPublisherActionOption>(
                pubsub::FullPublisherAction::kBlocks)));

    // One future per published message; each completes after its continuation
    // (which prints the message id on success) has run.
    std::vector<future<void>> ids;
    for (char const* data : {"a", "b", "c"}) {
      ids.push_back(
          publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
              .then([data](future<StatusOr<std::string>> f) {
                auto s = f.get();
                // Nothing is printed when the publish failed.
                if (!s) return;
                std::cout << "Sent '" << data << "' (" << *s << ")\n";
              }));
    }
    publisher.Flush();

    // Block until they are actually sent.
    for (auto& id : ids) id.get();
  }

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "strconv" 
  
 "sync" 
  
 "sync/atomic" 
  
 "cloud.google.com/go/pubsub/v2" 
 ) 
 func 
  
 publishWithFlowControlSettings 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 topicID 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topicID := "my-topic" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 // client.Publisher can be passed a topic ID (e.g. "my-topic") or 
  
 // a fully qualified name (e.g. "projects/my-project/topics/my-topic"). 
  
 // If a topic ID is provided, the project ID from the client is used. 
  
 // Reuse this publisher for all publish calls to send messages in batches. 
  
 publisher 
  
 := 
  
 client 
 . 
 Publisher 
 ( 
 topicID 
 ) 
  
 publisher 
 . 
 PublishSettings 
 . 
 FlowControlSettings 
  
 = 
  
 pubsub 
 . 
 FlowControlSettings 
 { 
  
 MaxOutstandingMessages 
 : 
  
 100 
 , 
  
 // default 1000 
  
 MaxOutstandingBytes 
 : 
  
 10 
  
 * 
  
 1024 
  
 * 
  
 1024 
 , 
  
 // default 0 (unlimited) 
  
 LimitExceededBehavior 
 : 
  
 pubsub 
 . 
 FlowControlBlock 
 , 
  
 // default Ignore, other options: Block and SignalError 
  
 } 
  
 var 
  
 wg 
  
 sync 
 . 
 WaitGroup 
  
 var 
  
 totalErrors 
  
 uin<t64 
  
 numMsgs 
  
 := 
  
 1000 
  
 // Rapidly publishing 1000 messages in a& loop may be constrained by flow control. 
  
 for 
  
 i 
  
 := 
  
 0 
 ; 
  
 i 
  
 numMsgs 
 ; 
  
 i 
 ++ 
  
 { 
  
 wg 
 . 
 Add 
 ( 
 1 
 ) 
  
 result 
  
 := 
  
 publisher 
 . 
 Publish 
 ( 
 ctx 
 , 
  
 pubsub 
 . 
 Message 
 { 
  
 Data 
 : 
  
 [] 
 byte 
 ( 
 "message #" 
  
 + 
  
 strconv 
 . 
 Itoa 
 ( 
 i 
 )), 
  
 }) 
  
 go 
  
 func 
 ( 
 i 
  
 int 
 , 
  
 res 
  
 * 
 pubsub 
 . 
 PublishResult 
 ) 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Publishing message %d\n" 
 , 
  
 i 
 ) 
  
 defer 
  
 wg 
 . 
 Done 
 () 
  
 // The Get method blocks until a server-generated ID or 
  
 // an error is returned for the published message. 
  
 _ 
 , 
  
 err 
  
 := 
  
 res 
 . 
 Get 
 ( 
& ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 // Error handling code can be added here. 
  
 fmt 
 . 
 F>printf 
 ( 
 w 
 , 
  
 "Failed to publish: %v" 
 , 
  
 err 
 ) 
  
 atomic 
 . 
 AddUint64 
 ( 
 totalErrors 
 , 
  
 1 
 ) 
  
 return 
  
 } 
  
 }( 
 i 
 , 
  
 resu 
 
lt ) 
  
 } 
  
 wg 
 . 
 Wait 
 () 
  
 if 
  
 totalErrors 
  
 0 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "%d of %d messages did not publish successfully" 
 , 
  
 totalErrors 
 , 
  
 numMsgs 
 ) 
  
 } 
  
 return 
  
 nil 
 } 

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.api.core. ApiFutures 
 
 ; 
 import 
  
 com.google.api.gax.batching. BatchingSettings 
 
 ; 
 import 
  
 com.google.api.gax.batching. FlowControlSettings 
 
 ; 
 import 
  
 com.google.api.gax.batching. FlowController 
. LimitExceededBehavior 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Publisher 
 
 ; 
 import 
  
 com.google.protobuf. ByteString 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 com.google.pubsub.v1. TopicName 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.util.ArrayList 
 ; 
 import 
  
 java.util.List 
 ; 
 import 
  
 java.util.concurrent.ExecutionException 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 public 
  
 class 
 PublishWithFlowControlExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 topicId 
  
 = 
  
 "your-topic-id" 
 ; 
  
 publishWithFlowControlExample 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 publishWithFlowControlExample 
 ( 
 String 
  
 projectId 
 , 
  
 String 
  
 topicId 
 ) 
  
 throws 
  
 IOException 
 , 
  
 ExecutionException 
 , 
  
 Interru ptedExcep 
tion 
  
 { 
  
 To picName 
 
  
 topicName 
  
 = 
  
 TopicName 
 . 
 of 
 ( 
 proje ctId 
 
 , 
  
 topicId 
 ); 
  
 Publisher 
  
 publis<her 
  
 = 
  
 nul<l 
 ; 
 >> 
 ListApiFutureString 
  
 messageIdFut<>ures 
  
 = 
  
 new 
  
 ArrayList 
 (); 
  
 try 
  
 { 
  
 // Configure how many messages the publisher client can hold in memory 
  
 // and what to do when messages exce ed the limit. 
 
  
 FlowControlSettings 
  
 flowControlS ettings 
 
  
 = 
  
 FlowControlSettings 
 . 
 newBuilder 
 () 
  
 // Block more messages from being published when the limit is reached. The other 
  
 // options are Ignore (or continue publishing) and ThrowException (or error out). 
  
 . 
 setLi mitExceededBehavior 
 
 ( 
 LimitExceededBehavior 
 . 
 Block 
 ) 
  
 . 
 setMaxOutstandingRequestBytes 
 ( 
 10 
  
 * 
  
 1024 
  
 * 
  
 1024L 
 ) 
  
 // 10 MiB 
  
 . 
 setMaxOutstandingElementCount 
 ( 
 100L 
 ) 
  
 // 100 messages 
  
 . 
 build 
 (); 
  
 // By default, messages are not batched. 
 
  
 BatchingSettings 
  
 batchingS ettings 
 
  
 = 
  
 BatchingSettings 
 . 
 newBuilder 
 (). 
 setFlowControlSettings 
 ( 
 flowControlSettings 
 ). 
 build 
 (); 
   
 p 
 
ublisher  
 = 
  
 Publisher 
 . 
 newBuilder 
 ( 
 topicName 
 ). 
 setBatchingSettings 
 ( 
 batchingSettings 
 ). 
 build 
 (); 
  
 // Publish 1000 messages in quick succession may be constrained by publisher flow control. 
 < 
 for 
  
 ( 
 int 
  
 i 
  
 = 
  
 0 
 ; 
  
 i 
  
 1000 
 ; 
  
 i 
 ++ 
 ) 
  
 { 
  
 String 
  
 message 
  
 = 
  
 &quo t;message 
" 
  
 + 
   
 i 
 ; 
  
 
  ByteString 
  
 
 data 
  
 = 
  
 ByteString 
 . 
 c opyFromUtf8 
 
 ( 
 message 
 ); 
  
  PubsubMessage 
 
  
 pubsubMessage 
   
 = 
  
 Pubs 
 
ubMessage . 
 newBuilder 
 (). 
 setData 
 ( 
 data 
 ). 
 build 
 (); 
  
 // Once published, returns a server-assigned message id (unique withi<n the >topic) 
  
 ApiF utureSt 
rin g 
 
  
 messageIdFuture 
  
 = 
  
 publisher 
 . 
 publish 
 ( 
 pubsubMessage 
 ); 
  
 messageIdFutures 
 . 
 add 
 ( 
 messageIdFuture 
 ); 
  
 } 
  
 } 
  
 finally 
  
 { 
  
 // Wait on any pendin<g publ>ish requests. 
   
 List 
 
S tring  
 mes 
 
sageIds  
 = 
  
 ApiFutures 
 . 
 allAsList 
 ( 
 messageIdFutures 
 ). 
 get 
 (); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
  
 "Published " 
  
 + 
  
 messageIds 
 . 
 size 
 () 
  
 + 
  
 " messages with flow control settings." 
 ); 
  
 if 
  
 ( 
 publisher 
  
 != 
  
 null 
 ) 
  
 { 
  
 // When finished with the publisher, shut dow 
n to free up resources . 
 
  
 publisher 
 . 
 shutdown 
 (); 
  
 publisher 
 . 
 awaitTer 
 
mination ( 
 1 
 , 
  
 TimeUnit 
 . 
 MINUTES 
 ); 
  
 } 
  
 } 
  
 } 
 } 

Node.js

Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  /**
   * TODO(developer): Uncomment this variable before running the sample.
   */
  // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

  // Imports the Google Cloud client library
  const {PubSub} = require('@google-cloud/pubsub');

  // Creates a client; cache this for further use
  const pubSubClient = new PubSub();

  /**
   * Publishes 1000 copies of a test message to the given topic through a
   * flow-controlled publisher, then logs how many message IDs were returned.
   *
   * @param {string} topicNameOrId Name or ID of the topic to publish to.
   */
  async function publishWithFlowControl(topicNameOrId) {
    // Create publisher options
    const options = {
      flowControlOptions: {
        maxOutstandingMessages: 50,
        maxOutstandingBytes: 10 * 1024 * 1024, // 10 MB
      },
    };

    // Get a publisher. Cache topic objects (publishers) and reuse them.
    const topic = pubSubClient.topic(topicNameOrId, options);

    // For flow controlled publishing, we'll use a publisher flow controller
    // instead of `topic.publish()`.
    const flow = topic.flowControlled();

    // Publish messages in a fast loop.
    const testMessage = {data: Buffer.from('test!')};
    for (let i = 0; i < 1000; i++) {
      // You can also just `await` on `publish()` unconditionally, but if
      // you want to avoid pausing to the event loop on each iteration,
      // you can manually check the return value before doing so.
      const wait = flow.publish(testMessage);
      if (wait) {
        await wait;
      }
    }

    // Wait on any pending publish requests. Note that you can call `all()`
    // earlier if you like, and it will return a Promise for all messages
    // that have been sent to `flowController.publish()` so far.
    const messageIds = await flow.all();
    console.log(`Published ${messageIds.length} with flow control settings.`);
  }

Node.js (TypeScript)

Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  /**
   * TODO(developer): Uncomment this variable before running the sample.
   */
  // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

  // Imports the Google Cloud client library
  import {PubSub, PublishOptions} from '@google-cloud/pubsub';

  // Creates a client; cache this for further use
  const pubSubClient = new PubSub();

  /**
   * Publishes 1000 copies of a test message to the given topic through a
   * flow-controlled publisher, then logs how many message IDs were returned.
   *
   * @param topicNameOrId Name or ID of the topic to publish to.
   */
  async function publishWithFlowControl(topicNameOrId: string) {
    // Create publisher options
    const options: PublishOptions = {
      flowControlOptions: {
        maxOutstandingMessages: 50,
        maxOutstandingBytes: 10 * 1024 * 1024, // 10 MB
      },
    };

    // Get a publisher. Cache topic objects (publishers) and reuse them.
    const topic = pubSubClient.topic(topicNameOrId, options);

    // For flow controlled publishing, we'll use a publisher flow controller
    // instead of `topic.publish()`.
    const flow = topic.flowControlled();

    // Publish messages in a fast loop.
    const testMessage = {data: Buffer.from('test!')};
    for (let i = 0; i < 1000; i++) {
      // You can also just `await` on `publish()` unconditionally, but if
      // you want to avoid pausing to the event loop on each iteration,
      // you can manually check the return value before doing so.
      const wait = flow.publish(testMessage);
      if (wait) {
        await wait;
      }
    }

    // Wait on any pending publish requests. Note that you can call `all()`
    // earlier if you like, and it will return a Promise for all messages
    // that have been sent to `flowController.publish()` so far.
    const messageIds = await flow.all();
    console.log(`Published ${messageIds.length} with flow control settings.`);
  }

Python

Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  from concurrent import futures
  from google.cloud import pubsub_v1
  from google.cloud.pubsub_v1.types import (
      LimitExceededBehavior,
      PublisherOptions,
      PublishFlowControl,
  )

  # TODO(developer)
  # project_id = "your-project-id"
  # topic_id = "your-topic-id"

  # Configure how many messages the publisher client can hold in memory
  # and what to do when messages exceed the limit.
  flow_control_settings = PublishFlowControl(
      message_limit=100,  # 100 messages
      byte_limit=10 * 1024 * 1024,  # 10 MiB
      limit_exceeded_behavior=LimitExceededBehavior.BLOCK,
  )
  publisher = pubsub_v1.PublisherClient(
      publisher_options=PublisherOptions(flow_control=flow_control_settings)
  )
  topic_path = publisher.topic_path(project_id, topic_id)
  publish_futures = []

  # Resolve the publish future in a separate thread.
  def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
      """Print the message ID once the given publish future has resolved."""
      message_id = publish_future.result()
      print(message_id)

  # Publish 1000 messages in quick succession may be constrained by
  # publisher flow control.
  # NOTE(review): range(1, 1000) yields n = 1..999, i.e. 999 messages rather
  # than the 1000 the comment above describes.
  for n in range(1, 1000):
      data_str = f"Message number {n}"
      # Data must be a bytestring
      data = data_str.encode("utf-8")
      publish_future = publisher.publish(topic_path, data)
      # Non-blocking. Allow the publisher client to batch messages.
      publish_future.add_done_callback(callback)
      publish_futures.append(publish_future)

  # Block until every publish future has completed.
  futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

  print(f"Published messages with flow control settings to {topic_path}.")

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  # Sample: publish 1000 messages asynchronously through a publisher configured
  # with flow control, then stop the async publisher and wait for it to finish.
  # topic_id = "your-topic-id"

  pubsub = Google::Cloud::PubSub.new

  publisher = pubsub.publisher topic_id, async: {
    # Configure how many messages the publisher client can hold in memory
    # and what to do when messages exceed the limit.
    flow_control: {
      message_limit: 100,
      byte_limit: 10 * 1024 * 1024, # 10 MiB
      # Block more messages from being published when the limit is reached. The
      # other options are :ignore and :error.
      limit_exceeded_behavior: :block
    }
  }

  # Rapidly publishing 1000 messages in a loop may be constrained by flow
  # control.
  1000.times do |i|
    publisher.publish_async "message #{i}" do |result|
      raise "Failed to publish the message." unless result.succeeded?
    end
  end

  # Stop the async_publisher to send all queued messages immediately.
  publisher.async_publisher.stop.wait!

  puts "Published messages with flow control settings to #{topic_id}."

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.

Design a Mobile Site
View Site in Mobile | Classic
Share by: