Subscribe with flow control

Creates a subscriber with flow control settings, and receives messages.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

C++

Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub C++ API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  namespace 
  
 pubsub 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 future 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 Options 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 StatusOr 
 ; 
 auto 
  
 sample 
  
 = 
  
 []( 
 std 
 :: 
 string 
  
 project_id 
 , 
  
 std 
 :: 
 string 
  
 subscription_id 
 ) 
  
 { 
  
 // Change the flow control watermarks, by default the client library uses 
  
 // 0 and 1,000 for the message count watermarks, and 0 and 10MiB for the 
  
 // size watermarks. Recall that the library stops requesting messages if 
  
 // any of the high watermarks are reached, and the library resumes 
  
 // requesting messages when *both* low watermarks are reached. 
  
 auto 
  
 constexpr 
  
 kMiB 
  
 = 
  
 1024 
  
 * 
  
 1024L 
 ; 
  
 auto 
  
 subscriber 
  
 = 
  
 pubsub 
 :: 
 Subscriber 
 ( 
 pubsub 
 :: 
 MakeSubscriberConnection 
 ( 
  
 pubsub 
 :: 
 Subscription 
 ( 
 std 
 :: 
 move 
 ( 
 project_id 
 ), 
  
 std 
 :: 
 move 
 ( 
 subscription_id 
 )), 
  
 Options 
 {} 
  
 . 
 set<pubsub 
 :: 
 MaxOutstandingMessagesOption 
> ( 
 1000 
 ) 
  
 . 
 set<pubsub 
 :: 
 MaxOutstandingBytesOption 
> ( 
 8 
  
 * 
  
 kMiB 
 ))); 
  
 auto 
  
 session 
  
 = 
  
 subscriber 
 . 
 Subscribe 
 ( 
  
 []( 
 pubsub 
 :: 
 Message 
  
 const 
&  
 m 
 , 
  
 pubsub 
 :: 
 AckHandler 
  
 h 
 ) 
  
 { 
  
 std 
 :: 
 move 
 ( 
 h 
 ). 
 ack 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Received message " 
 << 
 m 
 << 
 " 
 \n 
 " 
 ; 
  
 PleaseIgnoreThisSimplifiesTestingTheSamples 
 (); 
  
 }); 
  
 return 
  
 std 
 :: 
 make_pair 
 ( 
 subscriber 
 , 
  
 std 
 :: 
 move 
 ( 
 session 
 )); 
 }; 
 

C#

Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub C# API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  using 
  
  Google.Api.Gax 
 
 ; 
 using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
 System 
 ; 
 using 
  
 System.Threading 
 ; 
 using 
  
 System.Threading.Tasks 
 ; 
 public 
  
 class 
  
 PullMessagesWithFlowControlAsyncSample 
 { 
  
 public 
  
 async 
  
 Task<int> 
  
 PullMessagesWithFlowControlAsync 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 subscriptionId 
 , 
  
 bool 
  
 acknowledge 
 ) 
  
 { 
  
  SubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  SubscriptionName 
 
 . 
  FromProjectSubscription 
 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 int 
  
 messageCount 
  
 = 
  
 0 
 ; 
  
  SubscriberClient 
 
  
 subscriber 
  
 = 
  
 await 
  
 new 
  
  SubscriberClientBuilder 
 
  
 { 
  
 SubscriptionName 
  
 = 
  
 subscriptionName 
 , 
  
 Settings 
  
 = 
  
 new 
  
 SubscriberClient 
 . 
 Settings 
  
 { 
  
 AckExtensionWindow 
  
 = 
  
 TimeSpan 
 . 
 FromSeconds 
 ( 
 4 
 ), 
  
 AckDeadline 
  
 = 
  
 TimeSpan 
 . 
 FromSeconds 
 ( 
 10 
 ), 
  
 FlowControlSettings 
  
 = 
  
 new 
  
  FlowControlSettings 
 
 ( 
 maxOutstandingElementCount 
 : 
  
 100 
 , 
  
 maxOutstandingByteCount 
 : 
  
 10240 
 ) 
  
 } 
  
 }. 
 BuildAsync 
 (); 
  
 // SubscriberClient runs your message handle function on multiple 
  
 // threads to maximize throughput. 
  
 Task 
  
 startTask 
  
 = 
  
 subscriber 
 . 
  StartAsync 
 
 (( 
  PubsubMessage 
 
  
 message 
 , 
  
  CancellationToken 
 
  
 cancel 
 ) 
  
 = 
>  
 { 
  
 string 
  
 text 
  
 = 
  
 message 
 . 
 Data 
 . 
 ToStringUtf8 
 (); 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Message {message.MessageId}: {text}" 
 ); 
  
 Interlocked 
 . 
 Increment 
 ( 
 ref 
  
 messageCount 
 ); 
  
 return 
  
 Task 
 . 
 FromResult 
 ( 
 acknowledge 
  
 ? 
  
  SubscriberClient 
 
 . 
  Reply 
 
 . 
  Ack 
 
  
 : 
  
  SubscriberClient 
 
 . 
  Reply 
 
 . 
  Nack 
 
 ); 
  
 }); 
  
 // Run for 5 seconds. 
  
 await 
  
 Task 
 . 
 Delay 
 ( 
 5000 
 ); 
  
 await 
  
 subscriber 
 . 
  StopAsync 
 
 ( 
  CancellationToken 
 
 . 
 None 
 ); 
  
 // Lets make sure that the start task finished successfully after the call to stop. 
  
 await 
  
 startTask 
 ; 
  
 return 
  
 messageCount 
 ; 
  
 } 
 } 
 

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Go API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "cloud.google.com/go/pubsub/v2" 
 ) 
 func 
  
 pullMsgsFlowControlSettings 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 subID 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // subID := "my-sub" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 // client.Subscriber can be passed a subscription ID (e.g. "my-sub") or 
  
 // a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub"). 
  
 // If a subscription ID is provided, the project ID from the client is used. 
  
 sub 
  
 := 
  
 client 
 . 
 Subscriber 
 ( 
 subID 
 ) 
  
 // MaxOutstandingMessages is the maximum number of unprocessed messages the 
  
 // subscriber client will pull from the server before pausing. This also configures 
  
 // the maximum number of concurrent handlers for received messages. 
  
 // 
  
 // For more information, see https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages. 
  
 sub 
 . 
 ReceiveSettings 
 . 
 MaxOutstandingMessages 
  
 = 
  
 100 
  
 // MaxOutstandingBytes is the maximum size of unprocessed messages, 
  
 // that the subscriber client will pull from the server before pausing. 
  
 sub 
 . 
 ReceiveSettings 
 . 
 MaxOutstandingBytes 
  
 = 
  
 1e8 
  
 err 
  
 = 
  
 sub 
 . 
 Receive 
 ( 
 ctx 
 , 
  
 func 
 ( 
 ctx 
  
 context 
 . 
 Context 
 , 
  
 msg 
  
 * 
 pubsub 
 . 
 Message 
 ) 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Got message: %q\n" 
 , 
  
 string 
 ( 
 msg 
 . 
 Data 
 )) 
  
 msg 
 . 
 Ack 
 () 
  
 }) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "sub.Receive: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Java API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  import 
  
 com.google.api.gax.batching. FlowControlSettings 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. AckReplyConsumer 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. MessageReceiver 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Subscriber 
 
 ; 
 import 
  
 com.google.pubsub.v1. ProjectSubscriptionName 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 import 
  
 java.util.concurrent.TimeoutException 
 ; 
 public 
  
 class 
 SubscribeWithFlowControlSettingsExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 subscriptionId 
  
 = 
  
 "your-subscription-id" 
 ; 
  
 subscribeWithFlowControlSettingsExample 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 subscribeWithFlowControlSettingsExample 
 ( 
  
 String 
  
 projectId 
 , 
  
 String 
  
 subscriptionId 
 ) 
  
 { 
  
  ProjectSubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  ProjectSubscriptionName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 // Instantiate an asynchronous message receiver. 
  
  MessageReceiver 
 
  
 receiver 
  
 = 
  
 ( 
 PubsubMessage 
  
 message 
 , 
  
 AckReplyConsumer 
  
 consumer 
 ) 
  
 - 
>  
 { 
  
 // Handle incoming message, then ack the received message. 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Id: " 
  
 + 
  
 message 
 . 
 getMessageId 
 ()); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Data: " 
  
 + 
  
 message 
 . 
 getData 
 (). 
 toStringUtf8 
 ()); 
  
 consumer 
 . 
 ack 
 (); 
  
 }; 
  
  Subscriber 
 
  
 subscriber 
  
 = 
  
 null 
 ; 
  
 // The subscriber will pause the message stream and stop receiving more messsages from the 
  
 // server if any one of the conditions is met. 
  
  FlowControlSettings 
 
  
 flowControlSettings 
  
 = 
  
  FlowControlSettings 
 
 . 
 newBuilder 
 () 
  
 // 1,000 outstanding messages. Must be >0. It controls the maximum number of messages 
  
 // the subscriber receives before pausing the message stream. 
  
 . 
 setMaxOutstandingElementCount 
 ( 
 1000L 
 ) 
  
 // 100 MiB. Must be >0. It controls the maximum size of messages the subscriber 
  
 // receives before pausing the message stream. 
  
 . 
 setMaxOutstandingRequestBytes 
 ( 
 100L 
  
 * 
  
 1024L 
  
 * 
  
 1024L 
 ) 
  
 . 
 build 
 (); 
  
 try 
  
 { 
  
 subscriber 
  
 = 
  
  Subscriber 
 
 . 
 newBuilder 
 ( 
 subscriptionName 
 , 
  
 receiver 
 ) 
  
 . 
 setFlowControlSettings 
 ( 
 flowControlSettings 
 ) 
  
 . 
 build 
 (); 
  
 // Start the subscriber. 
  
 subscriber 
 . 
  startAsync 
 
 (). 
 awaitRunning 
 (); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
 "Listening for messages on %s:\n" 
 , 
  
 subscriptionName 
 . 
  toString 
 
 ()); 
  
 // Allow the subscriber to run for 30s unless an unrecoverable error occurs. 
  
 subscriber 
 . 
 awaitTerminated 
 ( 
 30 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 ); 
  
 } 
  
 catch 
  
 ( 
 TimeoutException 
  
 timeoutException 
 ) 
  
 { 
  
 // Shut down the subscriber after 30s. Stop receiving messages. 
  
 subscriber 
 . 
 stopAsync 
 (); 
  
 } 
  
 } 
 } 
 

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Ruby API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  # subscription_id = "your-subscription-id" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  PubSub 
 
 . 
  new 
 
 subscriber 
  
 = 
  
 pubsub 
 . 
  subscriber 
 
  
 subscription_id 
 listener 
  
 = 
  
 subscriber 
 . 
  listen 
 
  
 inventory 
 : 
  
 10 
  
 do 
  
 | 
 received_message 
 | 
  
 puts 
  
 "Received message: 
 #{ 
 received_message 
 . 
 data 
 } 
 " 
  
 received_message 
 . 
  acknowledge! 
 
 end 
 listener 
 . 
  start 
 
 # Let the main thread sleep for 60 seconds so the thread for listening 
 # messages does not quit 
 sleep 
  
 60 
 listener 
 . 
 stop 
 . 
 wait! 
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser .

Create a Mobile Website
View Site in Mobile | Classic
Share by: