Extend ack time with lease management

When a message is delivered to a pull subscriber, the subscriber must process and acknowledge (ack) the message within the acknowledgment deadline. Otherwise, the subscriber must extend the deadline with a call to modify the acknowledgment deadline.

The Pub/Sub high-level client libraries provide lease management as a feature that automatically extends the deadline of a message that has not yet been acknowledged. By default, the client libraries can extend the deadline for up to an hour by issuing periodic modifyAckDeadline requests. The high-level client libraries for Python, Go, and Java use the 99th percentile of acknowledgment delay to determine the length of each extension.
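As a rough illustration of a percentile-based extension, the following sketch computes the 99th percentile of observed acknowledgment delays and clamps the result to an assumed range of 10 to 600 seconds. The function name, the nearest-rank percentile method, and the bounds are assumptions made for this example; the client libraries' internal implementation may differ.

```python
import math


def p99_extension_seconds(ack_delays, floor=10, ceiling=600):
    """Return a per-extension duration from the 99th percentile of ack delays.

    ack_delays: observed seconds between delivery and ack (hypothetical input).
    floor, ceiling: assumed bounds for a single extension, in seconds.
    """
    if not ack_delays:
        # No history yet: fall back to the shortest extension.
        return floor
    ordered = sorted(ack_delays)
    # Nearest-rank percentile using integer math: ceil(0.99 * n).
    rank = (99 * len(ordered) + 99) // 100
    p99 = ordered[rank - 1]
    # Round up to whole seconds, then clamp to the allowed range.
    return max(floor, min(ceiling, math.ceil(p99)))


if __name__ == "__main__":
    # Mostly fast messages, a few slow ones, and one extreme outlier.
    delays = [2.0] * 95 + [40.0] * 4 + [900.0]
    print(p99_extension_seconds(delays))
```

Using a high percentile rather than the maximum keeps a single outlier from inflating every extension, while still covering nearly all messages.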

Lease management lets you have more granular control over the acknowledgment deadline for messages compared to configuring the subscription-level property. If you're only using the subscription-level acknowledgment deadline, you have to balance the tradeoff between a low value and a high value. A low value increases the likelihood of duplicates and a high value delays the redelivery of failed messages. Determining the right value can be difficult, especially when the expected processing time for different messages varies greatly.
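To make the tradeoff concrete, the following sketch evaluates a single fixed acknowledgment deadline against a set of sample processing times. The function name and the sample values are hypothetical, and the model is deliberately simple: a message counts as a likely duplicate when its processing time exceeds the deadline, and the worst-case wait before a failed message can be redelivered is taken to be the full deadline.

```python
def deadline_tradeoff(processing_times, ack_deadline):
    """Summarize the cost of one fixed ack deadline.

    Returns (duplicate_fraction, redelivery_wait):
      duplicate_fraction: share of messages still being processed when the
        deadline expires, so they may be redelivered as duplicates.
      redelivery_wait: worst-case seconds before a message held by a failed
        subscriber becomes eligible for redelivery.
    """
    if not processing_times:
        return 0.0, ack_deadline
    late = sum(1 for t in processing_times if t > ack_deadline)
    return late / len(processing_times), ack_deadline


if __name__ == "__main__":
    samples = [5, 8, 12, 45, 90, 300]  # hypothetical processing times, seconds
    for deadline in (10, 60, 600):
        duplicates, wait = deadline_tradeoff(samples, deadline)
        print(
            f"deadline={deadline}s likely duplicates={duplicates:.0%} "
            f"worst-case redelivery wait={wait}s"
        )
```

No single value does well on both measures when processing times vary this widely, which is the case that lease management is meant to handle.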

For more information about the properties of a subscription, including acknowledgment deadline, see Subscription properties.

Lease management configuration

You can configure the following properties in the high-level client libraries to control lease management.

  • Maximum acknowledgment extension period. The maximum amount of time for which the client library can extend the acknowledgment deadline of a message by using the modify acknowledgment deadline request. This property lets you determine how long you want the subscriber clients to process messages.

  • Maximum duration for each acknowledgment extension. The maximum amount of time by which to extend the acknowledgment deadline for each of the modify acknowledgment deadline requests. This property lets you define the amount of time Pub/Sub takes to redeliver a message. Redelivery occurs when the first subscriber processing the message crashes or becomes unhealthy and is no longer able to send the modify acknowledgment deadline request.

  • Minimum duration for each acknowledgment extension. The minimum amount of time by which to extend the acknowledgment deadline for each of the modify acknowledgment deadline requests. This property lets you specify the minimum amount of time that must pass before the redelivery of a message occurs.
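One way to picture how the three properties above interact is the following sketch. It is a simplified model, not the client libraries' implementation: the function and parameter names are hypothetical, and the assumption is that each extension is clamped to the per-extension minimum and maximum and never exceeds what remains of the maximum extension period.

```python
def next_extension_seconds(
    desired,
    elapsed,
    max_extension_period,
    min_per_extension=None,
    max_per_extension=None,
):
    """Return the next extension length in seconds, or 0 if the budget is spent.

    desired: extension the client would like, for example a percentile estimate.
    elapsed: seconds since the message was received.
    max_extension_period: total time the deadline may be extended.
    min_per_extension, max_per_extension: optional bounds for one extension.
    """
    remaining = max_extension_period - elapsed
    if remaining <= 0:
        # The lease has run out; stop extending so the message can be redelivered.
        return 0
    extension = desired
    if min_per_extension is not None:
        extension = max(extension, min_per_extension)
    if max_per_extension is not None:
        extension = min(extension, max_per_extension)
    # Never extend past the end of the maximum extension period.
    return min(extension, remaining)


if __name__ == "__main__":
    # A short estimate is raised to the per-extension minimum.
    print(next_extension_seconds(5, 0, 3600, min_per_extension=10, max_per_extension=60))
    # A long estimate is capped at the per-extension maximum.
    print(next_extension_seconds(120, 0, 3600, min_per_extension=10, max_per_extension=60))
    # Near the end of the lease, only the remaining time is granted.
    print(next_extension_seconds(30, 3590, 3600, min_per_extension=10, max_per_extension=60))
```

In this model, a smaller per-extension maximum means a message held by a crashed subscriber becomes eligible for redelivery sooner, at the cost of more frequent modify acknowledgment deadline requests.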

Acknowledgment deadlines are not guaranteed to be respected unless you enable exactly-once delivery.

Manually managing ack deadlines

To avoid expiration and redelivery of messages when using unary pull or the low-level client libraries, use the modify acknowledgment deadline request to extend their acknowledgment deadlines. The exceptions are the Go and C++ high-level client libraries, which provide lease management when using unary pull. See the following samples for unary pull with lease management:

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.

    using Google.Cloud.PubSub.V1;
    using Grpc.Core;
    using System;
    using System.Collections.Generic;

    public class PullMessageWithLeaseManagementSample
    {
        public int PullMessageWithLeaseManagement(string projectId, string subscriptionId, bool acknowledge)
        {
            SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
            SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
            var ackIds = new List<string>();
            try
            {
                PullResponse response = subscriberClient.Pull(subscriptionName, maxMessages: 20);

                // Print out each received message.
                foreach (ReceivedMessage msg in response.ReceivedMessages)
                {
                    ackIds.Add(msg.AckId);
                    string text = msg.Message.Data.ToStringUtf8();
                    Console.WriteLine($"Message {msg.Message.MessageId}: {text}");

                    // Modify the ack deadline of each received message from the default 10 seconds to 30.
                    // This prevents the server from redelivering the message after the default 10 seconds
                    // have passed.
                    subscriberClient.ModifyAckDeadline(subscriptionName, new List<string> { msg.AckId }, 30);
                }
                // If acknowledgement required, send to server.
                if (acknowledge && ackIds.Count > 0)
                {
                    subscriberClient.Acknowledge(subscriptionName, ackIds);
                }
            }
            catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
            {
                // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
            }
            return ackIds.Count;
        }
    }

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.

    import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
    import com.google.cloud.pubsub.v1.stub.SubscriberStub;
    import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
    import com.google.pubsub.v1.AcknowledgeRequest;
    import com.google.pubsub.v1.ModifyAckDeadlineRequest;
    import com.google.pubsub.v1.ProjectSubscriptionName;
    import com.google.pubsub.v1.PullRequest;
    import com.google.pubsub.v1.PullResponse;
    import com.google.pubsub.v1.ReceivedMessage;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class SubscribeSyncWithLeaseExample {
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String subscriptionId = "your-subscription-id";
        Integer numOfMessages = 10;

        subscribeSyncWithLeaseExample(projectId, subscriptionId, numOfMessages);
      }

      public static void subscribeSyncWithLeaseExample(
          String projectId, String subscriptionId, Integer numOfMessages)
          throws IOException, InterruptedException {
        SubscriberStubSettings subscriberStubSettings =
            SubscriberStubSettings.newBuilder()
                .setTransportChannelProvider(
                    SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                        .setMaxInboundMessageSize(20 << 20) // 20 MB
                        .build())
                .build();

        try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
          String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);

          PullRequest pullRequest =
              PullRequest.newBuilder()
                  .setMaxMessages(numOfMessages)
                  .setSubscription(subscriptionName)
                  .build();

          // Use pullCallable().futureCall to asynchronously perform this operation.
          PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

          // Stop the program if the pull response is empty to avoid acknowledging
          // an empty list of ack IDs.
          if (pullResponse.getReceivedMessagesList().isEmpty()) {
            System.out.println("No message was pulled. Exiting.");
            return;
          }

          List<String> ackIds = new ArrayList<>();
          for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
            ackIds.add(message.getAckId());

            // Modify the ack deadline of each received message from the default 10 seconds to 30.
            // This prevents the server from redelivering the message after the default 10 seconds
            // have passed.
            ModifyAckDeadlineRequest modifyAckDeadlineRequest =
                ModifyAckDeadlineRequest.newBuilder()
                    .setSubscription(subscriptionName)
                    .addAckIds(message.getAckId())
                    .setAckDeadlineSeconds(30)
                    .build();

            subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest);
          }

          // Acknowledge received messages.
          AcknowledgeRequest acknowledgeRequest =
              AcknowledgeRequest.newBuilder()
                  .setSubscription(subscriptionName)
                  .addAllAckIds(ackIds)
                  .build();

          // Use acknowledgeCallable().futureCall to asynchronously perform this operation.
          subscriber.acknowledgeCallable().call(acknowledgeRequest);
          System.out.println(pullResponse.getReceivedMessagesList());
        }
      }
    }

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'YOUR_PROJECT_ID';
    // const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

    // Imports the Google Cloud client library. v1 is for the lower level
    // proto access.
    const {v1} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use.
    const subClient = new v1.SubscriberClient();

    async function synchronousPullWithLeaseManagement() {
      // The low level API client requires a name only.
      const formattedSubscription =
        subscriptionNameOrId.indexOf('/') >= 0
          ? subscriptionNameOrId
          : subClient.subscriptionPath(projectId, subscriptionNameOrId);

      // The maximum number of messages returned for this request.
      // Pub/Sub may return fewer than the number specified.
      const maxMessages = 1;
      const newAckDeadlineSeconds = 30;
      const request = {
        subscription: formattedSubscription,
        maxMessages: maxMessages,
        allowExcessMessages: false,
      };

      let isProcessed = false;

      // The worker function is meant to be non-blocking. It starts a long-
      // running process, such as writing the message to a table, which may
      // take longer than the default 10-sec acknowledge deadline.
      function worker(message) {
        console.log(`Processing "${message.message.data}"...`);

        setTimeout(() => {
          console.log(`Finished processing "${message.message.data}".`);
          isProcessed = true;
        }, 30000);
      }

      // The subscriber pulls a specified number of messages.
      const [response] = await subClient.pull(request);

      // Obtain the first message.
      const message = response.receivedMessages[0];

      // Send the message to the worker function.
      worker(message);

      let waiting = true;
      while (waiting) {
        await new Promise(r => setTimeout(r, 10000));
        // If the message has been processed..
        if (isProcessed) {
          const ackRequest = {
            subscription: formattedSubscription,
            ackIds: [message.ackId],
          };

          //..acknowledges the message.
          await subClient.acknowledge(ackRequest);
          console.log(`Acknowledged: "${message.message.data}".`);
          // Exit after the message is acknowledged.
          waiting = false;
          console.log('Done.');
        } else {
          // If the message is not yet processed..
          const modifyAckRequest = {
            subscription: formattedSubscription,
            ackIds: [message.ackId],
            ackDeadlineSeconds: newAckDeadlineSeconds,
          };

          //..reset its ack deadline.
          await subClient.modifyAckDeadline(modifyAckRequest);

          console.log(
            `Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`,
          );
        }
      }
    }

    synchronousPullWithLeaseManagement().catch(console.error);

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.

    import logging
    import multiprocessing
    import sys
    import time

    from google.api_core import retry
    from google.cloud import pubsub_v1

    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)


    def synchronous_pull_with_lease_management(
        project_id: str, subscription_id: str
    ) -> None:
        # TODO(developer)
        # project_id = "your-project-id"
        # subscription_id = "your-subscription-id"

        # Maps each worker process to the (ack_id, data) of its message.
        processes = dict()

        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(project_id, subscription_id)

        response = subscriber.pull(
            request={"subscription": subscription_path, "max_messages": 3},
            retry=retry.Retry(deadline=300),
        )

        if len(response.received_messages) == 0:
            return

        # Start a process for each message based on its size modulo 10.
        for message in response.received_messages:
            process = multiprocessing.Process(
                target=time.sleep, args=(sys.getsizeof(message) % 10,)
            )
            processes[process] = (message.ack_id, message.message.data)
            process.start()

        while processes:
            # Take a break every second.
            if processes:
                time.sleep(1)

            for process in list(processes):
                ack_id, msg_data = processes[process]
                # If the process is running, reset the ack deadline.
                if process.is_alive():
                    subscriber.modify_ack_deadline(
                        request={
                            "subscription": subscription_path,
                            "ack_ids": [ack_id],
                            # Must be between 10 and 600.
                            "ack_deadline_seconds": 15,
                        }
                    )
                    logger.debug(f"Reset ack deadline for {msg_data}.")

                # If the process is complete, acknowledge the message.
                else:
                    subscriber.acknowledge(
                        request={"subscription": subscription_path, "ack_ids": [ack_id]}
                    )
                    logger.debug(f"Acknowledged {msg_data}.")
                    processes.pop(process)

        print(
            f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
        )

        # Close the underlying gRPC channel. Alternatively, wrap subscriber in
        # a 'with' block to automatically call close() when done.
        subscriber.close()

Ruby

The following sample uses Ruby Pub/Sub client library v3. If you are still using the v2 library, see the migration guide to v3. To see a list of Ruby v2 code samples, see the deprecated code samples.

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.

    require "google/cloud/pubsub"

    # subscription_id = "your-subscription-id"

    pubsub = Google::Cloud::PubSub.new
    subscriber = pubsub.subscriber subscription_id
    new_ack_deadline = 30
    processed = false

    # The subscriber pulls a specified number of messages.
    received_messages = subscriber.pull immediate: false, max: 1

    # Obtain the first message.
    message = received_messages.first

    # Send the message to a non-blocking worker that starts a long-running
    # process, such as writing the message to a table, which may take longer than
    # the default 10-second acknowledge deadline.
    Thread.new do
      sleep 15
      processed = true
      puts "Finished processing \"#{message.data}\"."
    end

    loop do
      sleep 1
      if processed
        # If the message has been processed, acknowledge the message.
        message.acknowledge!
        puts "Done."
        # Exit after the message is acknowledged.
        break
      else
        # If the message has not yet been processed, reset its ack deadline.
        message.modify_ack_deadline! new_ack_deadline
        puts "Reset ack deadline for \"#{message.data}\" for " \
             "#{new_ack_deadline} seconds."
      end
    end

What's next

Read about the other delivery options you can configure for a subscription:
