Retry requests

Publishing failures are typically caused by client-side bottlenecks, such as insufficient service CPUs, bad thread health, or network congestion. The publisher retry policy defines the number of times Pub/Sub tries to deliver a message and the length of time between each attempt.

This document provides information about using retry requests with messages published to a topic.

Before you begin

Before configuring the publish workflow, ensure you have completed the following tasks:

Required roles

To get the permissions that you need to retry message requests to a topic, ask your administrator to grant you the Pub/Sub Publisher ( roles/pubsub.publisher ) IAM role on the topic. For more information about granting roles, see Manage access to projects, folders, and organizations .

You might also be able to get the required permissions through custom roles or other predefined roles .

You need additional permissions to create or update topics and subscriptions.

About retry requests

Retry settings control how the Pub/Sub client libraries retry publish requests. The client libraries have any of the following retry settings:

  • Initial request timeout: the amount of time before a client library stops waiting for the initial publish request to complete.
  • Retry delay: the amount of time after a request times out that a client library waits to retry the request.
  • Total timeout: the amount of time after a client library stops retrying publish requests.

To retry publish requests, the initial request timeout must be shorter than the total timeout. For example, if you're using exponential backoff, the client libraries compute the request timeout and retry delay as follows:

  • After each publish request, the request timeout increases by the request timeout multiplier, up to the maximum request timeout.
  • After each retry, the retry delay increases by the retry delay multiplier, up to the maximum retry delay.

Retry a message request

During the publishing process, you might see transient or permanent publishing failures. For transient errors, you typically don't need to take any special action as Pub/Sub automatically retries the messages.

An error can also occur when a publish operation succeeds but the publish response is not received in time by the publisher client. In this case too, the publish operation is retried. As a result, you can have two identical messages with different message IDs.

In case of persistent errors, consider implementing appropriate actions outside of the publishing process to avoid overwhelming Pub/Sub.

Publishing failures are automatically retried, except for errors that don't warrant retries. This sample code demonstrates creating a publisher with custom retry settings (note that not all client libraries support custom retry settings; see the API Reference documentation for your chosen language):

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C++ API reference documentation .

  namespace 
  
 pubsub 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 future 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 Options 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 StatusOr 
 ; 
 []( 
 std 
 :: 
 string 
  
 project_id 
 , 
  
 std 
 :: 
 string 
  
 topic_id 
 ) 
  
 { 
  
 auto 
  
 topic 
  
 = 
  
 pubsub 
 :: 
 Topic 
 ( 
 std 
 :: 
 move 
 ( 
 project_id 
 ), 
  
 std 
 :: 
 move 
 ( 
 topic_id 
 )); 
  
 // By default a publisher will retry for 60 seconds, with an initial backoff 
  
 // of 100ms, a maximum backoff of 60 seconds, and the backoff will grow by 
  
 // 30% after each attempt. This changes those defaults. 
  
 auto 
  
 publisher 
  
 = 
  
 pubsub 
 :: 
 Publisher 
 ( 
 pubsub 
 :: 
 MakePublisherConnection 
 ( 
  
 std 
 :: 
 move 
 ( 
 topic 
 ), 
  
 Options 
 {} 
  
 . 
 set<pubsub 
 :: 
 RetryPolicyOption 
> ( 
  
 pubsub 
 :: 
 LimitedTimeRetryPolicy 
 ( 
  
 /*maximum_duration=*/ 
 std 
 :: 
 chrono 
 :: 
 minutes 
 ( 
 10 
 )) 
  
 . 
 clone 
 ()) 
  
 . 
 set<pubsub 
 :: 
 BackoffPolicyOption 
> ( 
  
 pubsub 
 :: 
 ExponentialBackoffPolicy 
 ( 
  
 /*initial_delay=*/ 
 std 
 :: 
 chrono 
 :: 
 milliseconds 
 ( 
 200 
 ), 
  
 /*maximum_delay=*/ 
 std 
 :: 
 chrono 
 :: 
 seconds 
 ( 
 45 
 ), 
  
 /*scaling=*/ 
 2.0 
 ) 
  
 . 
 clone 
 ()))); 
  
 std 
 :: 
 vector<future<bool> 
>  
 done 
 ; 
  
 for 
  
 ( 
 char 
  
 const 
 * 
  
 data 
  
 : 
  
 { 
 "1" 
 , 
  
 "2" 
 , 
  
 "3" 
 , 
  
 "go!" 
 }) 
  
 { 
  
 done 
 . 
 push_back 
 ( 
  
 publisher 
 . 
 Publish 
 ( 
 pubsub 
 :: 
 MessageBuilder 
 (). 
 SetData 
 ( 
 data 
 ). 
 Build 
 ()) 
  
 . 
 then 
 ([]( 
 future<StatusOr<std 
 :: 
 string 
>>  
 f 
 ) 
  
 { 
  
 return 
  
 f 
 . 
 get 
 (). 
 ok 
 (); 
  
 })); 
  
 } 
  
 publisher 
 . 
 Flush 
 (); 
  
 int 
  
 count 
  
 = 
  
 0 
 ; 
  
 for 
  
 ( 
 auto 
&  
 f 
  
 : 
  
 done 
 ) 
  
 { 
  
 if 
  
 ( 
 f 
 . 
 get 
 ()) 
  
 ++ 
 count 
 ; 
  
 } 
  
 std 
 :: 
 cout 
 << 
 count 
 << 
 " messages sent successfully 
 \n 
 " 
 ; 
 } 
 

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C# API reference documentation .

  using 
  
  Google.Api.Gax.Grpc 
 
 ; 
 using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
  Grpc.Core 
 
 ; 
 using 
  
 System 
 ; 
 using 
  
 System.Threading.Tasks 
 ; 
 public 
  
 class 
  
 PublishMessageWithRetrySettingsAsyncSample 
 { 
  
 public 
  
 async 
  
 Task 
  
 PublishMessageWithRetrySettingsAsync 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 topicId 
 , 
  
 string 
  
 messageText 
 ) 
  
 { 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
  FromProjectTopic 
 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
 // Retry settings control how the publisher handles retry-able failures 
  
 var 
  
 maxAttempts 
  
 = 
  
 3 
 ; 
  
 var 
  
 initialBackoff 
  
 = 
  
 TimeSpan 
 . 
 FromMilliseconds 
 ( 
 110 
 ); 
  
 // default: 100 ms 
  
 var 
  
 maxBackoff 
  
 = 
  
 TimeSpan 
 . 
 FromSeconds 
 ( 
 70 
 ); 
  
 // default : 60 seconds 
  
 var 
  
 backoffMultiplier 
  
 = 
  
 1.3 
 ; 
  
 // default: 1.0 
  
 var 
  
 totalTimeout 
  
 = 
  
 TimeSpan 
 . 
 FromSeconds 
 ( 
 100 
 ); 
  
 // default: 600 seconds 
  
 var 
  
 publisher 
  
 = 
  
 await 
  
 new 
  
  PublisherClientBuilder 
 
  
 { 
  
 TopicName 
  
 = 
  
 topicName 
 , 
  
 ApiSettings 
  
 = 
  
 new 
  
  PublisherServiceApiSettings 
 
  
 { 
  
 PublishSettings 
  
 = 
  
  CallSettings 
 
 . 
  FromRetry 
 
 ( 
  RetrySettings 
 
 . 
  FromExponentialBackoff 
 
 ( 
  
 maxAttempts 
 : 
  
 maxAttempts 
 , 
  
 initialBackoff 
 : 
  
 initialBackoff 
 , 
  
 maxBackoff 
 : 
  
 maxBackoff 
 , 
  
 backoffMultiplier 
 : 
  
 backoffMultiplier 
 , 
  
 retryFilter 
 : 
  
  RetrySettings 
 
 . 
  FilterForStatusCodes 
 
 ( 
  StatusCode 
 
 . 
  Unavailable 
 
 ))) 
  
 . 
  WithTimeout 
 
 ( 
 totalTimeout 
 ) 
  
 } 
  
 }. 
 BuildAsync 
 (); 
  
 string 
  
 message 
  
 = 
  
 await 
  
 publisher 
 . 
 PublishAsync 
 ( 
 messageText 
 ); 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Published message {message}" 
 ); 
  
 } 
 } 
 

Go

The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2 . To see a list of v1 code samples, see the deprecated code samples .

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Go API reference documentation .

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/pubsub/v2" 
  
 vkit 
  
 "cloud.google.com/go/pubsub/v2/apiv1" 
  
 gax 
  
 "github.com/googleapis/gax-go/v2" 
  
 "google.golang.org/grpc/codes" 
 ) 
 func 
  
 publishWithRetrySettings 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 topicID 
 , 
  
 msg 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topicID := "my-topic" 
  
 // msg := "Hello World" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 config 
  
 := 
  
& pubsub 
 . 
 ClientConfig 
 { 
  
 TopicAdminCallOptions 
 : 
  
& vkit 
 . 
 TopicAdminCallOptions 
 { 
  
 Publish 
 : 
  
 [] 
 gax 
 . 
 CallOption 
 { 
  
 gax 
 . 
 WithRetry 
 ( 
 func 
 () 
  
 gax 
 . 
 Retryer 
  
 { 
  
 return 
  
 gax 
 . 
 OnCodes 
 ([] 
 codes 
 . 
 Code 
 { 
  
 codes 
 . 
 Aborted 
 , 
  
 codes 
 . 
 Canceled 
 , 
  
 codes 
 . 
 Internal 
 , 
  
 codes 
 . 
 ResourceExhausted 
 , 
  
 codes 
 . 
 Unknown 
 , 
  
 codes 
 . 
 Unavailable 
 , 
  
 codes 
 . 
 DeadlineExceeded 
 , 
  
 }, 
  
 gax 
 . 
 Backoff 
 { 
  
 Initial 
 : 
  
 250 
  
 * 
  
 time 
 . 
 Millisecond 
 , 
  
 // default 100 milliseconds 
  
 Max 
 : 
  
 60 
  
 * 
  
 time 
 . 
 Second 
 , 
  
 // default 60 seconds 
  
 Multiplier 
 : 
  
 1.45 
 , 
  
 // default 1.3 
  
 }) 
  
 }), 
  
 }, 
  
 }, 
  
 } 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClientWithConfig 
 ( 
 ctx 
 , 
  
 projectID 
 , 
  
 config 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub: NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 // client.Publisher can be passed a topic ID (e.g. "my-topic") or 
  
 // a fully qualified name (e.g. "projects/my-project/topics/my-topic"). 
  
 // If a topic ID is provided, the project ID from the client is used. 
  
 // Reuse this publisher for all publish calls to send messages in batches. 
  
 publisher 
  
 := 
  
 client 
 . 
 Publisher 
 ( 
 topicID 
 ) 
  
 result 
  
 := 
  
 publisher 
 . 
 Publish 
 ( 
 ctx 
 , 
  
& pubsub 
 . 
 Message 
 { 
  
 Data 
 : 
  
 [] 
 byte 
 ( 
 msg 
 ), 
  
 }) 
  
 // Block until the result is returned and a server-generated 
  
 // ID is returned for the published message. 
  
 id 
 , 
  
 err 
  
 := 
  
 result 
 . 
 Get 
 ( 
 ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub: result.Get: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Published a message; msg ID: %v\n" 
 , 
  
 id 
 ) 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Java API reference documentation .

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.api.gax.retrying. RetrySettings 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Publisher 
 
 ; 
 import 
  
 com.google.protobuf. ByteString 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 com.google.pubsub.v1. TopicName 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.util.concurrent.ExecutionException 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 import 
  
 org.threeten.bp. Duration 
 
 ; 
 public 
  
 class 
 PublishWithRetrySettingsExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 topicId 
  
 = 
  
 "your-topic-id" 
 ; 
  
 publishWithRetrySettingsExample 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 publishWithRetrySettingsExample 
 ( 
 String 
  
 projectId 
 , 
  
 String 
  
 topicId 
 ) 
  
 throws 
  
 IOException 
 , 
  
 ExecutionException 
 , 
  
 InterruptedException 
  
 { 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
  Publisher 
 
  
 publisher 
  
 = 
  
 null 
 ; 
  
 try 
  
 { 
  
 // Retry settings control how the publisher handles retry-able failures 
  
  Duration 
 
  
 initialRetryDelay 
  
 = 
  
  Duration 
 
 . 
 ofMillis 
 ( 
 100 
 ); 
  
 // default: 100 ms 
  
 double 
  
 retryDelayMultiplier 
  
 = 
  
 2.0 
 ; 
  
 // back off for repeated failures, default: 1.3 
  
  Duration 
 
  
 maxRetryDelay 
  
 = 
  
  Duration 
 
 . 
 ofSeconds 
 ( 
 60 
 ); 
  
 // default : 60 seconds 
  
  Duration 
 
  
 initialRpcTimeout 
  
 = 
  
  Duration 
 
 . 
 ofSeconds 
 ( 
 1 
 ); 
  
 // default: 5 seconds 
  
 double 
  
 rpcTimeoutMultiplier 
  
 = 
  
 1.0 
 ; 
  
 // default: 1.0 
  
  Duration 
 
  
 maxRpcTimeout 
  
 = 
  
  Duration 
 
 . 
 ofSeconds 
 ( 
 600 
 ); 
  
 // default: 600 seconds 
  
  Duration 
 
  
 totalTimeout 
  
 = 
  
  Duration 
 
 . 
 ofSeconds 
 ( 
 600 
 ); 
  
 // default: 600 seconds 
  
  RetrySettings 
 
  
 retrySettings 
  
 = 
  
  RetrySettings 
 
 . 
 newBuilder 
 () 
  
 . 
  setInitialRetryDelay 
 
 ( 
 initialRetryDelay 
 ) 
  
 . 
  setRetryDelayMultiplier 
 
 ( 
 retryDelayMultiplier 
 ) 
  
 . 
  setMaxRetryDelay 
 
 ( 
 maxRetryDelay 
 ) 
  
 . 
  setInitialRpcTimeout 
 
 ( 
 initialRpcTimeout 
 ) 
  
 . 
  setRpcTimeoutMultiplier 
 
 ( 
 rpcTimeoutMultiplier 
 ) 
  
 . 
  setMaxRpcTimeout 
 
 ( 
 maxRpcTimeout 
 ) 
  
 . 
  setTotalTimeout 
 
 ( 
 totalTimeout 
 ) 
  
 . 
 build 
 (); 
  
 // Create a publisher instance with default settings bound to the topic 
  
 publisher 
  
 = 
  
  Publisher 
 
 . 
 newBuilder 
 ( 
 topicName 
 ). 
  setRetrySettings 
 
 ( 
 retrySettings 
 ). 
 build 
 (); 
  
 String 
  
 message 
  
 = 
  
 "first message" 
 ; 
  
  ByteString 
 
  
 data 
  
 = 
  
  ByteString 
 
 . 
  copyFromUtf8 
 
 ( 
 message 
 ); 
  
  PubsubMessage 
 
  
 pubsubMessage 
  
 = 
  
  PubsubMessage 
 
 . 
 newBuilder 
 (). 
  setData 
 
 ( 
 data 
 ). 
 build 
 (); 
  
 // Once published, returns a server-assigned message id (unique within the topic) 
  
 ApiFuture<String> 
  
 messageIdFuture 
  
 = 
  
  publish 
 
er . 
  publish 
 
 ( 
 pubsubMessage 
 ); 
  
 String 
  
 messageId 
  
 = 
  
 messageIdFuture 
 . 
 get 
 (); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Published a message with retry settings: " 
  
 + 
  
 messageId 
 ); 
  
 } 
  
 finally 
  
 { 
  
 if 
  
 ( 
 publisher 
  
 != 
  
 null 
 ) 
  
 { 
  
 // When finished with the publisher, shutdown to free up resources. 
  
 publisher 
 . 
  shutdown 
 
 (); 
  
 publisher 
 . 
  awaitTermination 
 
 ( 
 1 
 , 
  
 TimeUnit 
 . 
 MINUTES 
 ); 
  
 } 
  
 } 
  
 } 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .

  /** 
 * TODO(developer): Uncomment these variables before running the sample. 
 */ 
 // const projectId = 'YOUR_PROJECT_ID' 
 // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; 
 // const data = JSON.stringify({foo: 'bar'}); 
 // Imports the Google Cloud client library. v1 is for the lower level 
 // proto access. 
 const 
  
 { 
 PubSub 
 } 
  
 = 
  
 require 
 ( 
 ' @google-cloud/pubsub 
' 
 ); 
 async 
  
 function 
  
 publishWithRetrySettings 
 ( 
 topicNameOrId 
 , 
  
 data 
 ) 
  
 { 
  
 const 
  
 pubsubClient 
  
 = 
  
 new 
  
  PubSub 
 
 (); 
  
 // Retry settings control how the publisher handles retryable failures. Default values are shown. 
  
 // The `retryCodes` array determines which grpc errors will trigger an automatic retry. 
  
 // The `backoffSettings` object lets you specify the behaviour of retries over time. 
  
 // 
  
 // Reference this document to see the current defaults for publishing: 
  
 // https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59 
  
 // 
  
 // Please note that _all_ items must be included when passing these settings to topic(). 
  
 // Otherwise, unpredictable (incorrect) defaults may be assumed. 
  
 const 
  
 retrySettings 
  
 = 
  
 { 
  
 retryCodes 
 : 
  
 [ 
  
 10 
 , 
  
 // 'ABORTED' 
  
 1 
 , 
  
 // 'CANCELLED', 
  
 4 
 , 
  
 // 'DEADLINE_EXCEEDED' 
  
 13 
 , 
  
 // 'INTERNAL' 
  
 8 
 , 
  
 // 'RESOURCE_EXHAUSTED' 
  
 14 
 , 
  
 // 'UNAVAILABLE' 
  
 2 
 , 
  
 // 'UNKNOWN' 
  
 ], 
  
 backoffSettings 
 : 
  
 { 
  
 // The initial delay time, in milliseconds, between the completion 
  
 // of the first failed request and the initiation of the first retrying request. 
  
 initialRetryDelayMillis 
 : 
  
 100 
 , 
  
 // The multiplier by which to increase the delay time between the completion 
  
 // of failed requests, and the initiation of the subsequent retrying request. 
  
 retryDelayMultiplier 
 : 
  
 4 
 , 
  
 // The maximum delay time, in milliseconds, between requests. 
  
 // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time. 
  
 maxRetryDelayMillis 
 : 
  
 60000 
 , 
  
 // The initial timeout parameter to the request. 
  
 initialRpcTimeoutMillis 
 : 
  
 60000 
 , 
  
 // The multiplier by which to increase the timeout parameter between failed requests. 
  
 rpcTimeoutMultiplier 
 : 
  
 1.0 
 , 
  
 // The maximum timeout parameter, in milliseconds, for a request. When this value is reached, 
  
 // rpcTimeoutMultiplier will no longer be used to increase the timeout. 
  
 maxRpcTimeoutMillis 
 : 
  
 60000 
 , 
  
 // The total time, in milliseconds, starting from when the initial request is sent, 
  
 // after which an error will be returned, regardless of the retrying attempts made meanwhile. 
  
 totalTimeoutMillis 
 : 
  
 600000 
 , 
  
 }, 
  
 }; 
  
 // Cache topic objects (publishers) and reuse them. 
  
 const 
  
 topic 
  
 = 
  
 pubsubClient 
 . 
 topic 
 ( 
 topicNameOrId 
 , 
  
 { 
  
 gaxOpts 
 : 
  
 { 
  
 retry 
 : 
  
 retrySettings 
 , 
  
 }, 
  
 }); 
  
 // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) 
  
 const 
  
 dataBuffer 
  
 = 
  
 Buffer 
 . 
  from 
 
 ( 
 data 
 ); 
  
 const 
  
 messageId 
  
 = 
  
 await 
  
 topic 
 . 
  publishMessage 
 
 ({ 
 data 
 : 
  
 dataBuffer 
 }); 
  
 console 
 . 
 log 
 ( 
 `Message 
 ${ 
 messageId 
 } 
 published.` 
 ); 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .

  /** 
 * TODO(developer): Uncomment these variables before running the sample. 
 */ 
 // const projectId = 'YOUR_PROJECT_ID' 
 // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; 
 // const data = JSON.stringify({foo: 'bar'}); 
 // Imports the Google Cloud client library. v1 is for the lower level 
 // proto access. 
 import 
  
 { 
 PubSub 
 } 
  
 from 
  
 '@google-cloud/pubsub' 
 ; 
 async 
  
 function 
  
 publishWithRetrySettings 
 ( 
 topicNameOrId 
 : 
  
 string 
 , 
  
 data 
 : 
  
 string 
 ) 
  
 { 
  
 const 
  
 pubsubClient 
  
 = 
  
 new 
  
 PubSub 
 (); 
  
 // Retry settings control how the publisher handles retryable failures. Default values are shown. 
  
 // The `retryCodes` array determines which grpc errors will trigger an automatic retry. 
  
 // The `backoffSettings` object lets you specify the behaviour of retries over time. 
  
 // 
  
 // Reference this document to see the current defaults for publishing: 
  
 // https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59 
  
 // 
  
 // Please note that _all_ items must be included when passing these settings to topic(). 
  
 // Otherwise, unpredictable (incorrect) defaults may be assumed. 
  
 const 
  
 retrySettings 
  
 = 
  
 { 
  
 retryCodes 
 : 
  
 [ 
  
 10 
 , 
  
 // 'ABORTED' 
  
 1 
 , 
  
 // 'CANCELLED', 
  
 4 
 , 
  
 // 'DEADLINE_EXCEEDED' 
  
 13 
 , 
  
 // 'INTERNAL' 
  
 8 
 , 
  
 // 'RESOURCE_EXHAUSTED' 
  
 14 
 , 
  
 // 'UNAVAILABLE' 
  
 2 
 , 
  
 // 'UNKNOWN' 
  
 ], 
  
 backoffSettings 
 : 
  
 { 
  
 // The initial delay time, in milliseconds, between the completion 
  
 // of the first failed request and the initiation of the first retrying request. 
  
 initialRetryDelayMillis 
 : 
  
 100 
 , 
  
 // The multiplier by which to increase the delay time between the completion 
  
 // of failed requests, and the initiation of the subsequent retrying request. 
  
 retryDelayMultiplier 
 : 
  
 4 
 , 
  
 // The maximum delay time, in milliseconds, between requests. 
  
 // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time. 
  
 maxRetryDelayMillis 
 : 
  
 60000 
 , 
  
 // The initial timeout parameter to the request. 
  
 initialRpcTimeoutMillis 
 : 
  
 60000 
 , 
  
 // The multiplier by which to increase the timeout parameter between failed requests. 
  
 rpcTimeoutMultiplier 
 : 
  
 1.0 
 , 
  
 // The maximum timeout parameter, in milliseconds, for a request. When this value is reached, 
  
 // rpcTimeoutMultiplier will no longer be used to increase the timeout. 
  
 maxRpcTimeoutMillis 
 : 
  
 60000 
 , 
  
 // The total time, in milliseconds, starting from when the initial request is sent, 
  
 // after which an error will be returned, regardless of the retrying attempts made meanwhile. 
  
 totalTimeoutMillis 
 : 
  
 600000 
 , 
  
 }, 
  
 }; 
  
 // Cache topic objects (publishers) and reuse them. 
  
 const 
  
 topic 
  
 = 
  
 pubsubClient 
 . 
 topic 
 ( 
 topicNameOrId 
 , 
  
 { 
  
 gaxOpts 
 : 
  
 { 
  
 retry 
 : 
  
 retrySettings 
 , 
  
 }, 
  
 }); 
  
 // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) 
  
 const 
  
 dataBuffer 
  
 = 
  
 Buffer 
 . 
 from 
 ( 
 data 
 ); 
  
 const 
  
 messageId 
  
 = 
  
 await 
  
 topic 
 . 
 publishMessage 
 ({ 
 data 
 : 
  
 dataBuffer 
 }); 
  
 console 
 . 
 log 
 ( 
 `Message 
 ${ 
 messageId 
 } 
 published.` 
 ); 
 } 
 

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Python API reference documentation .

  from 
  
 google 
  
 import 
 api_core 
 from 
  
 google.cloud 
  
 import 
 pubsub_v1 
 # TODO(developer) 
 # project_id = "your-project-id" 
 # topic_id = "your-topic-id" 
 # Configure the retry settings. Defaults shown in comments are values applied 
 # by the library by default, instead of default values in the Retry object. 
 custom_retry 
 = 
 api_core 
 . 
  retry 
 
 . 
 Retry 
 ( 
 initial 
 = 
 0.250 
 , 
 # seconds (default: 0.1) 
 maximum 
 = 
 90.0 
 , 
 # seconds (default: 60.0) 
 multiplier 
 = 
 1.45 
 , 
 # default: 1.3 
 deadline 
 = 
 300.0 
 , 
 # seconds (default: 60.0) 
 predicate 
 = 
 api_core 
 . 
  retry 
 
 . 
 if_exception_type 
 ( 
 api_core 
 . 
 exceptions 
 . 
 Aborted 
 , 
 api_core 
 . 
 exceptions 
 . 
 DeadlineExceeded 
 , 
 api_core 
 . 
 exceptions 
 . 
 InternalServerError 
 , 
 api_core 
 . 
 exceptions 
 . 
 ResourceExhausted 
 , 
 api_core 
 . 
 exceptions 
 . 
 ServiceUnavailable 
 , 
 api_core 
 . 
 exceptions 
 . 
 Unknown 
 , 
 api_core 
 . 
 exceptions 
 . 
 Cancelled 
 , 
 ), 
 ) 
 publisher 
 = 
 pubsub_v1 
 . 
  PublisherClient 
 
 () 
 topic_path 
 = 
 publisher 
 . 
 topic_path 
 ( 
 project_id 
 , 
 topic_id 
 ) 
 for 
 n 
 in 
 range 
 ( 
 1 
 , 
 10 
 ): 
 data_str 
 = 
 f 
 "Message number 
 { 
 n 
 } 
 " 
 # Data must be a bytestring 
 data 
 = 
 data_str 
 . 
 encode 
 ( 
 "utf-8" 
 ) 
 future 
 = 
  publish 
 
er . 
  publish 
 
 ( 
 topic 
 = 
 topic_path 
 , 
 data 
 = 
 data 
 , 
 retry 
 = 
 custom_retry 
 ) 
 print 
 ( 
 future 
 . 
 result 
 ()) 
 print 
 ( 
 f 
 "Published messages with retry settings to 
 { 
 topic_path 
 } 
 ." 
 ) 
 

Retry requests with ordering keys

Assume you have a single publisher client. You are using the Pub/Sub client libraries to publish messages 1, 2, and 3 for the same ordering key A. Now, assume that the published response for message 1 is not received by the publisher client before the RPC deadline expires. Message 1 must be republished. The sequence of messages received by the subscriber client then becomes 1, 1, 2, and 3, if you assume message 2 is published only after message 1 gets completed successfully. Each published message has its own message ID. From the subscriber client's perspective, four messages were published, with the first two having identical content.

Retrying publish requests with ordering keys can also be complicated by batch settings. The client library batches messages together for more efficient publishing. Continue with the previous example and assume that messages 1 and 2 are batched together. This batch is sent to the server as a single request. If the server fails to return a response in time, the publisher client retries this batch of two messages. Therefore, it is possible that the subscriber client receives messages 1, 2, 1, 2, and 3. If you use a Pub/Sub client library for publishing messages in order and a publish operation fails, the service fails the publish operations for all remaining messages on the same ordering key. A publisher client can then decide to follow any one of the following operations:

  • Republish all the failed messages in order

  • Republish a subset of the failed messages in order

  • Publish a new set of messages

If a non-retryable error occurs, the client library doesn't publish the message and stops publishing other messages with the same ordering key. For example, when a publisher sends a message to a topic that doesn't exist, a non-retryable error occurs. To continue publishing messages with the same ordering key, call a method to resume publishing and then start publishing again.

The following sample shows you how to resume publishing messages with the same ordering key.

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C++ API reference documentation .

  namespace 
  
 pubsub 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 future 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 StatusOr 
 ; 
 []( 
 pubsub 
 :: 
 Publisher 
  
 publisher 
 ) 
  
 { 
  
 struct 
  
 SampleData 
  
 { 
  
 std 
 :: 
 string 
  
 ordering_key 
 ; 
  
 std 
 :: 
 string 
  
 data 
 ; 
  
 } 
  
 data 
 [] 
  
 = 
  
 { 
  
 { 
 "key1" 
 , 
  
 "message1" 
 }, 
  
 { 
 "key2" 
 , 
  
 "message2" 
 }, 
  
 { 
 "key1" 
 , 
  
 "message3" 
 }, 
  
 { 
 "key1" 
 , 
  
 "message4" 
 }, 
  
 { 
 "key1" 
 , 
  
 "message5" 
 }, 
  
 }; 
  
 std 
 :: 
 vector<future<void> 
>  
 done 
 ; 
  
 for 
  
 ( 
 auto 
&  
 datum 
  
 : 
  
 data 
 ) 
  
 { 
  
 auto 
  
 const 
&  
 da 
  
 = 
  
 datum 
 ; 
  
 // workaround MSVC lambda capture confusion 
  
 auto 
  
 handler 
  
 = 
  
 [ 
 da 
 , 
  
 publisher 
 ]( 
 future<StatusOr<std 
 :: 
 string 
>>  
 f 
 ) 
  
 mutable 
  
 { 
  
 auto 
  
 const 
  
 msg 
  
 = 
  
 da 
 . 
 ordering_key 
  
 + 
  
 "#" 
  
 + 
  
 da 
 . 
 data 
 ; 
  
 auto 
  
 id 
  
 = 
  
 f 
 . 
 get 
 (); 
  
 if 
  
 ( 
 ! 
 id 
 ) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "An error has occurred publishing " 
 << 
 msg 
 << 
 " 
 \n 
 " 
 ; 
  
 publisher 
 . 
 ResumePublish 
 ( 
 da 
 . 
 ordering_key 
 ); 
  
 return 
 ; 
  
 } 
  
 std 
 :: 
 cout 
 << 
 "Message " 
 << 
 msg 
 << 
 " published as id=" 
 << 
 * 
 id 
 << 
 " 
 \n 
 " 
 ; 
  
 }; 
  
 done 
 . 
 push_back 
 ( 
  
 publisher 
  
 . 
 Publish 
 ( 
 pubsub 
 :: 
 MessageBuilder 
 {} 
  
 . 
 SetData 
 ( 
 "Hello World! [" 
  
 + 
  
 datum 
 . 
 data 
  
 + 
  
 "]" 
 ) 
  
 . 
 SetOrderingKey 
 ( 
 datum 
 . 
 ordering_key 
 ) 
  
 . 
 Build 
 ()) 
  
 . 
 then 
 ( 
 handler 
 )); 
  
 } 
  
 publisher 
 . 
 Flush 
 (); 
  
 // Block until all the messages are published (optional) 
  
 for 
  
 ( 
 auto 
&  
 f 
  
 : 
  
 done 
 ) 
  
 f 
 . 
 get 
 (); 
 } 
 

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C# API reference documentation .

  using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
 System 
 ; 
 using 
  
 System.Collections.Generic 
 ; 
 using 
  
 System.Linq 
 ; 
 using 
  
 System.Threading 
 ; 
 using 
  
 System.Threading.Tasks 
 ; 
 public 
  
 class 
  
 ResumePublishSample 
 { 
  
 public 
  
 async 
  
 Task<int> 
  
 PublishOrderedMessagesAsync 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 topicId 
 , 
  
 IEnumerable 
< ( 
 string 
 , 
  
 string 
 ) 
>  
 keysAndMessages 
 ) 
  
 { 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
  FromProjectTopic 
 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
 var 
  
 customSettings 
  
 = 
  
 new 
  
 PublisherClient 
 . 
 Settings 
  
 { 
  
 EnableMessageOrdering 
  
 = 
  
 true 
  
 }; 
  
  PublisherClient 
 
  
 publisher 
  
 = 
  
 await 
  
 new 
  
  PublisherClientBuilder 
 
  
 { 
  
 TopicName 
  
 = 
  
 topicName 
 , 
  
 Settings 
  
 = 
  
 customSettings 
  
 }. 
 BuildAsync 
 (); 
  
 int 
  
 publishedMessageCount 
  
 = 
  
 0 
 ; 
  
 var 
  
 publishTasks 
  
 = 
  
 keysAndMessages 
 . 
 Select 
 ( 
 async 
  
 keyAndMessage 
  
 = 
>  
 { 
  
 try 
  
 { 
  
 string 
  
 message 
  
 = 
  
 await 
  
 publisher 
 . 
 PublishAsync 
 ( 
 keyAndMessage 
 . 
 Item1 
 , 
  
 keyAndMessage 
 . 
 Item2 
 ); 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Published message {message}" 
 ); 
  
 Interlocked 
 . 
 Increment 
 ( 
 ref 
  
 publishedMessageCount 
 ); 
  
 } 
  
 catch 
  
 ( 
 Exception 
  
 exception 
 ) 
  
 { 
  
 Console 
 . 
 WriteLine 
 ( 
 $"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}" 
 ); 
  
 publisher 
 . 
 ResumePublish 
 ( 
 keyAndMessage 
 . 
 Item1 
 ); 
  
 } 
  
 }); 
  
 await 
  
 Task 
 . 
 WhenAll 
 ( 
 publishTasks 
 ); 
  
 return 
  
 publishedMessageCount 
 ; 
  
 } 
 } 
 

Go

The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2 . To see a list of v1 code samples, see the deprecated code samples .

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Go API reference documentation .

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "cloud.google.com/go/pubsub/v2" 
  
 "google.golang.org/api/option" 
 ) 
 func 
  
 resumePublishWithOrderingKey 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 topicID 
  
 string 
 ) 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topicID := "my-topic" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 // Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering key are in the same region 
  
 // For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 , 
  
 option 
 . 
 WithEndpoint 
 ( 
 "us-east1-pubsub.googleapis.com:443" 
 )) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "pubsub.NewClient: %v" 
 , 
  
 err 
 ) 
  
 return 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 // client.Publisher can be passed a topic ID (e.g. "my-topic") or 
  
 // a fully qualified name (e.g. "projects/my-project/topics/my-topic"). 
  
 // If a topic ID is provided, the project ID from the client is used. 
  
 // Reuse this publisher for all publish calls to send messages in batches. 
  
 publisher 
  
 := 
  
 client 
 . 
 Publisher 
 ( 
 topicID 
 ) 
  
 publisher 
 . 
 EnableMessageOrdering 
  
 = 
  
 true 
  
 key 
  
 := 
  
 "some-ordering-key" 
  
 result 
  
 := 
  
 publisher 
 . 
 Publish 
 ( 
 ctx 
 , 
  
& pubsub 
 . 
 Message 
 { 
  
 Data 
 : 
  
 [] 
 byte 
 ( 
 "some-message" 
 ), 
  
 OrderingKey 
 : 
  
 key 
 , 
  
 }) 
  
 _ 
 , 
  
 err 
  
 = 
  
 result 
 . 
 Get 
 ( 
 ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 // Error handling code can be added here. 
  
 fmt 
 . 
 Printf 
 ( 
 "Failed to publish: %s\n" 
 , 
  
 err 
 ) 
  
 // Resume publish on an ordering key that has had unrecoverable errors. 
  
 // After such an error publishes with this ordering key will fail 
  
 // until this method is called. 
  
 publisher 
 . 
 ResumePublish 
 ( 
 key 
 ) 
  
 } 
  
 fmt 
 . 
 Fprint 
 ( 
 w 
 , 
  
 "Published a message with ordering key successfully\n" 
 ) 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Java API reference documentation .

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.api.core. ApiFutureCallback 
 
 ; 
 import 
  
 com.google.api.core. ApiFutures 
 
 ; 
 import 
  
 com.google.api.gax.rpc. ApiException 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Publisher 
 
 ; 
 import 
  
 com.google.common.util.concurrent.MoreExecutors 
 ; 
 import 
  
 com.google.protobuf. ByteString 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 com.google.pubsub.v1. TopicName 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.util.LinkedHashMap 
 ; 
 import 
  
 java.util.Map 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 public 
  
 class 
 ResumePublishWithOrderingKeys 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 // Choose an existing topic. 
  
 String 
  
 topicId 
  
 = 
  
 "your-topic-id" 
 ; 
  
 resumePublishWithOrderingKeysExample 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 resumePublishWithOrderingKeysExample 
 ( 
 String 
  
 projectId 
 , 
  
 String 
  
 topicId 
 ) 
  
 throws 
  
 IOException 
 , 
  
 InterruptedException 
  
 { 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
 // Create a publisher and set message ordering to true. 
  
  Publisher 
 
  
 publisher 
  
 = 
  
  Publisher 
 
 . 
 newBuilder 
 ( 
 topicName 
 ) 
  
 . 
 setEnableMessageOrdering 
 ( 
 true 
 ) 
  
 . 
 setEndpoint 
 ( 
 "us-east1-pubsub.googleapis.com:443" 
 ) 
  
 . 
 build 
 (); 
  
 try 
  
 { 
  
 Map<String 
 , 
  
 String 
>  
 messages 
  
 = 
  
 new 
  
 LinkedHashMap<String 
 , 
  
 String 
> (); 
  
 messages 
 . 
 put 
 ( 
 "message1" 
 , 
  
 "key1" 
 ); 
  
 messages 
 . 
 put 
 ( 
 "message2" 
 , 
  
 "key2" 
 ); 
  
 messages 
 . 
 put 
 ( 
 "message3" 
 , 
  
 "key1" 
 ); 
  
 messages 
 . 
 put 
 ( 
 "message4" 
 , 
  
 "key2" 
 ); 
  
 for 
  
 ( 
 Map 
 . 
 Entry<String 
 , 
  
 String 
>  
 entry 
  
 : 
  
 messages 
 . 
 entrySet 
 ()) 
  
 { 
  
 ByteString 
  
 data 
  
 = 
  
 ByteString 
 . 
 copyFromUtf8 
 ( 
 entry 
 . 
 getKey 
 ()); 
  
 PubsubMessage 
  
 pubsubMessage 
  
 = 
  
 PubsubMessage 
 . 
 newBuilder 
 (). 
 setData 
 ( 
 data 
 ). 
 setOrderingKey 
 ( 
 entry 
 . 
 getValue 
 ()). 
 build 
 (); 
  
 ApiFuture<String> 
  
 future 
  
 = 
  
 publisher 
 . 
 publish 
 ( 
 pubsubMessage 
 ); 
  
 // Add an asynchronous callback to handle publish success / failure. 
  
 ApiFutures 
 . 
 addCallback 
 ( 
  
 future 
 , 
  
 new 
  
 ApiFutureCallback<String> 
 () 
  
 { 
  
 @Override 
  
 public 
  
 void 
  
 onFailure 
 ( 
 Throwable 
  
 throwable 
 ) 
  
 { 
  
 if 
  
 ( 
 throwable 
  
 instanceof 
  
 ApiException 
 ) 
  
 { 
  
 ApiException 
  
 apiException 
  
 = 
  
 (( 
 ApiException 
 ) 
  
 throwable 
 ); 
  
 // Details on the API exception. 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 apiException 
 . 
 getStatusCode 
 (). 
 getCode 
 ()); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 apiException 
 . 
 isRetryable 
 ()); 
  
 } 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Error publishing message : " 
  
 + 
  
 pubsubMessage 
 . 
 getData 
 ()); 
  
 // (Beta) Must call resumePublish to reset key and continue publishing with order. 
  
 publisher 
 . 
 resumePublish 
 ( 
 pubsubMessage 
 . 
 getOrderingKey 
 ()); 
  
 } 
  
 @Override 
  
 public 
  
 void 
  
 onSuccess 
 ( 
 String 
  
 messageId 
 ) 
  
 { 
  
 // Once published, returns server-assigned message ids (unique within the topic). 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 pubsubMessage 
 . 
 getData 
 () 
  
 + 
  
 " : " 
  
 + 
  
 messageId 
 ); 
  
 } 
  
 }, 
  
 MoreExecutors 
 . 
 directExecutor 
 ()); 
  
 } 
  
 } 
  
 finally 
  
 { 
  
 publisher 
 . 
 shutdown 
 (); 
  
 publisher 
 . 
 awaitTermination 
 ( 
 1 
 , 
  
 TimeUnit 
 . 
 MINUTES 
 ); 
  
 } 
  
 } 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .

  /** 
 * TODO(developer): Uncomment these variables before running the sample. 
 */ 
 // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; 
 // const data = JSON.stringify({foo: 'bar'}); 
 // const orderingKey = 'key1'; 
 // Imports the Google Cloud client library 
 const 
  
 { 
 PubSub 
 } 
  
 = 
  
 require 
 ( 
 ' @google-cloud/pubsub 
' 
 ); 
 // Creates a client; cache this for further use 
 const 
  
 pubSubClient 
  
 = 
  
 new 
  
  PubSub 
 
 (); 
 async 
  
 function 
  
 resumePublish 
 ( 
 topicNameOrId 
 , 
  
 data 
 , 
  
 orderingKey 
 ) 
  
 { 
  
 // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) 
  
 const 
  
 dataBuffer 
  
 = 
  
 Buffer 
 . 
  from 
 
 ( 
 data 
 ); 
  
 const 
  
 publishOptions 
  
 = 
  
 { 
  
 messageOrdering 
 : 
  
 true 
 , 
  
 }; 
  
 // Cache topic objects (publishers) and reuse them. 
  
 // 
  
 // Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering 
  
 // key are in the same region. For list of locational endpoints for Pub/Sub, see: 
  
 // https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints 
  
 const 
  
 publisher 
  
 = 
  
 pubSubClient 
 . 
 topic 
 ( 
 topicNameOrId 
 , 
  
 publishOptions 
 ); 
  
 // Publishes the message 
  
 try 
  
 { 
  
 const 
  
 message 
  
 = 
  
 { 
  
 data 
 : 
  
 dataBuffer 
 , 
  
 orderingKey 
 : 
  
 orderingKey 
 , 
  
 }; 
  
 const 
  
 messageId 
  
 = 
  
 await 
  
  publisher 
 
 . 
  publishMessage 
 
 ( 
 message 
 ); 
  
 console 
 . 
 log 
 ( 
 `Message 
 ${ 
 messageId 
 } 
 published.` 
 ); 
  
 return 
  
 messageId 
 ; 
  
 } 
  
 catch 
  
 ( 
 e 
 ) 
  
 { 
  
 console 
 . 
 log 
 ( 
 `Could not publish: 
 ${ 
 e 
 } 
 ` 
 ); 
  
  publisher 
 
 . 
  resumePublishing 
 
 ( 
 orderingKey 
 ); 
  
 return 
  
 null 
 ; 
  
 } 
 } 
 

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Python API reference documentation .

  from 
  
 google.cloud 
  
 import 
 pubsub_v1 
 # TODO(developer): Choose an existing topic. 
 # project_id = "your-project-id" 
 # topic_id = "your-topic-id" 
 publisher_options 
 = 
 pubsub_v1 
 . 
 types 
 . 
  PublisherOptions 
 
 ( 
 enable_message_ordering 
 = 
 True 
 ) 
 # Sending messages to the same region ensures they are received in order 
 # even when multiple publishers are used. 
 client_options 
 = 
 { 
 "api_endpoint" 
 : 
 "us-east1-pubsub.googleapis.com:443" 
 } 
 publisher 
 = 
 pubsub_v1 
 . 
  PublisherClient 
 
 ( 
 publisher_options 
 = 
 publisher_options 
 , 
 client_options 
 = 
 client_options 
 ) 
 # The `topic_path` method creates a fully qualified identifier 
 # in the form `projects/{project_id}/topics/{topic_id}` 
 topic_path 
 = 
 publisher 
 . 
 topic_path 
 ( 
 project_id 
 , 
 topic_id 
 ) 
 for 
 message 
 in 
 [ 
 ( 
 "message1" 
 , 
 "key1" 
 ), 
 ( 
 "message2" 
 , 
 "key2" 
 ), 
 ( 
 "message3" 
 , 
 "key1" 
 ), 
 ( 
 "message4" 
 , 
 "key2" 
 ), 
 ]: 
 # Data must be a bytestring 
 data 
 = 
 message 
 [ 
 0 
 ] 
 . 
 encode 
 ( 
 "utf-8" 
 ) 
 ordering_key 
 = 
 message 
 [ 
 1 
 ] 
 # When you publish a message, the client returns a future. 
 future 
 = 
  publish 
 
er . 
  publish 
 
 ( 
 topic_path 
 , 
 data 
 = 
 data 
 , 
 ordering_key 
 = 
 ordering_key 
 ) 
 try 
 : 
 print 
 ( 
 future 
 . 
 result 
 ()) 
 except 
 RuntimeError 
 : 
 # Resume publish on an ordering key that has had unrecoverable errors. 
 publisher 
 . 
  resume_publish 
 
 ( 
 topic_path 
 , 
 ordering_key 
 ) 
 print 
 ( 
 f 
 "Resumed publishing messages with ordering keys to 
 { 
 topic_path 
 } 
 ." 
 ) 
 

Ruby

The following sample uses Ruby Pub/Sub client library v3. If you are still using the v2 library, see the migration guide to v3 . To see a list of Ruby v2 code samples, see the deprecated code samples .

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Ruby API reference documentation .

  # topic_id = "your-topic-id" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  PubSub 
 
 . 
  new 
 
 # Start sending messages in one request once the size of all queued messages 
 # reaches 1 MB or the number of queued messages reaches 20 
 publisher 
  
 = 
  
 pubsub 
 . 
  publisher 
 
  
 topic_id 
 , 
  
 async 
 : 
  
 { 
  
 max_bytes 
 : 
  
 1_000_000 
 , 
  
 max_messages 
 : 
  
 20 
 } 
 publisher 
 . 
 enable_message_ordering! 
 10 
 . 
 times 
  
 do 
  
 | 
 i 
 | 
  
 publisher 
 . 
  publish_async 
 
  
 "This is message # 
 #{ 
 i 
 } 
 ." 
 , 
  
 ordering_key 
 : 
  
 "ordering-key" 
  
 do 
  
 | 
 result 
 | 
  
 if 
  
 result 
 . 
 succeeded? 
  
 puts 
  
 "Message # 
 #{ 
 i 
 } 
 successfully published." 
  
 else 
  
 puts 
  
 "Message # 
 #{ 
 i 
 } 
 failed to publish" 
  
 # Allow publishing to continue on "ordering-key" after processing the 
  
 # failure. 
  
 publisher 
 . 
 resume_publish 
  
 "ordering-key" 
  
 end 
  
 end 
 end 
 # Stop the async_publisher to send all queued messages immediately. 
 publisher 
 . 
  async_publisher 
 
 . 
 stop! 
 

What's next

To learn how to configure advanced publishing options, see the following:

Create a Mobile Website
View Site in Mobile | Classic
Share by: