Subscribe with synchronous pull and lease management

Uses synchronous pull to receive messages and modify their acknowledge deadlines.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

C#

Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub C# API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
  Grpc.Core 
 
 ; 
 using 
  
 System 
 ; 
 using 
  
 System.Collections.Generic 
 ; 
 public 
  
 class 
  
 PullMessageWithLeaseManagementSample 
 { 
  
 public 
  
 int 
  
 PullMessageWithLeaseManagement 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 subscriptionId 
 , 
  
 bool 
  
 acknowledge 
 ) 
  
 { 
  
  SubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  SubscriptionName 
 
 . 
  FromProjectSubscription 
 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
  SubscriberServiceApiClient 
 
  
 subscriberClient 
  
 = 
  
  SubscriberServiceApiClient 
 
 . 
  Create 
 
 (); 
  
 var 
  
 ackIds 
  
 = 
  
 new 
  
 List<string> 
 (); 
  
 try 
  
 { 
  
  PullResponse 
 
  
 response 
  
 = 
  
 subscriberClient 
 . 
  Pull 
 
 ( 
 subscriptionName 
 , 
  
 maxMessages 
 : 
  
 20 
 ); 
  
 // Print out each received message. 
  
 foreach 
  
 ( 
  ReceivedMessage 
 
  
 msg 
  
 in 
  
 response 
 . 
  ReceivedMessages 
 
 ) 
  
 { 
  
 ackIds 
 . 
  Add 
 
 ( 
 msg 
 . 
  AckId 
 
 ); 
  
 string 
  
 text 
  
 = 
  
 msg 
 . 
 Message 
 . 
  Data 
 
 . 
 ToStringUtf8 
 (); 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Message {msg.Message.MessageId}: {text}" 
 ); 
  
 // Modify the ack deadline of each received message from the default 10 seconds to 30. 
  
 // This prevents the server from redelivering the message after the default 10 seconds 
  
 // have passed. 
  
 subscriberClient 
 . 
  ModifyAckDeadline 
 
 ( 
 subscriptionName 
 , 
  
 new 
  
 List<string> 
  
 { 
  
 msg 
 . 
  AckId 
 
  
 }, 
  
 30 
 ); 
  
 } 
  
 // If acknowledgement required, send to server. 
  
 if 
  
 ( 
 acknowledge 
 && 
 ackIds 
 . 
 Count 
 > 
 0 
 ) 
  
 { 
  
 subscriberClient 
 . 
  Acknowledge 
 
 ( 
 subscriptionName 
 , 
  
 ackIds 
 ); 
  
 } 
  
 } 
  
 catch 
  
 ( 
  RpcException 
 
  
 ex 
 ) 
  
 when 
  
 ( 
 ex 
 . 
  Status 
 
 . 
  StatusCode 
 
  
 == 
  
  StatusCode 
 
 . 
  Unavailable 
 
 ) 
  
 { 
  
 // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription. 
  
 } 
  
 return 
  
 ackIds 
 . 
  Count 
 
 ; 
  
 } 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Java API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  import 
  
 com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub 
 ; 
 import 
  
 com.google.cloud.pubsub.v1.stub. SubscriberStub 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1.stub.SubscriberStubSettings 
 ; 
 import 
  
 com.google.pubsub.v1. AcknowledgeRequest 
 
 ; 
 import 
  
 com.google.pubsub.v1. ModifyAckDeadlineRequest 
 
 ; 
 import 
  
 com.google.pubsub.v1. ProjectSubscriptionName 
 
 ; 
 import 
  
 com.google.pubsub.v1. PullRequest 
 
 ; 
 import 
  
 com.google.pubsub.v1. PullResponse 
 
 ; 
 import 
  
 com.google.pubsub.v1. ReceivedMessage 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.util.ArrayList 
 ; 
 import 
  
 java.util.List 
 ; 
 public 
  
 class 
 SubscribeSyncWithLeaseExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 subscriptionId 
  
 = 
  
 "your-subscription-id" 
 ; 
  
 Integer 
  
 numOfMessages 
  
 = 
  
 10 
 ; 
  
 subscribeSyncWithLeaseExample 
 ( 
 projectId 
 , 
  
 subscriptionId 
 , 
  
 numOfMessages 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 subscribeSyncWithLeaseExample 
 ( 
  
 String 
  
 projectId 
 , 
  
 String 
  
 subscriptionId 
 , 
  
 Integer 
  
 numOfMessages 
 ) 
  
 throws 
  
 IOException 
 , 
  
 InterruptedException 
  
 { 
  
 SubscriberStubSettings 
  
 subscriberStubSettings 
  
 = 
  
 SubscriberStubSettings 
 . 
 newBuilder 
 () 
  
 . 
 setTransportChannelProvider 
 ( 
  
 SubscriberStubSettings 
 . 
 defaultGrpcTransportProviderBuilder 
 () 
  
 . 
 setMaxInboundMessageSize 
 ( 
 20 
 << 
 20 
 ) 
  
 // 20 MB 
  
 . 
 build 
 ()) 
  
 . 
 build 
 (); 
  
 try 
  
 ( 
  SubscriberStub 
 
  
 subscriber 
  
 = 
  
 GrpcSubscriberStub 
 . 
 create 
 ( 
 subscriberStubSettings 
 )) 
  
 { 
  
 String 
  
 subscriptionName 
  
 = 
  
  ProjectSubscriptionName 
 
 . 
 format 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
  PullRequest 
 
  
 pullRequest 
  
 = 
  
  PullRequest 
 
 . 
 newBuilder 
 () 
  
 . 
 setMaxMessages 
 ( 
 numOfMessages 
 ) 
  
 . 
 setSubscription 
 ( 
 subscriptionName 
 ) 
  
 . 
 build 
 (); 
  
 // Use pullCallable().futureCall to asynchronously perform this operation. 
  
  PullResponse 
 
  
 pullResponse 
  
 = 
  
 subscriber 
 . 
 pullCallable 
 (). 
 call 
 ( 
 pullRequest 
 ); 
  
 // Stop the program if the pull response is empty to avoid acknowledging 
  
 // an empty list of ack IDs. 
  
 if 
  
 ( 
 pullResponse 
 . 
  getReceivedMessagesList 
 
 (). 
 isEmpty 
 ()) 
  
 { 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "No message was pulled. Exiting." 
 ); 
  
 return 
 ; 
  
 } 
  
 List<String> 
  
 ackIds 
  
 = 
  
 new 
  
 ArrayList 
<> (); 
  
 for 
  
 ( 
  ReceivedMessage 
 
  
 message 
  
 : 
  
 pullResponse 
 . 
  getReceivedMessagesList 
 
 ()) 
  
 { 
  
 ackIds 
 . 
 add 
 ( 
 message 
 . 
 getAckId 
 ()); 
  
 // Modify the ack deadline of each received message from the default 10 seconds to 30. 
  
 // This prevents the server from redelivering the message after the default 10 seconds 
  
 // have passed. 
  
  ModifyAckDeadlineRequest 
 
  
 modifyAckDeadlineRequest 
  
 = 
  
  ModifyAckDeadlineRequest 
 
 . 
 newBuilder 
 () 
  
 . 
 setSubscription 
 ( 
 subscriptionName 
 ) 
  
 . 
 addAckIds 
 ( 
 message 
 . 
 getAckId 
 ()) 
  
 . 
 setAckDeadlineSeconds 
 ( 
 30 
 ) 
  
 . 
 build 
 (); 
  
 subscriber 
 . 
 modifyAckDeadlineCallable 
 (). 
 call 
 ( 
 modifyAckDeadlineRequest 
 ); 
  
 } 
  
 // Acknowledge received messages. 
  
  AcknowledgeRequest 
 
  
 acknowledgeRequest 
  
 = 
  
  AcknowledgeRequest 
 
 . 
 newBuilder 
 () 
  
 . 
 setSubscription 
 ( 
 subscriptionName 
 ) 
  
 . 
 addAllAckIds 
 ( 
 ackIds 
 ) 
  
 . 
 build 
 (); 
  
 // Use acknowledgeCallable().futureCall to asynchronously perform this operation. 
  
 subscriber 
 . 
 acknowledgeCallable 
 (). 
 call 
 ( 
 acknowledgeRequest 
 ); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 pullResponse 
 . 
  getReceivedMessagesList 
 
 ()); 
  
 } 
  
 } 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Node.js API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  /** 
 * TODO(developer): Uncomment these variables before running the sample. 
 */ 
 // const projectId = 'YOUR_PROJECT_ID'; 
 // const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; 
 // Imports the Google Cloud client library. v1 is for the lower level 
 // proto access. 
 const 
  
 { 
 v1 
 } 
  
 = 
  
 require 
 ( 
 ' @google-cloud/pubsub 
' 
 ); 
 // Creates a client; cache this for further use. 
 const 
  
 subClient 
  
 = 
  
 new 
  
 v1 
 . 
  SubscriberClient 
 
 (); 
 async 
  
 function 
  
 synchronousPullWithLeaseManagement 
 () 
  
 { 
  
 // The low level API client requires a name only. 
  
 const 
  
 formattedSubscription 
  
 = 
  
 subscriptionNameOrId 
 . 
 indexOf 
 ( 
 '/' 
 ) 
  
> = 
  
 0 
  
 ? 
  
 subscriptionNameOrId 
  
 : 
  
 subClient 
 . 
 subscriptionPath 
 ( 
 projectId 
 , 
  
 subscriptionNameOrId 
 ); 
  
 // The maximum number of messages returned for this request. 
  
 // Pub/Sub may return fewer than the number specified. 
  
 const 
  
 maxMessages 
  
 = 
  
 1 
 ; 
  
 const 
  
 newAckDeadlineSeconds 
  
 = 
  
 30 
 ; 
  
 const 
  
 request 
  
 = 
  
 { 
  
 subscription 
 : 
  
 formattedSubscription 
 , 
  
 maxMessages 
 : 
  
 maxMessages 
 , 
  
 allowExcessMessages 
 : 
  
 false 
 , 
  
 }; 
  
 let 
  
 isProcessed 
  
 = 
  
 false 
 ; 
  
 // The worker function is meant to be non-blocking. It starts a long- 
  
 // running process, such as writing the message to a table, which may 
  
 // take longer than the default 10-sec acknowledge deadline. 
  
 function 
  
 worker 
 ( 
 message 
 ) 
  
 { 
  
 console 
 . 
 log 
 ( 
 `Processing " 
 ${ 
 message 
 . 
 message 
 . 
 data 
 } 
 "...` 
 ); 
  
 setTimeout 
 (() 
  
 = 
>  
 { 
  
 console 
 . 
 log 
 ( 
 `Finished procesing " 
 ${ 
 message 
 . 
 message 
 . 
 data 
 } 
 ".` 
 ); 
  
 isProcessed 
  
 = 
  
 true 
 ; 
  
 }, 
  
 30000 
 ); 
  
 } 
  
 // The subscriber pulls a specified number of messages. 
  
 const 
  
 [ 
 response 
 ] 
  
 = 
  
 await 
  
 subClient 
 . 
 pull 
 ( 
 request 
 ); 
  
 // Obtain the first message. 
  
 const 
  
 message 
  
 = 
  
 response 
 . 
 receivedMessages 
 [ 
 0 
 ]; 
  
 // Send the message to the worker function. 
  
 worker 
 ( 
 message 
 ); 
  
 let 
  
 waiting 
  
 = 
  
 true 
 ; 
  
 while 
  
 ( 
 waiting 
 ) 
  
 { 
  
 await 
  
 new 
  
  Promise 
 
 ( 
 r 
  
 = 
>  
 setTimeout 
 ( 
 r 
 , 
  
 10000 
 )); 
  
 // If the message has been processed.. 
  
 if 
  
 ( 
 isProcessed 
 ) 
  
 { 
  
 const 
  
 ackRequest 
  
 = 
  
 { 
  
 subscription 
 : 
  
 formattedSubscription 
 , 
  
 ackIds 
 : 
  
 [ 
 message 
 . 
 ackId 
 ], 
  
 }; 
  
 //..acknowledges the message. 
  
 await 
  
 subClient 
 . 
 acknowledge 
 ( 
 ackRequest 
 ); 
  
 console 
 . 
 log 
 ( 
 `Acknowledged: " 
 ${ 
 message 
 . 
 message 
 . 
 data 
 } 
 ".` 
 ); 
  
 // Exit after the message is acknowledged. 
  
 waiting 
  
 = 
  
 false 
 ; 
  
 console 
 . 
 log 
 ( 
 'Done.' 
 ); 
  
 } 
  
 else 
  
 { 
  
 // If the message is not yet processed.. 
  
 const 
  
 modifyAckRequest 
  
 = 
  
 { 
  
 subscription 
 : 
  
 formattedSubscription 
 , 
  
 ackIds 
 : 
  
 [ 
 message 
 . 
 ackId 
 ], 
  
 ackDeadlineSeconds 
 : 
  
 newAckDeadlineSeconds 
 , 
  
 }; 
  
 //..reset its ack deadline. 
  
 await 
  
 subClient 
 . 
 modifyAckDeadline 
 ( 
 modifyAckRequest 
 ); 
  
 console 
 . 
 log 
 ( 
  
 `Reset ack deadline for " 
 ${ 
 message 
 . 
 message 
 . 
 data 
 } 
 " for 
 ${ 
 newAckDeadlineSeconds 
 } 
 s.` 
 , 
  
 ); 
  
 } 
  
 } 
 } 
 synchronousPullWithLeaseManagement 
 (). 
 catch 
 ( 
 console 
 . 
 error 
 ); 
 

Python

Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Python API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  import 
  
 logging 
 import 
  
 multiprocessing 
 import 
  
 sys 
 import 
  
 time 
 from 
  
 google.api_core 
  
 import 
  retry 
 
 from 
  
 google.cloud 
  
 import 
 pubsub_v1 
 multiprocessing 
 . 
 log_to_stderr 
 () 
 logger 
 = 
 multiprocessing 
 . 
 get_logger 
 () 
 logger 
 . 
 setLevel 
 ( 
 logging 
 . 
 INFO 
 ) 
 processes 
 = 
 dict 
 () 
 # TODO(developer) 
 # project_id = "your-project-id" 
 # subscription_id = "your-subscription-id" 
 subscriber 
 = 
 pubsub_v1 
 . 
  SubscriberClient 
 
 () 
 subscription_path 
 = 
 subscriber 
 . 
 subscription_path 
 ( 
 project_id 
 , 
 subscription_id 
 ) 
 response 
 = 
 subscriber 
 . 
  pull 
 
 ( 
 request 
 = 
 { 
 "subscription" 
 : 
 subscription_path 
 , 
 "max_messages" 
 : 
 3 
 }, 
 retry 
 = 
  retry 
 
 . 
 Retry 
 ( 
 deadline 
 = 
 300 
 ), 
 ) 
 if 
 len 
 ( 
 response 
 . 
 received_messages 
 ) 
 == 
 0 
 : 
 return 
 # Start a process for each message based on its size modulo 10. 
 for 
 message 
 in 
 response 
 . 
 received_messages 
 : 
 process 
 = 
 multiprocessing 
 . 
 Process 
 ( 
 target 
 = 
 time 
 . 
 sleep 
 , 
 args 
 = 
 ( 
 sys 
 . 
 getsizeof 
 ( 
 message 
 ) 
 % 
 10 
 ,) 
 ) 
 processes 
 [ 
 process 
 ] 
 = 
 ( 
 message 
 . 
  ack_id 
 
 , 
 message 
 . 
 message 
 . 
  data 
 
 ) 
 process 
 . 
 start 
 () 
 while 
 processes 
 : 
 # Take a break every second. 
 if 
 processes 
 : 
 time 
 . 
 sleep 
 ( 
 1 
 ) 
 for 
 process 
 in 
 list 
 ( 
 processes 
 ): 
 ack_id 
 , 
 msg_data 
 = 
 processes 
 [ 
 process 
 ] 
 # If the process is running, reset the ack deadline. 
 if 
 process 
 . 
 is_alive 
 (): 
 subscriber 
 . 
 modify_ack_deadline 
 ( 
 request 
 = 
 { 
 "subscription" 
 : 
 subscription_path 
 , 
 "ack_ids" 
 : 
 [ 
 ack_id 
 ], 
 # Must be between 10 and 600. 
 "ack_deadline_seconds" 
 : 
 15 
 , 
 } 
 ) 
 logger 
 . 
 debug 
 ( 
 f 
 "Reset ack deadline for 
 { 
 msg_data 
 } 
 ." 
 ) 
 # If the process is complete, acknowledge the message. 
 else 
 : 
 subscriber 
 . 
  acknowledge 
 
 ( 
 request 
 = 
 { 
 "subscription" 
 : 
 subscription_path 
 , 
 "ack_ids" 
 : 
 [ 
 ack_id 
 ]} 
 ) 
 logger 
 . 
 debug 
 ( 
 f 
 "Acknowledged 
 { 
 msg_data 
 } 
 ." 
 ) 
 processes 
 . 
 pop 
 ( 
 process 
 ) 
 print 
 ( 
 f 
 "Received and acknowledged 
 { 
 len 
 ( 
 response 
 . 
 received_messages 
 ) 
 } 
 messages from 
 { 
 subscription_path 
 } 
 ." 
 ) 
 # Close the underlying gPRC channel. Alternatively, wrap subscriber in 
 # a 'with' block to automatically call close() when done. 
 subscriber 
 . 
  close 
 
 () 
 

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Ruby API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  # subscription_id = "your-subscription-id" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  PubSub 
 
 . 
  new 
 
 subscriber 
  
 = 
  
 pubsub 
 . 
  subscriber 
 
  
 subscription_id 
 new_ack_deadline 
  
 = 
  
 30 
 processed 
  
 = 
  
 false 
 # The subscriber pulls a specified number of messages. 
 received_messages 
  
 = 
  
 subscriber 
 . 
 pull 
  
 immediate 
 : 
  
 false 
 , 
  
 max 
 : 
  
 1 
 # Obtain the first message. 
 message 
  
 = 
  
 received_messages 
 . 
 first 
 # Send the message to a non-blocking worker that starts a long-running 
 # process, such as writing the message to a table, which may take longer than 
 # the default 10-second acknowledge deadline. 
 Thread 
 . 
  new 
 
  
 do 
  
 sleep 
  
 15 
  
 processed 
  
 = 
  
 true 
  
 puts 
  
 "Finished processing 
 \" 
 #{ 
 message 
 . 
 data 
 } 
 \" 
 ." 
 end 
 loop 
  
 do 
  
 sleep 
  
 1 
  
 if 
  
 processed 
  
 # If the message has been processed, acknowledge the message. 
  
 message 
 . 
  acknowledge! 
 
  
 puts 
  
 "Done." 
  
 # Exit after the message is acknowledged. 
  
 break 
  
 else 
  
 # If the message has not yet been processed, reset its ack deadline. 
  
 message 
 . 
  modify_ack_deadline! 
 
  
 new_ack_deadline 
  
 puts 
  
 "Reset ack deadline for 
 \" 
 #{ 
 message 
 . 
 data 
 } 
 \" 
 for " 
  
 \ 
  
 " 
 #{ 
 new_ack_deadline 
 } 
 seconds." 
  
 end 
 end 
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser .

Create a Mobile Website
View Site in Mobile | Classic
Share by: