Package pubsub provides an easy way to publish and receive Google Cloud Pub/Sub messages, hiding the details of the underlying server RPCs. Google Cloud Pub/Sub is a many-to-many, asynchronous messaging system that decouples senders and receivers.
More information about Google Cloud Pub/Sub is available at https://cloud.google.com/pubsub/docs
See https://godoc.org/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package.
Publishing
Google Cloud Pub/Sub messages are published to topics. Topics may be created using the pubsub package like so:
topic, err := pubsubClient.CreateTopic(context.Background(), "topic-name")
Messages may then be published to a topic:
res := topic.Publish(ctx, &pubsub.Message{Data: []byte("payload")})
Publish queues the message for publishing and returns immediately. When enough messages have accumulated, or enough time has elapsed, the batch of messages is sent to the Pub/Sub service.
Publish returns a PublishResult, which behaves like a future: its Get method blocks until the message has been sent to the service.
The first time you call Publish on a topic, goroutines are started in the background. To clean up these goroutines, call Stop:
topic.Stop()
Receiving
To receive messages published to a topic, clients create subscriptions to the topic. There may be more than one subscription per topic; each message that is published to the topic will be delivered to all of its subscriptions.
Subscriptions may be created like so:
sub, err := pubsubClient.CreateSubscription(context.Background(), "sub-name", pubsub.SubscriptionConfig{Topic: topic})
Messages are then consumed from a subscription via callback.
err := sub.Receive(context.Background(), func(ctx context.Context, m *pubsub.Message) {
	log.Printf("Got message: %s", m.Data)
	m.Ack()
})
if err != nil {
	// Handle error.
}
The callback is invoked concurrently by multiple goroutines, maximizing throughput. To terminate a call to Receive, cancel its context.
Once client code has processed the message, it must call Message.Ack or Message.Nack, otherwise the message will eventually be redelivered. Ack/Nack MUST be called within the Receive handler function, and not from a goroutine. Otherwise, flow control (e.g. ReceiveSettings.MaxOutstandingMessages) will not be respected, and messages can get orphaned when cancelling Receive.
If the client cannot or doesn't want to process the message, it can call Message.Nack to speed redelivery. For more information and configuration options, see "Deadlines" below.
Note: It is possible for Messages to be redelivered, even if Message.Ack has been called. Client code must be robust to multiple deliveries of messages.
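Because a message may be delivered more than once, handlers are usually written to be idempotent. The following is a minimal, library-independent sketch, assuming each message carries a unique ID. The names deduper and firstDelivery are hypothetical and not part of the pubsub package, and a real system would likely need durable, bounded storage instead of an in-memory map.

```go
package main

import (
	"fmt"
	"sync"
)

// deduper remembers which message IDs have already been processed.
// It is a hypothetical helper, not part of the pubsub package.
type deduper struct {
	mu   sync.Mutex
	seen map[string]bool
}

func newDeduper() *deduper {
	return &deduper{seen: make(map[string]bool)}
}

// firstDelivery reports whether id is seen for the first time and records it.
// It is safe to call from multiple goroutines, as Receive callbacks would be.
func (d *deduper) firstDelivery(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[id] {
		return false
	}
	d.seen[id] = true
	return true
}

func main() {
	d := newDeduper()
	// "m1" arrives twice, simulating a redelivery.
	for _, id := range []string{"m1", "m2", "m1"} {
		if d.firstDelivery(id) {
			fmt.Println("processing", id)
		} else {
			fmt.Println("skipping duplicate", id)
		}
	}
}
```

In a Receive callback, a check like this would run before the side effects of processing; the message would still be acked in both branches so that the duplicate is not delivered again.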
Note: This uses pubsub's streaming pull feature. This feature has properties that may be surprising. Please take a look at https://cloud.google.com/pubsub/docs/pull#streamingpull for more details on how streaming pull behaves compared to the synchronous pull method.
Deadlines
The default pubsub deadlines are suitable for most use cases, but may be overridden. This section describes the tradeoffs that should be considered when overriding the defaults.
Behind the scenes, each message returned by the Pub/Sub server has an associated lease, known as an "ACK deadline". Unless a message is acknowledged within the ACK deadline, or the client requests that the ACK deadline be extended, the message will become eligible for redelivery.
As a convenience, the pubsub client will automatically extend deadlines until either:
- Message.Ack or Message.Nack is called, or
- The "MaxExtension" period elapses from the time the message is fetched from the server.
ACK deadlines are extended periodically by the client. The initial ACK deadline given to messages is 10s. The period between extensions, as well as the length of the extension, automatically adjusts depending on the time it takes to ack messages, up to 10m. As a result, subscribers that process messages quickly have their message ack deadlines extended by a short amount, whereas subscribers that process messages slowly have their message ack deadlines extended by a large amount. The net effect is fewer RPCs sent from the client library.
For example, consider a subscriber that takes 3 minutes to process each message. Since the library has already recorded several 3 minute "time to ack"s in a percentile distribution, future message extensions are sent with a value of 3 minutes, every 3 minutes. Suppose the application crashes 5 seconds after the library sends such an extension: the Pub/Sub server would wait the remaining 2m55s before re-sending the messages out to other subscribers.
Please note that the client library does not use the subscription's AckDeadline by default. To enforce the subscription AckDeadline, set MaxExtension to the subscription's AckDeadline:
cfg, err := sub.Config(ctx)
if err != nil {
	// TODO: handle err
}
sub.ReceiveSettings.MaxExtension = cfg.AckDeadline
Slow Message Processing
For use cases where message processing exceeds 30 minutes, we recommend using the base client in a pull model, since long-lived streams are periodically killed by firewalls. See the example at https://godoc.org/cloud.google.com/go/pubsub/apiv1#example-SubscriberClient-Pull-LengthyClientProcessing
Emulator
To use an emulator with this library, you can set the PUBSUB_EMULATOR_HOST environment variable to the address at which your emulator is running. This will send requests to that address instead of to Cloud Pub/Sub. You can then create and use a client as usual:
// Set PUBSUB_EMULATOR_HOST environment variable.
err := os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:9000")
if err != nil {
	// TODO: Handle error.
}
// Create client as usual.
client, err := pubsub.NewClient(ctx, "my-project-id")
if err != nil {
	// TODO: Handle error.
}
defer client.Close()
Constants
ScopePubSub, ScopeCloudPlatform
const (
	// ScopePubSub grants permissions to view and manage Pub/Sub
	// topics and subscriptions.
	ScopePubSub = "https://www.googleapis.com/auth/pubsub"

	// ScopeCloudPlatform grants permissions to view and manage your data
	// across Google Cloud Platform services.
	ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
)
MaxPublishRequestCount, MaxPublishRequestBytes
const (
	// MaxPublishRequestCount is the maximum number of messages that can be in
	// a single publish request, as defined by the PubSub service.
	MaxPublishRequestCount = 1000

	// MaxPublishRequestBytes is the maximum size of a single publish request
	// in bytes, as defined by the PubSub service.
	MaxPublishRequestBytes = 1e7
)
Variables
ErrFlowControllerMaxOutstandingMessages, ErrFlowControllerMaxOutstandingBytes
var (
	// ErrFlowControllerMaxOutstandingMessages indicates that the number of outstanding messages exceeds MaxOutstandingMessages.
	ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded")

	// ErrFlowControllerMaxOutstandingBytes indicates that the size in bytes of outstanding messages exceeds MaxOutstandingBytes.
	ErrFlowControllerMaxOutstandingBytes = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded")
)
PublishedMessages, PublishLatency, PullCount, AckCount, NackCount, ModAckCount, ModAckTimeoutCount, StreamOpenCount, StreamRetryCount, StreamRequestCount, StreamResponseCount, OutstandingMessages, OutstandingBytes
var (
	// PublishedMessages is a measure of the number of messages published, which may include errors.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	PublishedMessages = stats.Int64(statsPrefix+"published_messages", "Number of PubSub message published", stats.UnitDimensionless)

	// PublishLatency is a measure of the number of milliseconds it took to publish a bundle,
	// which may consist of one or more messages.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	PublishLatency = stats.Float64(statsPrefix+"publish_roundtrip_latency", "The latency in milliseconds per publish batch", stats.UnitMilliseconds)

	// PullCount is a measure of the number of messages pulled.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	PullCount = stats.Int64(statsPrefix+"pull_count", "Number of PubSub messages pulled", stats.UnitDimensionless)

	// AckCount is a measure of the number of messages acked.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AckCount = stats.Int64(statsPrefix+"ack_count", "Number of PubSub messages acked", stats.UnitDimensionless)

	// NackCount is a measure of the number of messages nacked.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	NackCount = stats.Int64(statsPrefix+"nack_count", "Number of PubSub messages nacked", stats.UnitDimensionless)

	// ModAckCount is a measure of the number of messages whose ack-deadline was modified.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	ModAckCount = stats.Int64(statsPrefix+"mod_ack_count", "Number of ack-deadlines modified", stats.UnitDimensionless)

	// ModAckTimeoutCount is a measure of the number of ModifyAckDeadline RPCs that timed out.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	ModAckTimeoutCount = stats.Int64(statsPrefix+"mod_ack_timeout_count", "Number of ModifyAckDeadline RPCs that timed out", stats.UnitDimensionless)

	// StreamOpenCount is a measure of the number of times a streaming-pull stream was opened.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of calls opening a new streaming pull", stats.UnitDimensionless)

	// StreamRetryCount is a measure of the number of times a streaming-pull operation was retried.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamRetryCount = stats.Int64(statsPrefix+"stream_retry_count", "Number of retries of a stream send or receive", stats.UnitDimensionless)

	// StreamRequestCount is a measure of the number of requests sent on a streaming-pull stream.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamRequestCount = stats.Int64(statsPrefix+"stream_request_count", "Number gRPC StreamingPull request messages sent", stats.UnitDimensionless)

	// StreamResponseCount is a measure of the number of responses received on a streaming-pull stream.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamResponseCount = stats.Int64(statsPrefix+"stream_response_count", "Number of gRPC StreamingPull response messages received", stats.UnitDimensionless)

	// OutstandingMessages is a measure of the number of outstanding messages held by the client before they are processed.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	OutstandingMessages = stats.Int64(statsPrefix+"outstanding_messages", "Number of outstanding Pub/Sub messages", stats.UnitDimensionless)

	// OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless)
)
The following are measures recorded in publish/subscribe flows.
PublishedMessagesView, PublishLatencyView, PullCountView, AckCountView, NackCountView, ModAckCountView, ModAckTimeoutCountView, StreamOpenCountView, StreamRetryCountView, StreamRequestCountView, StreamResponseCountView, OutstandingMessagesView, OutstandingBytesView
var (
	// PublishedMessagesView is a cumulative sum of PublishedMessages.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	PublishedMessagesView *view.View

	// PublishLatencyView is a distribution of PublishLatency.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	PublishLatencyView *view.View

	// PullCountView is a cumulative sum of PullCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	PullCountView *view.View

	// AckCountView is a cumulative sum of AckCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AckCountView *view.View

	// NackCountView is a cumulative sum of NackCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	NackCountView *view.View

	// ModAckCountView is a cumulative sum of ModAckCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	ModAckCountView *view.View

	// ModAckTimeoutCountView is a cumulative sum of ModAckTimeoutCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	ModAckTimeoutCountView *view.View

	// StreamOpenCountView is a cumulative sum of StreamOpenCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamOpenCountView *view.View

	// StreamRetryCountView is a cumulative sum of StreamRetryCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamRetryCountView *view.View

	// StreamRequestCountView is a cumulative sum of StreamRequestCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamRequestCountView *view.View

	// StreamResponseCountView is a cumulative sum of StreamResponseCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamResponseCountView *view.View

	// OutstandingMessagesView is the last value of OutstandingMessages.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	OutstandingMessagesView *view.View

	// OutstandingBytesView is the last value of OutstandingBytes.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	OutstandingBytesView *view.View
)
DefaultPublishViews, DefaultSubscribeViews
These arrays hold the default OpenCensus views that keep track of publish/subscribe operations. They are EXPERIMENTAL and subject to change or removal without notice.
DefaultPublishSettings
var DefaultPublishSettings = PublishSettings{
	DelayThreshold: 10 * time.Millisecond,
	CountThreshold: 100,
	ByteThreshold:  1e6,
	Timeout:        60 * time.Second,
	BufferedByteLimit: 10 * MaxPublishRequestBytes,
	FlowControlSettings: FlowControlSettings{
		MaxOutstandingMessages: 1000,
		MaxOutstandingBytes:    -1,
		LimitExceededBehavior:  FlowControlIgnore,
	},
}
DefaultPublishSettings holds the default values for topics' PublishSettings.
DefaultReceiveSettings
var DefaultReceiveSettings = ReceiveSettings{
	MaxExtension:           60 * time.Minute,
	MaxExtensionPeriod:     0,
	MaxOutstandingMessages: 1000,
	MaxOutstandingBytes:    1e9,
	NumGoroutines:          10,
}
DefaultReceiveSettings holds the default values for ReceiveSettings.
ErrOversizedMessage
var ErrOversizedMessage = bundler.ErrOversizedItem
ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes.
AuthenticationMethod
type AuthenticationMethod interface {
	// contains filtered or unexported methods
}
AuthenticationMethod is used by push endpoints to verify the source of push requests. This interface defines fields that are part of a closed alpha that may not be accessible to all users.
Client
type Client struct {
	// contains filtered or unexported fields
}
Client is a Google Pub/Sub client scoped to a single project.
Clients should be reused rather than being created as needed. A Client may be shared by multiple goroutines.
func NewClient
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error)
NewClient creates a new PubSub client. It uses a default configuration.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	_, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// See the other examples to learn how to use the Client.
}
func NewClientWithConfig
func NewClientWithConfig(ctx context.Context, projectID string, config *ClientConfig, opts ...option.ClientOption) (c *Client, err error)
NewClientWithConfig creates a new PubSub client.
func (*Client) Close
Close releases any resources held by the client, such as memory and goroutines.
If the client is available for the lifetime of the program, then Close need not be called at exit.
func (*Client) CreateSubscription
func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error)
CreateSubscription creates a new subscription on a topic.
id is the name of the subscription to create. It must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog".
cfg.Topic is the topic from which the subscription should receive messages. It need not belong to the same project as the subscription. This field is required.
cfg.AckDeadline is the maximum time after a subscriber receives a message before the subscriber should acknowledge the message. It must be between 10 and 600 seconds (inclusive), and is rounded down to the nearest second. If the provided ackDeadline is 0, then the default value of 10 seconds is used. Note: messages which are obtained via Subscription.Receive need not be acknowledged within this deadline, as the deadline will be automatically extended.
cfg.PushConfig may be set to configure this subscription for push delivery.
If the subscription already exists, an error will be returned.
Examples
package main

import (
	"context"
	"time"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new topic with the given name.
	topic, err := client.CreateTopic(ctx, "topicName")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new subscription to the previously created topic
	// with the given name.
	sub, err := client.CreateSubscription(ctx, "subName", pubsub.SubscriptionConfig{
		Topic:            topic,
		AckDeadline:      10 * time.Second,
		ExpirationPolicy: 25 * time.Hour,
	})
	if err != nil {
		// TODO: Handle error.
	}

	_ = sub // TODO: use the subscription.
}
neverExpire
package main

import (
	"context"
	"time"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new topic with the given name.
	topic, err := client.CreateTopic(ctx, "topicName")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new subscription to the previously
	// created topic and ensure it never expires.
	sub, err := client.CreateSubscription(ctx, "subName", pubsub.SubscriptionConfig{
		Topic:            topic,
		AckDeadline:      10 * time.Second,
		ExpirationPolicy: time.Duration(0),
	})
	if err != nil {
		// TODO: Handle error.
	}

	_ = sub // TODO: Use the subscription
}
func (*Client) CreateTopic
CreateTopic creates a new topic.
The specified topic ID must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog". For more information, see: https://cloud.google.com/pubsub/docs/admin#resource_names
If the topic already exists, an error will be returned.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new topic with the given name.
	topic, err := client.CreateTopic(ctx, "topicName")
	if err != nil {
		// TODO: Handle error.
	}

	_ = topic // TODO: use the topic.
}
func (*Client) CreateTopicWithConfig
func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error)
CreateTopicWithConfig creates a topic from TopicConfig.
The specified topic ID must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog". For more information, see: https://cloud.google.com/pubsub/docs/admin#resource_names .
If the topic already exists, an error will be returned.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new topic with the given name and config.
	topicConfig := &pubsub.TopicConfig{
		KMSKeyName: "projects/project-id/locations/global/keyRings/my-key-ring/cryptoKeys/my-key",
		MessageStoragePolicy: pubsub.MessageStoragePolicy{
			AllowedPersistenceRegions: []string{"us-east1"},
		},
	}
	topic, err := client.CreateTopicWithConfig(ctx, "topicName", topicConfig)
	if err != nil {
		// TODO: Handle error.
	}

	_ = topic // TODO: use the topic.
}
func (*Client) DetachSubscription
func (c *Client) DetachSubscription(ctx context.Context, sub string) (*DetachSubscriptionResult, error)
DetachSubscription detaches a subscription from its topic. All messages retained in the subscription are dropped. Subsequent Pull and StreamingPull requests will return FAILED_PRECONDITION. If the subscription is a push subscription, pushes to the endpoint will stop.
func (*Client) Snapshot
Snapshot creates a reference to a snapshot.
func (*Client) Snapshots
func (c *Client) Snapshots(ctx context.Context) *SnapshotConfigIterator
Snapshots returns an iterator which returns snapshots for this project.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// List all snapshots for the project.
	iter := client.Snapshots(ctx)
	_ = iter // TODO: iterate using Next.
}
func (*Client) Subscription
func (c *Client) Subscription(id string) *Subscription
Subscription creates a reference to a subscription.
func (*Client) SubscriptionInProject
func (c *Client) SubscriptionInProject(id, projectID string) *Subscription
SubscriptionInProject creates a reference to a subscription in a given project.
func (*Client) Subscriptions
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// List all subscriptions of the project.
	it := client.Subscriptions(ctx)
	_ = it // TODO: iterate using Next.
}
func (*Client) Topic
Topic creates a reference to a topic in the client's project.
If a Topic's Publish method is called, it has background goroutines associated with it. Clean them up by calling Topic.Stop.
Avoid creating many Topic instances if you use them to publish.
func (*Client) TopicInProject
TopicInProject creates a reference to a topic in the given project.
If a Topic's Publish method is called, it has background goroutines associated with it. Clean them up by calling Topic.Stop.
Avoid creating many Topic instances if you use them to publish.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.TopicInProject("topicName", "another-project-id")
	_ = topic // TODO: use the topic.
}
func (*Client) Topics
func (c *Client) Topics(ctx context.Context) *TopicIterator
Topics returns an iterator which returns all of the topics for the client's project.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	it := client.Topics(ctx)
	_ = it // TODO: iterate using Next.
}
ClientConfig
type ClientConfig struct {
	PublisherCallOptions  *vkit.PublisherCallOptions
	SubscriberCallOptions *vkit.SubscriberCallOptions
}
ClientConfig has configurations for the client.
DeadLetterPolicy
DeadLetterPolicy specifies the conditions for dead lettering messages in a subscription.
DetachSubscriptionResult
type DetachSubscriptionResult struct{}
DetachSubscriptionResult is the response for the DetachSubscription method. Reserved for future use.
FlowControlSettings
type FlowControlSettings struct {
	// MaxOutstandingMessages is the maximum number of buffered messages to be published.
	// If less than or equal to zero, this is disabled.
	MaxOutstandingMessages int

	// MaxOutstandingBytes is the maximum size of buffered messages to be published.
	// If less than or equal to zero, this is disabled.
	MaxOutstandingBytes int

	// LimitExceededBehavior configures the behavior when trying to publish
	// additional messages while the flow controller is full. The available options
	// are Ignore (disable, default), Block, and SignalError (publish
	// results will return an error).
	LimitExceededBehavior LimitExceededBehavior
}
FlowControlSettings controls flow control for messages while publishing or subscribing.
LimitExceededBehavior
type LimitExceededBehavior int
LimitExceededBehavior configures the behavior that flowController can use in case the flow control limits are exceeded.
FlowControlIgnore, FlowControlBlock, FlowControlSignalError
const (
	// FlowControlIgnore disables flow control.
	FlowControlIgnore LimitExceededBehavior = iota
	// FlowControlBlock signals to wait until the request can be made without exceeding the limit.
	FlowControlBlock
	// FlowControlSignalError signals an error to the caller of acquire.
	FlowControlSignalError
)
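The three behaviors can be illustrated with a toy flow controller that limits only the number of outstanding messages. This is a sketch under stated assumptions, not the library's flow controller: the type, constants, and method names below are hypothetical, and byte-based limits are left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// behavior mirrors the three documented options using hypothetical names.
type behavior int

const (
	ignore behavior = iota
	block
	signalError
)

var errLimitExceeded = errors.New("flow control limit exceeded")

// flowController tracks outstanding messages against a fixed limit.
type flowController struct {
	mu          sync.Mutex
	cond        *sync.Cond
	limit       int
	outstanding int
	mode        behavior
}

func newFlowController(limit int, mode behavior) *flowController {
	fc := &flowController{limit: limit, mode: mode}
	fc.cond = sync.NewCond(&fc.mu)
	return fc
}

// acquire reserves one slot. Depending on the mode, it returns immediately,
// waits until a slot is free, or returns an error when the limit is reached.
func (fc *flowController) acquire() error {
	fc.mu.Lock()
	defer fc.mu.Unlock()
	switch fc.mode {
	case ignore:
		// The limit is not enforced.
	case block:
		for fc.outstanding >= fc.limit {
			fc.cond.Wait()
		}
	case signalError:
		if fc.outstanding >= fc.limit {
			return errLimitExceeded
		}
	}
	fc.outstanding++
	return nil
}

// release frees one slot and wakes one waiter, if any.
func (fc *flowController) release() {
	fc.mu.Lock()
	fc.outstanding--
	fc.mu.Unlock()
	fc.cond.Signal()
}

func main() {
	fc := newFlowController(1, signalError)
	fmt.Println(fc.acquire()) // first slot is granted
	fmt.Println(fc.acquire()) // limit reached, an error is returned
	fc.release()
	fmt.Println(fc.acquire()) // slot is available again
}
```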
Message
Message represents a Pub/Sub message.
Message can be passed to Topic.Publish for publishing.
If received in the callback passed to Subscription.Receive, client code must call Message.Ack or Message.Nack when finished processing the Message. Calls to Ack or Nack have no effect after the first call.
Ack indicates successful processing of a Message. If message acknowledgement fails, the Message will be redelivered. Nack indicates that the client will not or cannot process a Message. Nack will result in the Message being redelivered more quickly than if it were allowed to expire.
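The rule that only the first call to Ack or Nack has an effect can be modeled with sync.Once. The sketch below is a hypothetical stand-in (ackState is not the library's Message type) meant only to show the first-call-wins behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// ackState records the first acknowledgement decision and ignores later ones.
// It is a hypothetical type used for illustration.
type ackState struct {
	once    sync.Once
	outcome string
}

// Ack records a positive acknowledgement if no decision has been made yet.
func (a *ackState) Ack() { a.once.Do(func() { a.outcome = "ack" }) }

// Nack records a negative acknowledgement if no decision has been made yet.
func (a *ackState) Nack() { a.once.Do(func() { a.outcome = "nack" }) }

func main() {
	var a ackState
	a.Ack()
	a.Nack() // no effect: Ack was called first
	fmt.Println(a.outcome) // ack
}
```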
MessageStoragePolicy
type MessageStoragePolicy struct {
	// AllowedPersistenceRegions is the list of GCP regions where messages that are published
	// to the topic may be persisted in storage. Messages published by publishers running in
	// non-allowed GCP regions (or running outside of GCP altogether) will be
	// routed for storage in one of the allowed regions.
	//
	// If empty, it indicates a misconfiguration at the project or organization level, which
	// will result in all Publish operations failing. This field cannot be empty in updates.
	//
	// If nil, then the policy is not defined on a topic level. When used in updates, it resets
	// the regions back to the organization level Resource Location Restriction policy.
	//
	// For more information, see
	// https://cloud.google.com/pubsub/docs/resource-location-restriction#pubsub-storage-locations.
	AllowedPersistenceRegions []string
}
MessageStoragePolicy constrains how messages published to the topic may be stored. It is determined when the topic is created based on the policy configured at the project level.
OIDCToken
type OIDCToken struct {
	// Audience to be used when generating OIDC token. The audience claim
	// identifies the recipients that the JWT is intended for. The audience
	// value is a single case-sensitive string. Having multiple values (array)
	// for the audience field is not supported. More info about the OIDC JWT
	// token audience here: https://tools.ietf.org/html/rfc7519#section-4.1.3
	// Note: if not specified, the Push endpoint URL will be used.
	Audience string

	// The service account email to be used for generating the OpenID Connect token.
	// The caller of:
	//  * CreateSubscription
	//  * UpdateSubscription
	//  * ModifyPushConfig
	// calls must have the iam.serviceAccounts.actAs permission for the service account.
	// See https://cloud.google.com/iam/docs/understanding-roles#service-accounts-roles.
	ServiceAccountEmail string
}
OIDCToken allows PushConfigs to be authenticated using the OpenID Connect protocol https://openid.net/connect/
PublishResult
type PublishResult = ipubsub.PublishResult
A PublishResult holds the result from a call to Publish.
Call Get to obtain the result of the Publish call. Example:
// Get blocks until Publish completes or ctx is done.
id, err := r.Get(ctx)
if err != nil {
	// TODO: Handle error.
}
PublishSettings
type PublishSettings struct {
	// Publish a non-empty batch after this delay has passed.
	DelayThreshold time.Duration

	// Publish a batch when it has this many messages. The maximum is
	// MaxPublishRequestCount.
	CountThreshold int

	// Publish a batch when its size in bytes reaches this value.
	ByteThreshold int

	// The number of goroutines used in each of the data structures that are
	// involved along the Publish path. Adjusting this value adjusts
	// concurrency along the publish path.
	//
	// Defaults to a multiple of GOMAXPROCS.
	NumGoroutines int

	// The maximum time that the client will attempt to publish a bundle of messages.
	Timeout time.Duration

	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow. This is now superseded by FlowControlSettings.MaxOutstandingBytes.
	// If MaxOutstandingBytes is set, that value will override BufferedByteLimit.
	//
	// Defaults to DefaultPublishSettings.BufferedByteLimit.
	// Deprecated: Set `Topic.PublishSettings.FlowControlSettings.MaxOutstandingBytes` instead.
	BufferedByteLimit int

	// FlowControlSettings defines publisher flow control settings.
	FlowControlSettings FlowControlSettings
}
PublishSettings control the bundling of published messages.
PushConfig
type PushConfig struct {
	// A URL locating the endpoint to which messages should be pushed.
	Endpoint string

	// Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details.
	Attributes map[string]string

	// AuthenticationMethod is used by push endpoints to verify the source
	// of push requests.
	// It can be used with push endpoints that are private by default to
	// allow requests only from the Cloud Pub/Sub system, for example.
	// This field is optional and should be set only by users interested in
	// authenticated push.
	AuthenticationMethod AuthenticationMethod
}
PushConfig contains configuration for subscriptions that operate in push mode.
ReceiveSettings
type ReceiveSettings struct {
	// MaxExtension is the maximum period for which the Subscription should
	// automatically extend the ack deadline for each message.
	//
	// The Subscription will automatically extend the ack deadline of all
	// fetched Messages up to the duration specified. Automatic deadline
	// extension beyond the initial receipt may be disabled by specifying a
	// duration less than 0.
	MaxExtension time.Duration

	// MaxExtensionPeriod is the maximum duration by which to extend the ack
	// deadline at a time. The ack deadline will continue to be extended by up
	// to this duration until MaxExtension is reached. Setting MaxExtensionPeriod
	// bounds the maximum amount of time before a message redelivery in the
	// event the subscriber fails to extend the deadline.
	//
	// MaxExtensionPeriod configuration can be disabled by specifying a
	// duration less than (or equal to) 0.
	MaxExtensionPeriod time.Duration

	// MaxOutstandingMessages is the maximum number of unprocessed messages
	// (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it
	// will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages.
	// If the value is negative, then there will be no limit on the number of
	// unprocessed messages.
	MaxOutstandingMessages int

	// MaxOutstandingBytes is the maximum size of unprocessed messages
	// (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will
	// be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If
	// the value is negative, then there will be no limit on the number of bytes
	// for unprocessed messages.
	MaxOutstandingBytes int

	// UseLegacyFlowControl disables enforcement of flow control settings at
	// the Cloud Pub/Sub server. The less accurate method of enforcing flow
	// control only on the client side is used instead.
	// The default is false.
	UseLegacyFlowControl bool

	// NumGoroutines is the number of goroutines that each data structure along
	// the Receive path will spawn. Adjusting this value adjusts concurrency
	// along the receive path.
	//
	// NumGoroutines defaults to DefaultReceiveSettings.NumGoroutines.
	//
	// NumGoroutines does not limit the number of messages that can be processed
	// concurrently. Even with one goroutine, many messages might be processed at
	// once, because that goroutine may continually receive messages and invoke the
	// function passed to Receive on them. To limit the number of messages being
	// processed concurrently, set MaxOutstandingMessages.
	NumGoroutines int

	// Synchronous switches the underlying receiving mechanism to unary Pull.
	// When Synchronous is false, the more performant StreamingPull is used.
	// StreamingPull also has the benefit of subscriber affinity when using
	// ordered delivery.
	// When Synchronous is true, NumGoroutines is set to 1 and only one Pull
	// RPC will be made to poll messages at a time.
	// The default is false.
	//
	// Deprecated.
	// Previously, users might use Synchronous mode since StreamingPull had a limitation
	// where MaxOutstandingMessages was not always respected with large batches of
	// small messages. With server side flow control, this is no longer an issue
	// and we recommend switching to the default StreamingPull mode by setting
	// Synchronous to false.
	Synchronous bool
}
ReceiveSettings configure the Receive method. A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
RetryPolicy
type RetryPolicy struct {
	// MinimumBackoff is the minimum delay between consecutive deliveries of a
	// given message. Value should be between 0 and 600 seconds. Defaults to 10 seconds.
	MinimumBackoff optional.Duration

	// MaximumBackoff is the maximum delay between consecutive deliveries of a
	// given message. Value should be between 0 and 600 seconds. Defaults to 600 seconds.
	MaximumBackoff optional.Duration
}
RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
Retry delay will be exponential, bounded by the provided minimum and maximum backoffs. See https://en.wikipedia.org/wiki/Exponential_backoff.
RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message.
RetryPolicy is implemented on a best-effort basis. At times, the delay between consecutive deliveries may not match the configuration. That is, the delay can be more or less than the configured backoff.
SchemaClient
type SchemaClient struct {
	// contains filtered or unexported fields
}
SchemaClient is a Pub/Sub schema client scoped to a single project.
func NewSchemaClient
func NewSchemaClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*SchemaClient, error)
NewSchemaClient creates a new Pub/Sub Schema client.
func (*SchemaClient) Close
func (s *SchemaClient) Close() error
Close closes the schema client and frees up resources.
func (*SchemaClient) CreateSchema
func (c *SchemaClient) CreateSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error)
CreateSchema creates a new schema with the given schemaID and config. Schemas cannot be updated after creation.
func (*SchemaClient) DeleteSchema
func (s *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error
DeleteSchema deletes an existing schema given a schema ID.
func (*SchemaClient) Schema
func (c *SchemaClient) Schema(ctx context.Context, schemaID string, view SchemaView) (*SchemaConfig, error)
Schema retrieves the configuration of a schema given a schemaID and a view.
func (*SchemaClient) Schemas
func (c *SchemaClient) Schemas(ctx context.Context, view SchemaView) *SchemaIterator
Schemas returns an iterator which returns all of the schemas for the client's project.
func (*SchemaClient) ValidateMessageWithConfig
func (s *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte, encoding SchemaEncoding, config SchemaConfig) (*ValidateMessageResult, error)
ValidateMessageWithConfig validates a message against a schema specified by a schema config.
func (*SchemaClient) ValidateMessageWithID
func (s *SchemaClient) ValidateMessageWithID(ctx context.Context, msg []byte, encoding SchemaEncoding, schemaID string) (*ValidateMessageResult, error)
ValidateMessageWithID validates a message against a schema specified by the schema ID of an existing schema.
func (*SchemaClient) ValidateSchema
func (s *SchemaClient) ValidateSchema(ctx context.Context, schema SchemaConfig) (*ValidateSchemaResult, error)
ValidateSchema validates a schema config and returns an error if invalid.
SchemaConfig
type SchemaConfig struct {
	// The name of the schema populated by the server. This field is read-only.
	Name string

	// The type of the schema definition.
	Type SchemaType

	// The definition of the schema. This should contain a string representing
	// the full definition of the schema that is a valid schema definition of
	// the type specified in `type`.
	Definition string
}
SchemaConfig is a reference to a PubSub schema.
SchemaEncoding
SchemaEncoding is the encoding expected for messages.
EncodingUnspecified, EncodingJSON, EncodingBinary
const (
	// EncodingUnspecified is the default unused value.
	EncodingUnspecified SchemaEncoding = 0
	// EncodingJSON is the JSON encoding type for a message.
	EncodingJSON SchemaEncoding = 1
	// EncodingBinary is the binary encoding type for a message.
	// For some schema types, binary encoding may not be available.
	EncodingBinary SchemaEncoding = 2
)
SchemaIterator
type SchemaIterator struct {
	// contains filtered or unexported fields
}
SchemaIterator is a struct used to iterate over schemas.
func (*SchemaIterator) Next
func (s *SchemaIterator) Next() (*SchemaConfig, error)
Next returns the next schema. If there are no more schemas, iterator.Done will be returned.
SchemaSettings
type SchemaSettings struct {
	Schema   string
	Encoding SchemaEncoding
}
SchemaSettings are settings for validating messages published against a schema.
SchemaType
type SchemaType pb.Schema_Type
SchemaType is the possible schema definition types.
SchemaTypeUnspecified, SchemaProtocolBuffer, SchemaAvro
const (
	// SchemaTypeUnspecified is the unused default value.
	SchemaTypeUnspecified SchemaType = 0
	// SchemaProtocolBuffer is a protobuf schema definition.
	SchemaProtocolBuffer SchemaType = 1
	// SchemaAvro is an Avro schema definition.
	SchemaAvro SchemaType = 2
)
SchemaView
type SchemaView pb.SchemaView
SchemaView is a view of Schema object fields to be returned by GetSchema and ListSchemas.
SchemaViewUnspecified, SchemaViewBasic, SchemaViewFull
const (
	// SchemaViewUnspecified is the default/unset value.
	SchemaViewUnspecified SchemaView = 0
	// SchemaViewBasic includes the name and type of the schema, but not the definition.
	SchemaViewBasic SchemaView = 1
	// SchemaViewFull includes all Schema object fields.
	SchemaViewFull SchemaView = 2
)
Snapshot
type Snapshot struct {
	// contains filtered or unexported fields
}
Snapshot is a reference to a PubSub snapshot.
func (*Snapshot) Delete
Delete deletes a snapshot.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	snap := client.Snapshot("snapshotName")
	if err := snap.Delete(ctx); err != nil {
		// TODO: Handle error.
	}
}
func (*Snapshot) ID
ID returns the unique identifier of the snapshot within its project.
SnapshotConfig
SnapshotConfig contains the details of a Snapshot.
SnapshotConfigIterator
type SnapshotConfigIterator struct {
	// contains filtered or unexported fields
}
SnapshotConfigIterator is an iterator that returns a series of snapshots.
func (*SnapshotConfigIterator) Next
func (snaps *SnapshotConfigIterator) Next() (*SnapshotConfig, error)
Next returns the next SnapshotConfig. Its second return value is iterator.Done if there are no more results. Once Next returns iterator.Done, all subsequent calls will return iterator.Done.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	// List all snapshots for the project.
	iter := client.Snapshots(ctx)
	for {
		snapConfig, err := iter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		_ = snapConfig // TODO: use the SnapshotConfig.
	}
}
Subscription
type Subscription struct {
	// Settings for pulling messages. Configure these before calling Receive.
	ReceiveSettings ReceiveSettings
	// contains filtered or unexported fields
}
Subscription is a reference to a PubSub subscription.
func (*Subscription) Config
func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error)
Config fetches the current configuration for the subscription.
Example
package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	config, err := sub.Config(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	fmt.Println(config)
}
func (*Subscription) CreateSnapshot
func (s *Subscription) CreateSnapshot(ctx context.Context, name string) (*SnapshotConfig, error)
CreateSnapshot creates a new snapshot from this subscription. The snapshot will be for the topic this subscription is subscribed to. If the name is the empty string, a unique name is assigned.
The created snapshot is guaranteed to retain:
(a) The existing backlog on the subscription. More precisely, this is defined as the messages in the subscription's backlog that are unacknowledged when Snapshot returns without error.
(b) Any messages published to the subscription's topic following Snapshot returning without error.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	snapConfig, err := sub.CreateSnapshot(ctx, "snapshotName")
	if err != nil {
		// TODO: Handle error.
	}
	_ = snapConfig // TODO: Use SnapshotConfig.
}
func (*Subscription) Delete
func (s *Subscription) Delete(ctx context.Context) error
Delete deletes the subscription.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	sub := client.Subscription("subName")
	if err := sub.Delete(ctx); err != nil {
		// TODO: Handle error.
	}
}
func (*Subscription) Exists
func (s *Subscription) Exists(ctx context.Context) (bool, error)
Exists reports whether the subscription exists on the server.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	sub := client.Subscription("subName")
	ok, err := sub.Exists(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	if !ok {
		// Subscription doesn't exist.
	}
}
func (*Subscription) IAM
func (s *Subscription) IAM() *iam.Handle
IAM returns the subscription's IAM handle.
func (*Subscription) ID
func (s *Subscription) ID() string
ID returns the unique identifier of the subscription within its project.
func (*Subscription) Receive
func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error
Receive calls f with the outstanding messages from the subscription. It blocks until ctx is done, or the service returns a non-retryable error.
The standard way to terminate a Receive is to cancel its context:
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, callback)
// Call cancel from callback, or another goroutine.
If the service returns a non-retryable error, Receive returns that error after all of the outstanding calls to f have returned. If ctx is done, Receive returns nil after all of the outstanding calls to f have returned and all messages have been acknowledged or have expired.
Receive calls f concurrently from multiple goroutines. It is encouraged to process messages synchronously in f, even if that processing is relatively time-consuming; Receive will spawn new goroutines for incoming messages, limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
The context passed to f will be canceled when ctx is Done or there is a fatal service error.
Receive will send an ack deadline extension on message receipt, then automatically extend the ack deadline of all fetched Messages up to the period specified by s.ReceiveSettings.MaxExtension.
Each Subscription may have only one invocation of Receive active at a time.
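The cancellation pattern described above can be tried without a Pub/Sub project by using a stand-in for Receive. The sketch below is a simplified model built on the standard library only: `fakeReceive`, `produce`, and `receiveFirst` are hypothetical helpers, and unlike the real Receive the stand-in invokes the callback sequentially rather than from multiple goroutines. It shows a callback cancelling the context after a fixed number of messages, after which the receive call returns nil.

```go
package main

import (
	"context"
	"fmt"
)

// fakeReceive is a hypothetical stand-in for Subscription.Receive. It passes
// messages from src to f until ctx is done, then returns nil.
func fakeReceive(ctx context.Context, src <-chan string, f func(context.Context, string)) error {
	for {
		if ctx.Err() != nil { // stop as soon as the context has been cancelled
			return nil
		}
		select {
		case <-ctx.Done():
			return nil
		case m := <-src:
			f(ctx, m)
		}
	}
}

// produce emits an endless stream of messages until ctx is done.
func produce(ctx context.Context, src chan<- string) {
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return
		case src <- fmt.Sprintf("msg-%d", i):
		}
	}
}

// receiveFirst collects the first n messages and then cancels the context
// from inside the callback, which terminates fakeReceive.
func receiveFirst(n int) ([]string, error) {
	if n <= 0 {
		return nil, nil
	}
	cctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	src := make(chan string)
	go produce(cctx, src)

	var got []string
	err := fakeReceive(cctx, src, func(_ context.Context, m string) {
		got = append(got, m)
		if len(got) == n {
			cancel() // the standard way to end a Receive call
		}
	})
	return got, err
}

func main() {
	got, err := receiveFirst(3)
	fmt.Println(got, err) // prints: [msg-0 msg-1 msg-2] <nil>
}
```

With the real client, the same shape applies: derive a cancellable context, pass it to sub.Receive, and call cancel from the callback or from another goroutine.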
Examples
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		// NOTE: May be called concurrently; synchronize access to shared memory.
		m.Ack()
	})
	if err != context.Canceled {
		// TODO: Handle error.
	}
}
maxExtension
package main

import (
	"context"
	"time"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	// This program is expected to process and acknowledge messages in 30 seconds. If
	// not, the Pub/Sub API will assume the message is not acknowledged.
	sub.ReceiveSettings.MaxExtension = 30 * time.Second
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		m.Ack()
	})
	if err != context.Canceled {
		// TODO: Handle error.
	}
}
maxOutstanding
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	sub.ReceiveSettings.MaxOutstandingMessages = 5
	sub.ReceiveSettings.MaxOutstandingBytes = 10e6
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		m.Ack()
	})
	if err != context.Canceled {
		// TODO: Handle error.
	}
}
func (*Subscription) SeekToSnapshot
func (s *Subscription) SeekToSnapshot(ctx context.Context, snap *Snapshot) error
SeekToSnapshot seeks the subscription to a snapshot.
The snapshot need not be created from this subscription, but it must be for the topic this subscription is subscribed to.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	snap := client.Snapshot("snapshotName")
	if err := sub.SeekToSnapshot(ctx, snap); err != nil {
		// TODO: Handle error.
	}
}
func (*Subscription) SeekToTime
SeekToTime seeks the subscription to a point in time.
Messages retained in the subscription that were published before this time are marked as acknowledged, and messages retained in the subscription that were published after this time are marked as unacknowledged. Note that this operation affects only those messages retained in the subscription (configured by SubscriptionConfig). For example, if time corresponds to a point before the message retention window (or to a point before the system's notion of the subscription creation time), only retained messages will be marked as unacknowledged, and already-expunged messages will not be restored.
Example
package main

import (
	"context"
	"time"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	if err := sub.SeekToTime(ctx, time.Now().Add(-time.Hour)); err != nil {
		// TODO: Handle error.
	}
}
func (*Subscription) String
func (s *Subscription) String() string
String returns the globally unique printable name of the subscription.
func (*Subscription) Update
func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error)
Update changes an existing subscription according to the fields set in cfg. It returns the new SubscriptionConfig.
Update returns an error if no fields were modified.
Examples
package main

import (
	"context"
	"time"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	subConfig, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
		PushConfig: &pubsub.PushConfig{Endpoint: "https://example.com/push"},
		// Make the subscription never expire.
		ExpirationPolicy: time.Duration(0),
	})
	if err != nil {
		// TODO: Handle error.
	}
	_ = subConfig // TODO: Use SubscriptionConfig.
}
pushConfigAuthenticationMethod
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	subConfig, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
		PushConfig: &pubsub.PushConfig{
			Endpoint: "https://example.com/push",
			AuthenticationMethod: &pubsub.OIDCToken{
				ServiceAccountEmail: "service-account-email",
				Audience:            "client-12345",
			},
		},
	})
	if err != nil {
		// TODO: Handle error.
	}
	_ = subConfig // TODO: Use SubscriptionConfig.
}
SubscriptionConfig
type SubscriptionConfig struct {
	Topic      *Topic
	PushConfig PushConfig

	// The default maximum time after a subscriber receives a message before
	// the subscriber should acknowledge the message. Note: messages which are
	// obtained via Subscription.Receive need not be acknowledged within this
	// deadline, as the deadline will be automatically extended.
	AckDeadline time.Duration

	// Whether to retain acknowledged messages. If true, acknowledged messages
	// will not be expunged until they fall out of the RetentionDuration window.
	RetainAckedMessages bool

	// How long to retain messages in backlog, from the time of publish. If
	// RetainAckedMessages is true, this duration affects the retention of
	// acknowledged messages, otherwise only unacknowledged messages are retained.
	// Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes.
	RetentionDuration time.Duration

	// Expiration policy specifies the conditions for a subscription's expiration.
	// A subscription is considered active as long as any connected subscriber is
	// successfully consuming messages from the subscription or is issuing
	// operations on the subscription. If `expiration_policy` is not set, a
	// *default policy* with `ttl` of 31 days will be used. The minimum allowed
	// value for `expiration_policy.ttl` is 1 day.
	//
	// Use time.Duration(0) to indicate that the subscription should never expire.
	ExpirationPolicy optional.Duration

	// The set of labels for the subscription.
	Labels map[string]string

	// EnableMessageOrdering enables message ordering on this subscription.
	// This value is only used for subscription creation and update, and
	// is not read locally in calls like Subscription.Receive().
	//
	// If set to false, even if messages are published with ordering keys,
	// messages will not be delivered in order.
	//
	// When calling Subscription.Receive(), the client will check this
	// value with a call to Subscription.Config(), which requires the
	// roles/viewer or roles/pubsub.viewer role on your service account.
	// If that call fails, messages with ordering keys will be delivered in order.
	EnableMessageOrdering bool

	// DeadLetterPolicy specifies the conditions for dead lettering messages in
	// a subscription. If not set, dead lettering is disabled.
	DeadLetterPolicy *DeadLetterPolicy

	// Filter is an expression written in the Cloud Pub/Sub filter language. If
	// non-empty, then only `PubsubMessage`s whose `attributes` field matches the
	// filter are delivered on this subscription. If empty, then no messages are
	// filtered out. Cannot be changed after the subscription is created.
	Filter string

	// RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
	RetryPolicy *RetryPolicy

	// Detached indicates whether the subscription is detached from its topic.
	// Detached subscriptions don't receive messages from their topic and don't
	// retain any backlog. `Pull` and `StreamingPull` requests will return
	// FAILED_PRECONDITION. If the subscription is a push subscription, pushes to
	// the endpoint will not be made.
	Detached bool

	// TopicMessageRetentionDuration indicates the minimum duration for which a message is
	// retained after it is published to the subscription's topic. If this field is
	// set, messages published to the subscription's topic in the last
	// `TopicMessageRetentionDuration` are always available to subscribers.
	// You can enable both topic and subscription retention for the same topic.
	// In this situation, the maximum of the retention durations takes effect.
	//
	// This is an output only field, meaning it will only appear in responses from the backend
	// and will be ignored if sent in a request.
	TopicMessageRetentionDuration time.Duration
	// contains filtered or unexported fields
}
SubscriptionConfig describes the configuration of a subscription.
func (*SubscriptionConfig) ID
func (s *SubscriptionConfig) ID() string
ID returns the unique identifier of the subscription within its project. This method only works when the subscription config is returned from the server, such as when calling client.Subscription or client.Subscriptions. Otherwise, this will return an empty string.
func (*SubscriptionConfig) String
func (s *SubscriptionConfig) String() string
String returns the globally unique printable name of the subscription config. This method only works when the subscription config is returned from the server, such as when calling client.Subscription or client.Subscriptions. Otherwise, this will return an empty string.
SubscriptionConfigToUpdate
type SubscriptionConfigToUpdate struct {
	// If non-nil, the push config is changed.
	PushConfig *PushConfig

	// If non-zero, the ack deadline is changed.
	AckDeadline time.Duration

	// If set, RetainAckedMessages is changed.
	RetainAckedMessages optional.Bool

	// If non-zero, RetentionDuration is changed.
	RetentionDuration time.Duration

	// If non-zero, Expiration is changed.
	ExpirationPolicy optional.Duration

	// If non-nil, DeadLetterPolicy is changed. To remove dead lettering from
	// a subscription, use the zero value for this struct.
	DeadLetterPolicy *DeadLetterPolicy

	// If non-nil, the current set of labels is completely
	// replaced by the new set.
	// This field has beta status. It is not subject to the stability guarantee
	// and may change.
	Labels map[string]string

	// If non-nil, RetryPolicy is changed. To remove an existing retry policy
	// (to redeliver messages as soon as possible) use a pointer to the zero value
	// for this struct.
	RetryPolicy *RetryPolicy
}
SubscriptionConfigToUpdate describes how to update a subscription.
SubscriptionIterator
type SubscriptionIterator struct {
	// contains filtered or unexported fields
}
SubscriptionIterator is an iterator that returns a series of subscriptions.
func (*SubscriptionIterator) Next
func (subs *SubscriptionIterator) Next() (*Subscription, error)
Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned.
Example
package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	// List all subscriptions of the project.
	it := client.Subscriptions(ctx)
	for {
		sub, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Println(sub)
	}
}
func (*SubscriptionIterator) NextConfig
func (subs *SubscriptionIterator) NextConfig() (*SubscriptionConfig, error)
NextConfig returns the next subscription config. If there are no more subscriptions, iterator.Done will be returned. This call shares the underlying iterator with calls to SubscriptionIterator.Next. If you wish to mix calls to both methods, create a separate iterator instance for each.
Topic
type Topic struct {
	// Settings for publishing messages. All changes must be made before the
	// first call to Publish. The default is DefaultPublishSettings.
	PublishSettings PublishSettings

	// EnableMessageOrdering enables delivery of ordered keys.
	EnableMessageOrdering bool
	// contains filtered or unexported fields
}
Topic is a reference to a PubSub topic.
The methods of Topic are safe for use by multiple goroutines.
func (*Topic) Config
func (t *Topic) Config(ctx context.Context) (TopicConfig, error)
Config returns the TopicConfig for the topic.
func (*Topic) Delete
Delete deletes the topic.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.Topic("topicName")
	if err := topic.Delete(ctx); err != nil {
		// TODO: Handle error.
	}
}
func (*Topic) Exists
Exists reports whether the topic exists on the server.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.Topic("topicName")
	ok, err := topic.Exists(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	if !ok {
		// Topic doesn't exist.
	}
}
func (*Topic) Flush
func (t *Topic) Flush()
Flush blocks until all remaining messages are sent.
func (*Topic) IAM
IAM returns the topic's IAM handle.
func (*Topic) ID
ID returns the unique identifier of the topic within its project.
func (*Topic) Publish
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult
Publish publishes msg to the topic asynchronously. Messages are batched and sent according to the topic's PublishSettings. Publish never blocks.
Publish returns a non-nil PublishResult which will be ready when the message has been sent (or has failed to be sent) to the server.
Publish creates goroutines for batching and sending messages. These goroutines need to be stopped by calling t.Stop(). Once stopped, future calls to Publish will immediately return a PublishResult with an error.
Example
package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.Topic("topicName")
	defer topic.Stop()
	var results []*pubsub.PublishResult
	r := topic.Publish(ctx, &pubsub.Message{
		Data: []byte("hello world"),
	})
	results = append(results, r)
	// Do other work ...
	for _, r := range results {
		id, err := r.Get(ctx)
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Printf("Published a message with a message ID: %s\n", id)
	}
}
func (*Topic) ResumePublish
ResumePublish resumes accepting messages for the provided ordering key. Publishing using an ordering key might be paused if an error is encountered while publishing, to prevent messages from being published out of order.
func (*Topic) Stop
func (t *Topic) Stop()
Stop sends all remaining published messages and stops the goroutines created for handling publishing. It returns once all outstanding messages have been sent or have failed to be sent.
func (*Topic) String
String returns the printable globally unique name for the topic.
func (*Topic) Subscriptions
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
Subscriptions returns an iterator which returns the subscriptions for this topic.
Some of the returned subscriptions may belong to a project other than t's.
Example
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	topic := client.Topic("topic-name")
	// List all subscriptions of the topic (maybe of multiple projects).
	for subs := topic.Subscriptions(ctx); ; {
		sub, err := subs.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		_ = sub // TODO: use the subscription.
	}
}
func (*Topic) Update
func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error)
Update changes an existing topic according to the fields set in cfg. It returns the new TopicConfig.
Examples
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	topic := client.Topic("topic-name")
	topicConfig, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
		MessageStoragePolicy: &pubsub.MessageStoragePolicy{
			AllowedPersistenceRegions: []string{
				"asia-east1", "asia-northeast1", "asia-southeast1", "australia-southeast1",
				"europe-north1", "europe-west1", "europe-west2", "europe-west3", "europe-west4",
				"us-central1", "us-central2", "us-east1", "us-east4", "us-west1", "us-west2",
			},
		},
	})
	if err != nil {
		// TODO: Handle error.
	}
	_ = topicConfig // TODO: Use TopicConfig
}
resetMessageStoragePolicy
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	topic := client.Topic("topic-name")
	topicConfig, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
		// Just use a non-nil MessageStoragePolicy without any fields.
		MessageStoragePolicy: &pubsub.MessageStoragePolicy{},
	})
	if err != nil {
		// TODO: Handle error.
	}
	_ = topicConfig // TODO: Use TopicConfig
}
TopicConfig
type TopicConfig struct {
	// The set of labels for the topic.
	Labels map[string]string

	// The topic's message storage policy.
	MessageStoragePolicy MessageStoragePolicy

	// The name of the Cloud KMS key to be used to protect access to messages
	// published to this topic, in the format
	// "projects/P/locations/L/keyRings/R/cryptoKeys/K".
	KMSKeyName string

	// Schema defines the schema settings upon topic creation. This cannot
	// be modified after a topic has been created.
	SchemaSettings *SchemaSettings

	// RetentionDuration configures the minimum duration to retain a message
	// after it is published to the topic. If this field is set, messages published
	// to the topic in the last `RetentionDuration` are always available to subscribers.
	// For instance, it allows any attached subscription to [seek to a
	// timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time)
	// that is up to `RetentionDuration` in the past. If this field is
	// not set, message retention is controlled by settings on individual
	// subscriptions. Cannot be more than 7 days or less than 10 minutes.
	//
	// For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_message_retention.
	RetentionDuration optional.Duration
	// contains filtered or unexported fields
}
TopicConfig describes the configuration of a topic.
func (*TopicConfig) ID
func (t *TopicConfig) ID() string
ID returns the unique identifier of the topic within its project. This method only works when the topic config is returned from the server, such as when calling client.Topic or client.Topics. Otherwise, this will return an empty string.
func (*TopicConfig) String
func (t *TopicConfig) String() string
String returns the printable globally unique name for the topic config. This method only works when the topic config is returned from the server, such as when calling client.Topic or client.Topics. Otherwise, this will return an empty string.
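To illustrate how ID and String relate, the following is a minimal sketch using only the standard library. It assumes the globally unique name has the form "projects/P/topics/T" and that the ID is its final segment. The helper topicIDFromName is hypothetical and is not part of the pubsub package.

```go
package main

import (
	"fmt"
	"strings"
)

// topicIDFromName is a hypothetical helper (not part of the pubsub package).
// It returns the final segment of a fully qualified topic name, or "" if the
// name does not have the assumed form "projects/P/topics/T".
func topicIDFromName(name string) string {
	parts := strings.Split(name, "/")
	if len(parts) != 4 || parts[0] != "projects" || parts[2] != "topics" {
		return ""
	}
	if parts[1] == "" || parts[3] == "" {
		return ""
	}
	return parts[3]
}

func main() {
	fmt.Println(topicIDFromName("projects/my-project/topics/my-topic")) // my-topic
	fmt.Println(topicIDFromName("") == "")                              // true
}
```

Returning an empty string for a malformed or empty name mirrors the documented behavior of ID and String on a config that did not come from the server.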
TopicConfigToUpdate
type TopicConfigToUpdate struct {
	// If non-nil, the current set of labels is completely
	// replaced by the new set.
	Labels map[string]string

	// If non-nil, the existing policy (containing the list of regions)
	// is completely replaced by the new policy.
	//
	// Use the zero value &MessageStoragePolicy{} to reset the topic back to
	// using the organization's Resource Location Restriction policy.
	//
	// If nil, the policy remains unchanged.
	//
	// This field has beta status. It is not subject to the stability guarantee
	// and may change.
	MessageStoragePolicy *MessageStoragePolicy

	// If set to a positive duration between 10 minutes and 7 days, RetentionDuration is changed.
	// If set to a negative value, this clears RetentionDuration from the topic.
	// If nil, the retention duration remains unchanged.
	RetentionDuration optional.Duration
}
TopicConfigToUpdate describes how to update a topic.
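The three RetentionDuration cases described in the field comment (nil, negative, and a positive value between 10 minutes and 7 days) can be modeled as a small decision function. This is only a sketch of the documented rules: it uses a plain *time.Duration in place of optional.Duration, and retentionAction, minRetention, and maxRetention are hypothetical names. Treating out-of-range values as an error is an assumption of the sketch; how the service handles them is not stated here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Documented bounds for a topic's retention duration.
const (
	minRetention = 10 * time.Minute
	maxRetention = 7 * 24 * time.Hour
)

// retentionAction is a hypothetical helper (not part of the pubsub package).
// It reports what an update would do for a given value:
// nil leaves retention unchanged, a negative value clears it, and a value
// within [minRetention, maxRetention] sets it.
func retentionAction(d *time.Duration) (string, error) {
	switch {
	case d == nil:
		return "unchanged", nil
	case *d < 0:
		return "clear", nil
	case *d >= minRetention && *d <= maxRetention:
		return "set", nil
	default:
		// Assumption: this sketch rejects values outside the documented range.
		return "", errors.New("retention duration must be between 10 minutes and 7 days")
	}
}

func main() {
	day := 24 * time.Hour
	neg := -time.Second
	short := time.Minute
	for _, d := range []*time.Duration{nil, &day, &neg, &short} {
		action, err := retentionAction(d)
		fmt.Println(action, err)
	}
}
```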
TopicIterator
type TopicIterator struct {
	// contains filtered or unexported fields
}
TopicIterator is an iterator that returns a series of topics.
func (*TopicIterator) Next
func (tps *TopicIterator) Next() (*Topic, error)
Next returns the next topic. If there are no more topics, iterator.Done will be returned.
Example
package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	// List all topics.
	it := client.Topics(ctx)
	for {
		t, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Println(t)
	}
}
func (*TopicIterator) NextConfig
func (t *TopicIterator) NextConfig() (*TopicConfig, error)
NextConfig returns the next topic config. If there are no more topics, iterator.Done will be returned.
This call shares the underlying iterator with calls to TopicIterator.Next. If you wish to mix calls, create a separate iterator instance for each.
ValidateMessageResult
type ValidateMessageResult struct{}
ValidateMessageResult is the response for the ValidateMessage method. Reserved for future use.
ValidateSchemaResult
type ValidateSchemaResult struct{}
ValidateSchemaResult is the response for the ValidateSchema method. Reserved for future use.