Package cloud.google.com/go/pubsublite/pscompat (v1.7.1)

Package pscompat contains clients for publishing and subscribing using the Pub/Sub Lite service.

This package is designed to be compatible with the Cloud Pub/Sub library: https://pkg.go.dev/cloud.google.com/go/pubsub . If interfaces are defined by the client application, PublisherClient and SubscriberClient can be used as substitutions for pubsub.Topic.Publish() and pubsub.Subscription.Receive(), respectively, from the pubsub package. See the following examples: https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewPublisherClient-Interface and https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewSubscriberClient-Interface .

The Cloud Pub/Sub and Pub/Sub Lite services have some differences:

  • Pub/Sub Lite does not support NACK for messages. By default, this will terminate the SubscriberClient. A custom function can be provided for ReceiveSettings.NackHandler to handle NACKed messages.
  • Pub/Sub Lite has no concept of ACK deadlines. Subscribers must ACK or NACK every message received and can take as much time as they need to process the message.
  • Pub/Sub Lite PublisherClients and SubscriberClients can fail permanently when an unretryable error occurs.
  • Publishers and subscribers will be throttled if Pub/Sub Lite publish or subscribe throughput limits are exceeded. Thus publishing can be more sensitive to buffer overflow than Cloud Pub/Sub.
  • Pub/Sub Lite utilizes bidirectional gRPC streams extensively to maximize publish and subscribe throughput.

More information about Pub/Sub Lite is available at https://cloud.google.com/pubsub/lite .

Information about choosing between Cloud Pub/Sub and Pub/Sub Lite is available at https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite .

Complete sample programs can be found at https://github.com/GoogleCloudPlatform/golang-samples/tree/master/pubsublite .

See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package.

Constants

MaxPublishRequestCount, MaxPublishRequestBytes

  const (
      // MaxPublishRequestCount is the maximum number of messages that can be
      // batched in a single publish request.
      MaxPublishRequestCount = wire.MaxPublishRequestCount

      // MaxPublishRequestBytes is the maximum allowed serialized size of a single
      // publish request (containing a batch of messages) in bytes.
      MaxPublishRequestBytes = wire.MaxPublishRequestBytes
  )
 

Variables

ErrOverflow, ErrOversizedMessage, ErrPublisherStopped, ErrBackendUnavailable

  var (
      // ErrOverflow is set for a PublishResult when publish buffers overflow. This
      // can occur when backends are unavailable or the actual publish throughput
      // of clients exceeds the allocated publish throughput for the Pub/Sub Lite
      // topic. Use errors.Is for comparing errors.
      ErrOverflow = wire.ErrOverflow

      // ErrOversizedMessage is set for a PublishResult when a published message
      // exceeds MaxPublishRequestBytes. Publishing this message will never succeed.
      // Use errors.Is for comparing errors.
      ErrOversizedMessage = wire.ErrOversizedMessage

      // ErrPublisherStopped is set for a PublishResult when a message cannot be
      // published because the publisher client has stopped or is in the process of
      // stopping. It may be stopping due to a fatal error. PublisherClient.Error()
      // returns the error that caused the publisher client to terminate (if any).
      // Use errors.Is for comparing errors.
      ErrPublisherStopped = wire.ErrServiceStopped

      // ErrBackendUnavailable indicates that the backend service has been
      // unavailable for a period of time. The timeout can be configured using
      // PublishSettings.Timeout or ReceiveSettings.Timeout. Use errors.Is for
      // comparing errors.
      ErrBackendUnavailable = wire.ErrBackendUnavailable
  )
 

DefaultPublishSettings

  var DefaultPublishSettings = PublishSettings{
      DelayThreshold:    10 * time.Millisecond,
      CountThreshold:    100,
      ByteThreshold:     1e6,
      Timeout:           7 * 24 * time.Hour,
      BufferedByteLimit: 1e10,
      EnableIdempotence: true,
  }
 

DefaultPublishSettings holds the default values for PublishSettings.

DefaultReceiveSettings

  var DefaultReceiveSettings = ReceiveSettings{
      MaxOutstandingMessages: 1000,
      MaxOutstandingBytes:    1e9,
      Timeout:                7 * 24 * time.Hour,
  }
 

DefaultReceiveSettings holds the default values for ReceiveSettings.

KeyExtractorFunc

  type KeyExtractorFunc func(*pubsub.Message) []byte
 
 

KeyExtractorFunc is a function that extracts an ordering key from a Message.

MessageMetadata

  type MessageMetadata struct {
      // The topic partition the message was published to.
      Partition int

      // The offset the message was assigned.
      //
      // If this MessageMetadata was returned for a publish result and publish
      // idempotence was enabled, the offset may be -1 when the message was
      // identified as a duplicate of an already successfully published message,
      // but the server did not have sufficient information to return the message's
      // offset at publish time. Messages received by subscribers will always have
      // the correct offset.
      Offset int64
  }
 

MessageMetadata holds properties of a message published to the Pub/Sub Lite service.

func ParseMessageMetadata

  func ParseMessageMetadata(id string) (*MessageMetadata, error)
 

ParseMessageMetadata creates MessageMetadata from the ID string of a pubsub.PublishResult returned by PublisherClient or pubsub.Message.ID received from SubscriberClient.

Examples

publisher
  package main

  import (
      "context"
      "fmt"

      "cloud.google.com/go/pubsub"
      "cloud.google.com/go/pubsublite/pscompat"
  )

  func main() {
      ctx := context.Background()
      const topic = "projects/my-project/locations/region-or-zone/topics/my-topic"
      publisher, err := pscompat.NewPublisherClient(ctx, topic)
      if err != nil {
          // TODO: Handle error.
      }
      defer publisher.Stop()

      result := publisher.Publish(ctx, &pubsub.Message{Data: []byte("payload")})
      id, err := result.Get(ctx)
      if err != nil {
          // TODO: Handle error.
      }
      metadata, err := pscompat.ParseMessageMetadata(id)
      if err != nil {
          // TODO: Handle error.
      }
      fmt.Printf("Published message to partition %d with offset %d\n", metadata.Partition, metadata.Offset)
  }
 
subscriber
  package main

  import (
      "context"
      "fmt"

      "cloud.google.com/go/pubsub"
      "cloud.google.com/go/pubsublite/pscompat"
  )

  func main() {
      ctx := context.Background()
      const subscription = "projects/my-project/locations/region-or-zone/subscriptions/my-subscription"
      subscriber, err := pscompat.NewSubscriberClient(ctx, subscription)
      if err != nil {
          // TODO: Handle error.
      }
      err = subscriber.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
          // TODO: Handle message.
          m.Ack()
          metadata, err := pscompat.ParseMessageMetadata(m.ID)
          if err != nil {
              // TODO: Handle error.
          }
          fmt.Printf("Received message from partition %d with offset %d\n", metadata.Partition, metadata.Offset)
      })
      if err != nil {
          // TODO: Handle error.
      }
  }
 
func (*MessageMetadata) String

  func (m *MessageMetadata) String() string
 
 

NackHandler

  type NackHandler func(*pubsub.Message) error
 
 

NackHandler is invoked when pubsub.Message.Nack() is called. Pub/Sub Lite does not have a concept of 'nack'. If the nack handler implementation returns nil, the message is acknowledged. If an error is returned, the SubscriberClient will consider this a fatal error and terminate.

In Pub/Sub Lite, only a single subscriber for a given subscription is connected to any partition at a time, and there is no other client that may be able to handle messages.

PublishMessageTransformerFunc

  type PublishMessageTransformerFunc func(*pubsub.Message, *pb.PubSubMessage) error
 
 

PublishMessageTransformerFunc transforms a pubsub.Message to a Pub/Sub Lite PubSubMessage API proto. If this returns an error, the pubsub.PublishResult will be errored and the PublisherClient will consider this a fatal error and terminate.

PublishSettings

  type PublishSettings struct {
      // Publish a non-empty batch after this delay has passed. If DelayThreshold is
      // 0, it will be treated as DefaultPublishSettings.DelayThreshold. Otherwise
      // must be > 0.
      DelayThreshold time.Duration

      // Publish a batch when it has this many messages. The maximum is
      // MaxPublishRequestCount. If CountThreshold is 0, it will be treated as
      // DefaultPublishSettings.CountThreshold. Otherwise must be > 0.
      CountThreshold int

      // Publish a batch when its size in bytes reaches this value. The maximum is
      // MaxPublishRequestBytes. If ByteThreshold is 0, it will be treated as
      // DefaultPublishSettings.ByteThreshold. Otherwise must be > 0.
      ByteThreshold int

      // The maximum time that the client will attempt to open a publish stream
      // to the server. If Timeout is 0, it will be treated as
      // DefaultPublishSettings.Timeout, otherwise will be clamped to 2 minutes. In
      // the future, setting Timeout to less than 2 minutes will result in an error.
      //
      // If your application has a low tolerance to backend unavailability, set
      // Timeout to a lower duration to detect and handle. When the timeout is
      // exceeded, the PublisherClient will terminate with ErrBackendUnavailable and
      // details of the last error that occurred while trying to reconnect to
      // backends. Note that if the timeout duration is long, ErrOverflow may occur
      // first.
      //
      // If no failover operations need to be performed by the application, it is
      // recommended to just use the default timeout value to avoid the
      // PublisherClient terminating during short periods of backend unavailability.
      Timeout time.Duration

      // The maximum number of bytes that the publisher will keep in memory before
      // returning ErrOverflow. If BufferedByteLimit is 0, it will be treated as
      // DefaultPublishSettings.BufferedByteLimit. Otherwise must be > 0.
      //
      // Note that this setting applies per partition. If BufferedByteLimit is being
      // used to bound memory usage, keep in mind the number of partitions in the
      // topic.
      //
      // Note that Pub/Sub Lite topics are provisioned a publishing throughput
      // capacity, per partition, shared by all publisher clients. Setting a large
      // buffer size can mitigate transient publish spikes. However, consistently
      // attempting to publish messages at a much higher rate than the publishing
      // throughput capacity can cause the buffers to overflow. For more
      // information, see https://cloud.google.com/pubsub/lite/docs/topics.
      BufferedByteLimit int

      // Whether idempotence is enabled, where the server will ensure that unique
      // messages within a single publisher session are stored only once. Default
      // true.
      EnableIdempotence optional.Bool

      // Optional custom function that extracts an ordering key from a Message. The
      // default implementation extracts the key from Message.OrderingKey.
      KeyExtractor KeyExtractorFunc

      // Optional custom function that transforms a pubsub.Message to a
      // PubSubMessage API proto.
      MessageTransformer PublishMessageTransformerFunc

      // contains filtered or unexported fields
  }
 

PublishSettings configure the PublisherClient. Batching settings (DelayThreshold, CountThreshold, ByteThreshold, BufferedByteLimit) apply per partition.

A zero PublishSettings will result in values equivalent to DefaultPublishSettings.

PublisherClient

  type PublisherClient struct {
      // contains filtered or unexported fields
  }
 

PublisherClient is a Pub/Sub Lite client to publish messages to a given topic. A PublisherClient is safe to use from multiple goroutines.

PublisherClients are expected to be long-lived and used for the duration of the application, rather than for publishing small batches of messages. Stop must be called to release resources when a PublisherClient is no longer required.

See https://cloud.google.com/pubsub/lite/docs/publishing for more information about publishing.

func NewPublisherClient

  func NewPublisherClient(ctx context.Context, topic string, opts ...option.ClientOption) (*PublisherClient, error)
 

NewPublisherClient creates a new Pub/Sub Lite publisher client to publish messages to a given topic, using DefaultPublishSettings. A valid topic path has the format: "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID".

Stop must be called to release resources when a PublisherClient is no longer required.
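
For the topic path format given above, the following sketch shows one way to assemble a path and run a shallow structural check before creating a client. Both topicPath and looksLikeTopicPath are hypothetical helpers that are not part of pscompat, and the project, location and topic values are placeholders. The service remains the authority on whether a path is valid.

```go
package main

import (
	"fmt"
	"strings"
)

// topicPath is a hypothetical helper (not part of pscompat) that assembles
// a topic path in the documented format.
func topicPath(project, location, topicID string) string {
	return fmt.Sprintf("projects/%s/locations/%s/topics/%s", project, location, topicID)
}

// looksLikeTopicPath performs a shallow structural check only: six
// slash-separated segments with the expected fixed keywords and non-empty
// identifiers.
func looksLikeTopicPath(path string) bool {
	parts := strings.Split(path, "/")
	if len(parts) != 6 {
		return false
	}
	if parts[0] != "projects" || parts[2] != "locations" || parts[4] != "topics" {
		return false
	}
	return parts[1] != "" && parts[3] != "" && parts[5] != ""
}

func main() {
	p := topicPath("my-project", "us-central1-a", "my-topic")
	fmt.Println(p)                              // projects/my-project/locations/us-central1-a/topics/my-topic
	fmt.Println(looksLikeTopicPath(p))          // true
	fmt.Println(looksLikeTopicPath("my-topic")) // false
}
```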

Example

interface
  package main

  import (
      "context"

      "cloud.google.com/go/pubsub"
      "cloud.google.com/go/pubsublite/pscompat"
  )

  func main() {
      // publisherInterface is implemented by both pscompat.PublisherClient and
      // pubsub.Topic.
      type publisherInterface interface {
          Publish(context.Context, *pubsub.Message) *pubsub.PublishResult
          Stop()
      }

      publish := func(publisher publisherInterface) {
          defer publisher.Stop()
          // TODO: Publish messages.
      }

      // Create a Pub/Sub Lite publisher client.
      ctx := context.Background()
      publisher, err := pscompat.NewPublisherClient(ctx, "projects/my-project/locations/region-or-zone/topics/my-topic")
      if err != nil {
          // TODO: Handle error.
      }
      publish(publisher)

      // Create a Cloud Pub/Sub topic to publish.
      client, err := pubsub.NewClient(ctx, "my-project")
      if err != nil {
          // TODO: Handle error.
      }
      topic := client.Topic("my-topic")
      publish(topic)
  }
 

func NewPublisherClientWithSettings

  func NewPublisherClientWithSettings(ctx context.Context, topic string, settings PublishSettings, opts ...option.ClientOption) (*PublisherClient, error)
 

NewPublisherClientWithSettings creates a new Pub/Sub Lite publisher client to publish messages to a given topic, using the specified PublishSettings. A valid topic path has the format: "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID".

Stop must be called to release resources when a PublisherClient is no longer required.

func (*PublisherClient) Error

  func (p *PublisherClient) Error() error
 
 

Error returns the error that caused the publisher client to terminate. The error returned here may contain more context than PublishResult errors. The return value may be nil if Stop() was called.

func (*PublisherClient) Publish

Publish publishes msg to the topic asynchronously. Messages are batched and sent according to the client's PublishSettings. Publish never blocks.

Publish returns a non-nil PublishResult which will be ready when the message has been sent (or has failed to be sent) to the server. Retryable errors are automatically handled. If a PublishResult returns an error, this indicates that the publisher client encountered a fatal error and can no longer be used. Fatal errors should be manually inspected and the cause resolved. A new publisher client instance must be created to republish failed messages.

Once Stop() has been called or the publisher client has failed permanently due to an error, future calls to Publish will immediately return a PublishResult with error ErrPublisherStopped.

Error() returns the error that caused the publisher client to terminate and may contain more context than the error returned by PublishResult.

Examples

  package main

  import (
      "context"
      "fmt"

      "cloud.google.com/go/pubsub"
      "cloud.google.com/go/pubsublite/pscompat"
  )

  func main() {
      ctx := context.Background()
      const topic = "projects/my-project/locations/region-or-zone/topics/my-topic"
      publisher, err := pscompat.NewPublisherClient(ctx, topic)
      if err != nil {
          // TODO: Handle error.
      }
      defer publisher.Stop()

      var results []*pubsub.PublishResult
      r := publisher.Publish(ctx, &pubsub.Message{
          Data: []byte("hello world"),
      })
      results = append(results, r)
      // Publish more messages ...

      var publishFailed bool
      for _, r := range results {
          id, err := r.Get(ctx)
          if err != nil {
              // TODO: Handle error.
              publishFailed = true
              continue
          }
          fmt.Printf("Published a message with a message ID: %s\n", id)
      }

      // NOTE: A failed PublishResult indicates that the publisher client
      // encountered a fatal error and has permanently terminated. After the fatal
      // error has been resolved, a new publisher client instance must be created to
      // republish failed messages.
      if publishFailed {
          fmt.Printf("Publisher client terminated due to error: %v\n", publisher.Error())
      }
  }
 
batchingSettings
  package main

  import (
      "context"
      "fmt"
      "time"

      "cloud.google.com/go/pubsub"
      "cloud.google.com/go/pubsublite/pscompat"
  )

  func main() {
      ctx := context.Background()
      const topic = "projects/my-project/locations/region-or-zone/topics/my-topic"
      settings := pscompat.PublishSettings{
          DelayThreshold:    50 * time.Millisecond,
          CountThreshold:    200,
          BufferedByteLimit: 5e8,
      }
      publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topic, settings)
      if err != nil {
          // TODO: Handle error.
      }
      defer publisher.Stop()

      var results []*pubsub.PublishResult
      r := publisher.Publish(ctx, &pubsub.Message{
          Data: []byte("hello world"),
      })
      results = append(results, r)
      // Publish more messages ...

      var publishFailed bool
      for _, r := range results {
          id, err := r.Get(ctx)
          if err != nil {
              // TODO: Handle error.
              publishFailed = true
              continue
          }
          fmt.Printf("Published a message with a message ID: %s\n", id)
      }

      // NOTE: A failed PublishResult indicates that the publisher client
      // encountered a fatal error and has permanently terminated. After the fatal
      // error has been resolved, a new publisher client instance must be created to
      // republish failed messages.
      if publishFailed {
          fmt.Printf("Publisher client terminated due to error: %v\n", publisher.Error())
      }
  }
 
earlyTokenRefresh
  package main

  import (
      "context"
      "fmt"
      "log"
      "time"

      "cloud.google.com/go/pubsub"
      api "cloud.google.com/go/pubsublite/apiv1"
      "cloud.google.com/go/pubsublite/pscompat"
      "golang.org/x/oauth2/google"
      "google.golang.org/api/option"
  )

  func main() {
      ctx := context.Background()
      const topic = "projects/my-project/locations/region-or-zone/topics/my-topic"
      params := google.CredentialsParams{
          Scopes:            api.DefaultAuthScopes(),
          EarlyTokenRefresh: 5 * time.Minute,
      }
      creds, err := google.FindDefaultCredentialsWithParams(ctx, params)
      if err != nil {
          log.Fatalf("No 'Application Default Credentials' found: %v.", err)
      }
      publisher, err := pscompat.NewPublisherClient(ctx, topic, option.WithTokenSource(creds.TokenSource))
      if err != nil {
          // TODO: Handle error.
      }
      defer publisher.Stop()

      var results []*pubsub.PublishResult
      r := publisher.Publish(ctx, &pubsub.Message{
          Data: []byte("hello world"),
      })
      results = append(results, r)
      // Publish more messages ...

      var publishFailed bool
      for _, r := range results {
          id, err := r.Get(ctx)
          if err != nil {
              // TODO: Handle error.
              publishFailed = true
              continue
          }
          fmt.Printf("Published a message with a message ID: %s\n", id)
      }

      // NOTE: A failed PublishResult indicates that the publisher client
      // encountered a fatal error and has permanently terminated. After the fatal
      // error has been resolved, a new publisher client instance must be created to
      // republish failed messages.
      if publishFailed {
          fmt.Printf("Publisher client terminated due to error: %v\n", publisher.Error())
      }
  }
 
errorHandling
  package main

  import (
      "context"
      "errors"
      "fmt"
      "sync"
      "time"

      "cloud.google.com/go/pubsub"
      "cloud.google.com/go/pubsublite/pscompat"
      "golang.org/x/sync/errgroup"
  )

  func main() {
      ctx := context.Background()
      const topic = "projects/my-project/locations/region-or-zone/topics/my-topic"
      settings := pscompat.PublishSettings{
          // The PublisherClient will terminate when it cannot connect to backends for
          // more than 10 minutes.
          Timeout: 10 * time.Minute,
          // Sets a conservative publish buffer byte limit, per partition.
          BufferedByteLimit: 1e8,
      }
      publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topic, settings)
      if err != nil {
          // TODO: Handle error.
      }
      defer publisher.Stop()

      var toRepublish []*pubsub.Message
      var mu sync.Mutex
      g := new(errgroup.Group)

      for i := 0; i < 10; i++ {
          msg := &pubsub.Message{
              Data: []byte(fmt.Sprintf("message-%d", i)),
          }
          result := publisher.Publish(ctx, msg)

          g.Go(func() error {
              id, err := result.Get(ctx)
              if err != nil {
                  // NOTE: A failed PublishResult indicates that the publisher client has
                  // permanently terminated. A new publisher client instance must be
                  // created to republish failed messages.
                  fmt.Printf("Publish error: %v\n", err)
                  // Oversized messages cannot be published.
                  if !errors.Is(err, pscompat.ErrOversizedMessage) {
                      mu.Lock()
                      toRepublish = append(toRepublish, msg)
                      mu.Unlock()
                  }
                  return err
              }
              fmt.Printf("Published a message with a message ID: %s\n", id)
              return nil
          })
      }

      if err := g.Wait(); err != nil {
          fmt.Printf("Publisher client terminated due to error: %v\n", publisher.Error())
          switch {
          case errors.Is(publisher.Error(), pscompat.ErrBackendUnavailable):
              // TODO: Create a new publisher client to republish failed messages.
          case errors.Is(publisher.Error(), pscompat.ErrOverflow):
              // TODO: Create a new publisher client to republish failed messages.
              // Throttle publishing. Note that backend unavailability can also cause
              // buffer overflow before the ErrBackendUnavailable error.
          default:
              // TODO: Inspect and handle fatal error.
          }
      }
  }
 

func (*PublisherClient) Stop

  func (p *PublisherClient) Stop()
 

Stop sends all remaining published messages and closes publish streams. Returns once all outstanding messages have been sent or have failed to be sent. Stop should be called when the client is no longer required.

ReassignmentHandlerFunc

  type ReassignmentHandlerFunc func(previousPartitions, nextPartitions []int) error
 
 

ReassignmentHandlerFunc is called any time a new partition assignment is received from the server. It will be called with both the previous and new partition numbers as decided by the server. Both slices of partition numbers are sorted in ascending order.

When this handler is called, partitions that are being assigned away are stopping and new partitions are starting. Acks and nacks for messages from partitions that are being assigned away will have no effect, but message deliveries may still be in flight.

The client library will not acknowledge the assignment until this handler returns. The server will not assign any of the partitions in previousPartitions to another client until the assignment is acknowledged or the client takes too long to acknowledge it (currently 30 seconds from the time the assignment is sent, from the server's point of view).

Because of the above, as long as reassignment handling is processed quickly, it can be used to abort outstanding operations on partitions which are being assigned away from this client.

If this handler returns an error, the SubscriberClient will consider this a fatal error and terminate.
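
As a sketch of how a handler might identify the partitions being assigned away, the example below computes the set difference of the two slices, relying on the documented guarantee that both are sorted in ascending order. The helper assignedAway and the handler name onReassignment are hypothetical; only the handler's signature comes from ReassignmentHandlerFunc.

```go
package main

import "fmt"

// assignedAway returns the partitions present in previous but absent from
// next. Both inputs are assumed to be sorted in ascending order.
func assignedAway(previous, next []int) []int {
	var removed []int
	j := 0
	for _, p := range previous {
		// Advance through next until reaching a value >= p.
		for j < len(next) && next[j] < p {
			j++
		}
		if j == len(next) || next[j] != p {
			removed = append(removed, p)
		}
	}
	return removed
}

// onReassignment has the same signature as ReassignmentHandlerFunc.
func onReassignment(previousPartitions, nextPartitions []int) error {
	for _, p := range assignedAway(previousPartitions, nextPartitions) {
		// A real handler would cancel in-flight work for partition p here.
		fmt.Printf("partition %d assigned away\n", p)
	}
	return nil
}

func main() {
	_ = onReassignment([]int{0, 1, 2}, []int{1, 2, 3}) // prints "partition 0 assigned away"
}
```

A handler like this should stay fast, since the assignment is not acknowledged until it returns.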

ReceiveMessageTransformerFunc

  type ReceiveMessageTransformerFunc func(*pb.SequencedMessage, *pubsub.Message) error
 
 

ReceiveMessageTransformerFunc transforms a Pub/Sub Lite SequencedMessage API proto to a pubsub.Message. The implementation must not set pubsub.Message.ID.

If this returns an error, the SubscriberClient will consider this a fatal error and terminate.

ReceiveSettings

  type ReceiveSettings struct {
      // MaxOutstandingMessages is the maximum number of unacknowledged messages.
      // If MaxOutstandingMessages is 0, it will be treated as
      // DefaultReceiveSettings.MaxOutstandingMessages. Otherwise must be > 0.
      MaxOutstandingMessages int

      // MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged
      // messages. If MaxOutstandingBytes is 0, it will be treated as
      // DefaultReceiveSettings.MaxOutstandingBytes. Otherwise must be > 0.
      //
      // Note that this setting applies per partition. If MaxOutstandingBytes is
      // being used to bound memory usage, keep in mind the number of partitions in
      // the associated topic.
      MaxOutstandingBytes int

      // The maximum time that the client will attempt to open a subscribe stream
      // to the server. If Timeout is 0, it will be treated as
      // DefaultReceiveSettings.Timeout, otherwise will be clamped to 2 minutes. In
      // the future, setting Timeout to less than 2 minutes will result in an error.
      //
      // If your application has a low tolerance to backend unavailability, set
      // Timeout to a lower duration to detect and handle. When the timeout is
      // exceeded, the SubscriberClient will terminate with ErrBackendUnavailable
      // and details of the last error that occurred while trying to reconnect to
      // backends.
      //
      // If no failover operations need to be performed by the application, it is
      // recommended to just use the default timeout value to avoid the
      // SubscriberClient terminating during short periods of backend
      // unavailability.
      Timeout time.Duration

      // The topic partition numbers (zero-indexed) to receive messages from.
      // Values must be less than the number of partitions for the topic. If not
      // specified, the SubscriberClient will use the partition assignment service
      // to determine which partitions it should connect to.
      Partitions []int

      // Optional custom function to handle pubsub.Message.Nack() calls. If not set,
      // the default behavior is to terminate the SubscriberClient.
      NackHandler NackHandler

      // Optional custom function that transforms a SequencedMessage API proto to a
      // pubsub.Message.
      MessageTransformer ReceiveMessageTransformerFunc

      // Optional custom function that is called when a new partition assignment has
      // been delivered to the client.
      ReassignmentHandler ReassignmentHandlerFunc
  }
 

ReceiveSettings configure the SubscriberClient. Flow control settings (MaxOutstandingMessages, MaxOutstandingBytes) apply per partition.

A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
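
Because the flow control limits apply per partition, a rough upper bound on outstanding bytes scales with the number of partitions a client receives from. The sketch below illustrates that arithmetic, together with a range check for explicitly listed partition numbers. Both worstCaseOutstandingBytes and validatePartitions are hypothetical helpers written for this example, not library functions, and the bound ignores any overhead beyond message bytes.

```go
package main

import "fmt"

// worstCaseOutstandingBytes estimates an upper bound on unacknowledged
// message bytes: the per-partition limit times the number of partitions
// this client receives from.
func worstCaseOutstandingBytes(maxOutstandingBytes int64, partitions int) int64 {
	return maxOutstandingBytes * int64(partitions)
}

// validatePartitions checks that each zero-indexed partition number lies in
// the range [0, topicPartitions).
func validatePartitions(partitions []int, topicPartitions int) error {
	for _, p := range partitions {
		if p < 0 || p >= topicPartitions {
			return fmt.Errorf("partition %d out of range [0, %d)", p, topicPartitions)
		}
	}
	return nil
}

func main() {
	// Default MaxOutstandingBytes (1e9) on a client receiving from 4 partitions.
	fmt.Println(worstCaseOutstandingBytes(1e9, 4)) // 4000000000

	fmt.Println(validatePartitions([]int{0, 3}, 4))     // <nil>
	fmt.Println(validatePartitions([]int{4}, 4) != nil) // true
}
```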

SubscriberClient

  type SubscriberClient struct {
      // contains filtered or unexported fields
  }
 

SubscriberClient is a Pub/Sub Lite client to receive messages for a given subscription.

See https://cloud.google.com/pubsub/lite/docs/subscribing for more information about receiving messages.

func NewSubscriberClient

  func NewSubscriberClient(ctx context.Context, subscription string, opts ...option.ClientOption) (*SubscriberClient, error)
 

NewSubscriberClient creates a new Pub/Sub Lite client to receive messages for a given subscription, using DefaultReceiveSettings. A valid subscription path has the format: "projects/PROJECT_ID/locations/LOCATION/subscriptions/SUBSCRIPTION_ID".
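
The path format above can be assembled and sanity-checked before a client is created. This is a minimal sketch using only the standard library. The helpers subscriptionPath and isSubscriptionPath are hypothetical names, not pscompat functions, and the check is purely structural: it does not confirm that the project, location or subscription exists.

```go
package main

import (
	"fmt"
	"strings"
)

// subscriptionPath is a hypothetical helper (not part of pscompat) that
// assembles a path in the documented format.
func subscriptionPath(project, location, subscriptionID string) string {
	return fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s",
		project, location, subscriptionID)
}

// isSubscriptionPath reports whether path has the documented six-segment
// shape with non-empty IDs. It performs a local structural check only.
func isSubscriptionPath(path string) bool {
	parts := strings.Split(path, "/")
	if len(parts) != 6 {
		return false
	}
	if parts[0] != "projects" || parts[2] != "locations" || parts[4] != "subscriptions" {
		return false
	}
	return parts[1] != "" && parts[3] != "" && parts[5] != ""
}

func main() {
	path := subscriptionPath("my-project", "us-central1-a", "my-subscription")
	fmt.Println(path)
	fmt.Println(isSubscriptionPath(path))              // full path
	fmt.Println(isSubscriptionPath("my-subscription")) // bare ID, not a full path
}
```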

Example

interface
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	// subscriberInterface is implemented by both pscompat.SubscriberClient and
	// pubsub.Subscription.
	type subscriberInterface interface {
		Receive(context.Context, func(context.Context, *pubsub.Message)) error
	}

	receive := func(subscriber subscriberInterface) {
		// TODO: Receive messages.
	}

	// Create a Pub/Sub Lite subscriber client.
	ctx := context.Background()
	subscriber, err := pscompat.NewSubscriberClient(ctx, "projects/my-project/locations/region-or-zone/subscriptions/my-subscription")
	if err != nil {
		// TODO: Handle error.
	}
	receive(subscriber)

	// Create a Cloud Pub/Sub subscription to receive.
	client, err := pubsub.NewClient(ctx, "my-project")
	if err != nil {
		// TODO: Handle error.
	}
	subscription := client.Subscription("my-subscription")
	receive(subscription)
}

func NewSubscriberClientWithSettings

func NewSubscriberClientWithSettings(ctx context.Context, subscription string, settings ReceiveSettings, opts ...option.ClientOption) (*SubscriberClient, error)

NewSubscriberClientWithSettings creates a new Pub/Sub Lite client to receive messages for a given subscription, using the specified ReceiveSettings. A valid subscription path has the format: "projects/PROJECT_ID/locations/LOCATION/subscriptions/SUBSCRIPTION_ID".

func (*SubscriberClient) Receive

Receive calls f with the messages from the subscription. It blocks until ctx is done, or the service returns a non-retryable error.

The standard way to terminate a Receive is to cancel its context:

cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, callback)
// Call cancel from callback, or another goroutine.

If there is a fatal service error, Receive returns that error after all of the outstanding calls to f have returned. If ctx is done, Receive returns nil after all of the outstanding calls to f have returned and all messages have been acknowledged. The context passed to f will be canceled when ctx is Done or there is a fatal service error.

Receive calls f concurrently from multiple goroutines if the SubscriberClient is connected to multiple partitions. Only one call from any connected partition will be outstanding at a time, and blocking in the receiver callback f will block the delivery of subsequent messages for the partition.

All messages received by f must be ACKed or NACKed. Failure to do so can prevent Receive from returning. Messages may be processed by the client concurrently and ACKed asynchronously to increase throughput.

Each SubscriberClient may have only one invocation of Receive active at a time.
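
The calling pattern described above (cancel the context to stop, expect concurrent callbacks across partitions, return only after outstanding callbacks finish) can be exercised without a Pub/Sub Lite backend. The sketch below is an illustration under stated assumptions, not the library's implementation: fakeSubscriber, message and receiveN are hypothetical stand-ins defined here, and the fake only imitates the documented behavior of one outstanding callback per partition.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// message is a stand-in for pubsub.Message so the sketch needs no external
// modules.
type message struct {
	Partition int
	Seq       int
}

// fakeSubscriber is a hypothetical stand-in for a SubscriberClient. It runs
// one goroutine per partition with at most one outstanding callback each.
type fakeSubscriber struct {
	partitions int
}

// Receive calls f until ctx is done, then returns nil once every outstanding
// call to f has returned.
func (s *fakeSubscriber) Receive(ctx context.Context, f func(context.Context, *message)) error {
	var wg sync.WaitGroup
	for p := 0; p < s.partitions; p++ {
		wg.Add(1)
		go func(partition int) {
			defer wg.Done()
			for seq := 0; ctx.Err() == nil; seq++ {
				f(ctx, &message{Partition: partition, Seq: seq})
			}
		}(p)
	}
	wg.Wait()
	return nil
}

// receiveN handles at least n messages, then cancels the context from inside
// the callback to stop receiving.
func receiveN(sub *fakeSubscriber, n int) (int, error) {
	cctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var mu sync.Mutex // guards handled; the callback runs on several goroutines
	handled := 0
	err := sub.Receive(cctx, func(ctx context.Context, m *message) {
		mu.Lock()
		defer mu.Unlock()
		handled++
		if handled >= n {
			cancel() // Receive returns after in-flight callbacks finish
		}
	})
	return handled, err
}

func main() {
	handled, err := receiveN(&fakeSubscriber{partitions: 2}, 10)
	fmt.Println(handled >= 10, err) // prints "true <nil>"
}
```

The handled count can slightly exceed n because callbacks for other partitions may already be in flight when cancel is called. With a real SubscriberClient the callback would also need to call Ack or Nack on every message.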

Examples

package main

import (
	"context"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	ctx := context.Background()
	const subscription = "projects/my-project/locations/region-or-zone/subscriptions/my-subscription"
	subscriber, err := pscompat.NewSubscriberClient(ctx, subscription)
	if err != nil {
		// TODO: Handle error.
	}
	cctx, cancel := context.WithCancel(ctx)
	err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		// NOTE: May be called concurrently; synchronize access to shared memory.
		m.Ack()
	})
	if err != nil {
		// TODO: Handle error.
	}

	// Call cancel from the receiver callback or another goroutine to stop
	// receiving.
	cancel()
}

errorHandling
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	ctx := context.Background()
	const subscription = "projects/my-project/locations/region-or-zone/subscriptions/my-subscription"
	settings := pscompat.ReceiveSettings{
		// The SubscriberClient will terminate when it cannot connect to backends
		// for more than 5 minutes.
		Timeout: 5 * time.Minute,
	}
	subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings)
	if err != nil {
		// TODO: Handle error.
	}

	for {
		cctx, cancel := context.WithCancel(ctx)
		err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
			// TODO: Handle message.
			// NOTE: May be called concurrently; synchronize access to shared memory.
			m.Ack()
		})
		if err != nil {
			fmt.Printf("Subscriber client stopped receiving due to error: %v\n", err)
			if errors.Is(err, pscompat.ErrBackendUnavailable) {
				// TODO: Alert if necessary. Receive can be retried.
			} else {
				// TODO: Handle fatal error.
				break
			}
		}

		// Call cancel from the receiver callback or another goroutine to stop
		// receiving.
		cancel()
	}
}

manualPartitionAssignment
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	ctx := context.Background()
	const subscription = "projects/my-project/locations/region-or-zone/subscriptions/my-subscription"
	settings := pscompat.ReceiveSettings{
		// NOTE: The corresponding topic must have 2 or more partitions.
		Partitions: []int{0, 1},
	}
	subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings)
	if err != nil {
		// TODO: Handle error.
	}
	cctx, cancel := context.WithCancel(ctx)
	err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		// NOTE: May be called concurrently; synchronize access to shared memory.
		m.Ack()
	})
	if err != nil {
		// TODO: Handle error.
	}

	// Call cancel from the receiver callback or another goroutine to stop
	// receiving.
	cancel()
}

maxOutstanding
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	ctx := context.Background()
	const subscription = "projects/my-project/locations/region-or-zone/subscriptions/my-subscription"
	settings := pscompat.ReceiveSettings{
		MaxOutstandingMessages: 5,
		MaxOutstandingBytes:    10e6,
	}
	subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings)
	if err != nil {
		// TODO: Handle error.
	}
	cctx, cancel := context.WithCancel(ctx)
	err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		// NOTE: May be called concurrently; synchronize access to shared memory.
		m.Ack()
	})
	if err != nil {
		// TODO: Handle error.
	}

	// Call cancel from the receiver callback or another goroutine to stop
	// receiving.
	cancel()
}