v1 Publish messages with flow control settings (DEPRECATED)

(DEPRECATED) Publish messages with flow control settings

Code sample

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "strconv" 
  
 "sync" 
  
 "sync/atomic" 
  
 "cloud.google.com/go/pubsub" 
 ) 
 func 
  
 publishWithFlowControlSettings 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 topicID 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topicID := "my-topic" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
  NewClient 
 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 t 
  
 := 
  
 client 
 . 
 Topic 
 ( 
 topicID 
 ) 
  
 t 
 . 
 PublishSettings 
 . 
  FlowControlSettings 
 
  
 = 
  
 pubsub 
 . 
  FlowControlSettings 
 
 { 
  
 MaxOutstandingMessages 
 : 
  
 100 
 , 
  
 // default 1000 
  
 MaxOutstandingBytes 
 : 
  
 10 
  
 * 
  
 1024 
  
 * 
  
 1024 
 , 
  
 // default 0 (unlimited) 
  
 LimitExceededBehavior 
 : 
  
 pubsub 
 . 
  FlowControlBlock 
 
 , 
  
 // default Ignore, other options: Block and SignalError 
  
 } 
  
 var 
  
 wg 
  
 sync 
 . 
 WaitGroup 
  
 var 
  
 totalErrors 
  
 uint64 
  
 numMsgs 
  
 := 
  
 1000 
  
 // Rapidly publishing 1000 messages in a loop may be constrained by flow control. 
  
 for 
  
 i 
  
 := 
  
 0 
 ; 
  
 i 
 < 
 numMsgs 
 ; 
  
 i 
 ++ 
  
 { 
  
 wg 
 . 
 Add 
 ( 
 1 
 ) 
  
 result 
  
 := 
  
 t 
 . 
 Publish 
 ( 
 ctx 
 , 
  
& pubsub 
 . 
 Message 
 { 
  
 Data 
 : 
  
 [] 
 byte 
 ( 
 "message #" 
  
 + 
  
 strconv 
 . 
 Itoa 
 ( 
 i 
 )), 
  
 }) 
  
 go 
  
 func 
 ( 
 i 
  
 int 
 , 
  
 res 
  
 * 
 pubsub 
 . 
  PublishResult 
 
 ) 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Publishing message %d\n" 
 , 
  
 i 
 ) 
  
 defer 
  
 wg 
 . 
 Done 
 () 
  
 // The Get method blocks until a server-generated ID or 
  
 // an error is returned for the published message. 
  
 _ 
 , 
  
 err 
  
 := 
  
 res 
 . 
 Get 
 ( 
 ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 // Error handling code can be added here. 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Failed to publish: %v" 
 , 
  
 err 
 ) 
  
 atomic 
 . 
 AddUint64 
 ( 
& totalErrors 
 , 
  
 1 
 ) 
  
 return 
  
 } 
  
 }( 
 i 
 , 
  
 result 
 ) 
  
 } 
  
 wg 
 . 
 Wait 
 () 
  
 if 
  
 totalErrors 
 > 
 0 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "%d of %d messages did not publish successfully" 
 , 
  
 totalErrors 
 , 
  
 numMsgs 
 ) 
  
 } 
  
 return 
  
 nil 
 } 
 

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  # topic_id = "your-topic-id"

  # Publisher-side flow control: bounds how many messages the publisher client
  # may hold in memory and decides what happens once that bound is exceeded.
  flow_control_settings = {
    message_limit: 100,
    byte_limit: 10 * 1024 * 1024, # 10 MiB
    # :block holds back further publishes once a limit is reached; the
    # alternatives are :ignore and :error.
    limit_exceeded_behavior: :block
  }

  pubsub = Google::Cloud::Pubsub.new
  topic = pubsub.topic(topic_id, async: { flow_control: flow_control_settings })

  # Publishing 1000 messages back to back may be throttled by the limits above.
  1000.times do |i|
    topic.publish_async("message #{i}") do |result|
      raise "Failed to publish the message." unless result.succeeded?
    end
  end

  # Stop the async publisher so every queued message is sent right away, and
  # wait for that to finish.
  topic.async_publisher.stop.wait!

  puts "Published messages with flow control settings to #{topic_id}."
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.

Create a Mobile Website
View Site in Mobile | Classic
Share by: