Package ps contains clients for publishing and subscribing using the Google Cloud Pub/Sub Lite service.
If interfaces are defined, PublisherClient and SubscriberClient can be used as substitutions for pubsub.Topic.Publish() and pubsub.Subscription.Receive(), respectively, from the pubsub package.
As noted in comments, the two services have some differences:
- Pub/Sub Lite does not support NACK for messages. By default, this will terminate the SubscriberClient. A custom function can be provided for ReceiveSettings.NackHandler to handle NACKed messages.
- Pub/Sub Lite has no concept of ack deadlines. Subscribers must ACK or NACK every message received and can take as much time as they need to process the message.
- Pub/Sub Lite PublisherClients and SubscriberClients can terminate when an unretryable error occurs.
- Publishers and subscribers will be throttled if Pub/Sub Lite publish or subscribe throughput limits are exceeded. Thus publishing can be more sensitive to buffer overflow than Cloud Pub/Sub.
- Pub/Sub Lite utilizes bidirectional gRPC streams extensively to maximize publish and subscribe throughput.
More information about Google Cloud Pub/Sub Lite is available at https://cloud.google.com/pubsub/lite .
Information about choosing between Google Cloud Pub/Sub and Pub/Sub Lite is available at https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite .
See https://godoc.org/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package.
Constants
MaxPublishRequestCount, MaxPublishRequestBytes
const (
	// MaxPublishRequestCount is the maximum number of messages that can be
	// batched in a single publish request.
	MaxPublishRequestCount = wire.MaxPublishRequestCount

	// MaxPublishRequestBytes is the maximum allowed serialized size of a single
	// publish request (containing a batch of messages) in bytes.
	MaxPublishRequestBytes = wire.MaxPublishRequestBytes
)
Variables
ErrOverflow, ErrOversizedMessage, ErrPublisherStopped
var (
	// ErrOverflow is set for a PublishResult when publish buffers overflow.
	ErrOverflow = bundler.ErrOverflow

	// ErrOversizedMessage is set for a PublishResult when a published message
	// exceeds MaxPublishRequestBytes.
	ErrOversizedMessage = bundler.ErrOversizedItem

	// ErrPublisherStopped is set for a PublishResult when a message cannot be
	// published because the publisher client has stopped. PublisherClient.Error()
	// returns the error that caused the publisher client to terminate (if any).
	ErrPublisherStopped = wire.ErrServiceStopped
)
DefaultPublishSettings
var DefaultPublishSettings = PublishSettings{
	DelayThreshold:    10 * time.Millisecond,
	CountThreshold:    100,
	ByteThreshold:     1e6,
	Timeout:           60 * time.Second,
	BufferedByteLimit: 1e8,
}
DefaultPublishSettings holds the default values for PublishSettings.
DefaultReceiveSettings
var DefaultReceiveSettings = ReceiveSettings{
	MaxOutstandingMessages: 1000,
	MaxOutstandingBytes:    1e9,
	Timeout:                60 * time.Second,
}
DefaultReceiveSettings holds the default values for ReceiveSettings.
KeyExtractorFunc
KeyExtractorFunc is a function that extracts an ordering key from a Message.
MessageReceiverFunc
MessageReceiverFunc handles messages sent by the Cloud Pub/Sub Lite service.
The implementation must arrange for pubsub.Message.Ack() or pubsub.Message.Nack() to be called after processing the message.
The receiver func will be called from multiple goroutines if the subscriber is connected to multiple partitions. Only one call from any connected partition will be outstanding at a time, and blocking in this receiver callback will block the delivery of subsequent messages for the partition.
NackHandler
NackHandler is invoked when pubsub.Message.Nack() is called. Cloud Pub/Sub Lite does not have a concept of 'nack'. If the nack handler implementation returns nil, the message is acknowledged. If an error is returned, the SubscriberClient will consider this a fatal error and terminate.
In Cloud Pub/Sub Lite, only a single subscriber for a given subscription is connected to any partition at a time, and there is no other client that may be able to handle messages.
PublishMessageTransformerFunc
type PublishMessageTransformerFunc func(*pubsub.Message, *pb.PubSubMessage) error
PublishMessageTransformerFunc transforms a pubsub.Message to a Pub/Sub Lite PubSubMessage API proto. If this returns an error, the pubsub.PublishResult will be errored and the PublisherClient will consider this a fatal error and terminate.
PublishSettings
type PublishSettings struct {
	// Publish a non-empty batch after this delay has passed. If DelayThreshold is
	// 0, it will be treated as DefaultPublishSettings.DelayThreshold. Otherwise
	// must be > 0.
	DelayThreshold time.Duration

	// Publish a batch when it has this many messages. The maximum is
	// MaxPublishRequestCount. If CountThreshold is 0, it will be treated as
	// DefaultPublishSettings.CountThreshold. Otherwise must be > 0.
	CountThreshold int

	// Publish a batch when its size in bytes reaches this value. The maximum is
	// MaxPublishRequestBytes. If ByteThreshold is 0, it will be treated as
	// DefaultPublishSettings.ByteThreshold. Otherwise must be > 0.
	ByteThreshold int

	// The maximum time that the client will attempt to establish a publish stream
	// connection to the server. If Timeout is 0, it will be treated as
	// DefaultPublishSettings.Timeout. Otherwise must be > 0.
	//
	// If the timeout is exceeded, the publisher will terminate with the last
	// error that occurred while trying to reconnect. Note that if the timeout
	// duration is long, ErrOverflow may occur first.
	Timeout time.Duration

	// The maximum number of bytes that the publisher will keep in memory before
	// returning ErrOverflow. If BufferedByteLimit is 0, it will be treated as
	// DefaultPublishSettings.BufferedByteLimit. Otherwise must be > 0.
	//
	// Note that this setting applies per partition. If BufferedByteLimit is being
	// used to bound memory usage, keep in mind the number of partitions in the
	// topic.
	//
	// Note that Pub/Sub Lite topics are provisioned a publishing throughput
	// capacity, per partition, shared by all publisher clients. Setting a large
	// buffer size can mitigate transient publish spikes. However, consistently
	// attempting to publish messages at a much higher rate than the publishing
	// throughput capacity can cause the buffers to overflow. For more
	// information, see https://cloud.google.com/pubsub/lite/docs/topics.
	BufferedByteLimit int

	// Optional custom function that extracts an ordering key from a Message. The
	// default implementation extracts the key from Message.OrderingKey.
	KeyExtractor KeyExtractorFunc

	// Optional custom function that transforms a pubsub.Message to a
	// PubSubMessage API proto.
	MessageTransformer PublishMessageTransformerFunc
}
PublishSettings configure the PublisherClient. Batching settings (DelayThreshold, CountThreshold, ByteThreshold, BufferedByteLimit) apply per partition.
A zero PublishSettings will result in values equivalent to DefaultPublishSettings.
PublisherClient
type PublisherClient struct {
	// contains filtered or unexported fields
}
PublisherClient is a Cloud Pub/Sub Lite client to publish messages to a given topic. A PublisherClient is safe to use from multiple goroutines.
See https://cloud.google.com/pubsub/lite/docs/publishing for more information about publishing.
func NewPublisherClient
func NewPublisherClient(ctx context.Context, settings PublishSettings, topic pubsublite.TopicPath, opts ...option.ClientOption) (*PublisherClient, error)
NewPublisherClient creates a new Cloud Pub/Sub Lite client to publish messages to a given topic.
See https://cloud.google.com/pubsub/lite/docs/publishing for more information about publishing.
func (*PublisherClient) Error
func (p *PublisherClient) Error() error
Error returns the error that caused the publisher client to terminate. It may be nil if Stop() was called.
Example
package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite"
	"cloud.google.com/go/pubsublite/ps"
)

func main() {
	ctx := context.Background()
	topic := pubsublite.TopicPath{
		Project: "project-id",
		Zone:    "zone",
		TopicID: "topic-id",
	}
	publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
	if err != nil {
		// TODO: Handle error.
	}
	defer publisher.Stop()

	var results []*pubsub.PublishResult
	r := publisher.Publish(ctx, &pubsub.Message{
		Data: []byte("hello world"),
	})
	results = append(results, r)
	// Do other work ...
	for _, r := range results {
		id, err := r.Get(ctx)
		if err != nil {
			// TODO: Handle error.
			if err == ps.ErrPublisherStopped {
				fmt.Printf("Publisher client stopped due to error: %v\n", publisher.Error())
			}
		}
		fmt.Printf("Published a message with a message ID: %s\n", id)
	}
}
func (*PublisherClient) Publish
func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult
Publish publishes msg to the topic asynchronously. Messages are batched and sent according to the client's PublishSettings. Publish never blocks.
Publish returns a non-nil PublishResult which will be ready when the message has been sent (or has failed to be sent) to the server.
Once Stop() has been called or the publisher has failed permanently due to an error, future calls to Publish will immediately return a PublishResult with error ErrPublisherStopped. Error() returns the error that caused the publisher to terminate.
Example
package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite"
	"cloud.google.com/go/pubsublite/ps"
)

func main() {
	ctx := context.Background()
	topic := pubsublite.TopicPath{
		Project: "project-id",
		Zone:    "zone",
		TopicID: "topic-id",
	}
	// NOTE: DefaultPublishSettings and empty PublishSettings{} are equivalent.
	publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
	if err != nil {
		// TODO: Handle error.
	}
	defer publisher.Stop()

	var results []*pubsub.PublishResult
	r := publisher.Publish(ctx, &pubsub.Message{
		Data: []byte("hello world"),
	})
	results = append(results, r)
	// Do other work ...
	for _, r := range results {
		id, err := r.Get(ctx)
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Printf("Published a message with a message ID: %s\n", id)
	}
}
func (*PublisherClient) Stop
func (p *PublisherClient) Stop()
Stop sends all remaining published messages and closes publish streams. Returns once all outstanding messages have been sent or have failed to be sent.
ReceiveMessageTransformerFunc
type ReceiveMessageTransformerFunc func(*pb.SequencedMessage, *pubsub.Message) error
ReceiveMessageTransformerFunc transforms a Pub/Sub Lite SequencedMessage API proto to a pubsub.Message. If this returns an error, the SubscriberClient will consider this a fatal error and terminate.
ReceiveSettings
type ReceiveSettings struct {
	// MaxOutstandingMessages is the maximum number of unacknowledged messages.
	// If MaxOutstandingMessages is 0, it will be treated as
	// DefaultReceiveSettings.MaxOutstandingMessages. Otherwise must be > 0.
	MaxOutstandingMessages int

	// MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged
	// messages. If MaxOutstandingBytes is 0, it will be treated as
	// DefaultReceiveSettings.MaxOutstandingBytes. Otherwise must be > 0.
	//
	// Note that this setting applies per partition. If MaxOutstandingBytes is
	// being used to bound memory usage, keep in mind the number of partitions in
	// the associated topic.
	MaxOutstandingBytes int

	// The maximum time that the client will attempt to establish a subscribe
	// stream connection to the server. If Timeout is 0, it will be treated as
	// DefaultReceiveSettings.Timeout. Otherwise must be > 0.
	//
	// If the timeout is exceeded, the SubscriberClient will terminate with the
	// last error that occurred while trying to reconnect.
	Timeout time.Duration

	// The topic partition numbers (zero-indexed) to receive messages from.
	// Values must be less than the number of partitions for the topic. If not
	// specified, the SubscriberClient will use the partition assignment service
	// to determine which partitions it should connect to.
	Partitions []int

	// Optional custom function to handle pubsub.Message.Nack() calls. If not set,
	// the default behavior is to terminate the SubscriberClient.
	NackHandler NackHandler

	// Optional custom function that transforms a SequencedMessage API proto to a
	// pubsub.Message.
	MessageTransformer ReceiveMessageTransformerFunc
}
ReceiveSettings configure the SubscriberClient. Flow control settings (MaxOutstandingMessages, MaxOutstandingBytes) apply per partition.
A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
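Because the flow control limits apply per partition, the memory a subscriber may hold grows with the number of partitions it is connected to. The arithmetic can be sketched as follows; worstCaseBytes and perPartitionLimit are hypothetical helpers, and the partition count and the 1 GB budget are made-up example values.

```go
package main

import "fmt"

// worstCaseBytes estimates the upper bound on outstanding bytes across all
// connected partitions when a byte limit applies per partition.
func worstCaseBytes(perPartitionLimit int64, partitions int) int64 {
	return perPartitionLimit * int64(partitions)
}

// perPartitionLimit splits a total memory budget evenly across partitions.
// It returns 0 when the partition count is not positive.
func perPartitionLimit(totalBudget int64, partitions int) int64 {
	if partitions <= 0 {
		return 0
	}
	return totalBudget / int64(partitions)
}

func main() {
	const defaultMaxOutstandingBytes = 1e9 // listed for DefaultReceiveSettings
	partitions := 4                        // hypothetical partition count

	fmt.Println(worstCaseBytes(defaultMaxOutstandingBytes, partitions))

	// To stay within a hypothetical 1 GB budget overall, lower the
	// per-partition limit instead of relying on the default.
	fmt.Println(perPartitionLimit(1e9, partitions))
}
```

A value computed this way would be assigned to MaxOutstandingBytes before the settings are passed to NewSubscriberClient. The same reasoning applies to PublishSettings.BufferedByteLimit on the publishing side.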
SubscriberClient
type SubscriberClient struct {
	// contains filtered or unexported fields
}
SubscriberClient is a Cloud Pub/Sub Lite client to receive messages for a given subscription.
See https://cloud.google.com/pubsub/lite/docs/subscribing for more information about receiving messages.
func NewSubscriberClient
func NewSubscriberClient(ctx context.Context, settings ReceiveSettings, subscription pubsublite.SubscriptionPath, opts ...option.ClientOption) (*SubscriberClient, error)
NewSubscriberClient creates a new Cloud Pub/Sub Lite client to receive messages for a given subscription.
See https://cloud.google.com/pubsub/lite/docs/subscribing for more information about receiving messages.
func (*SubscriberClient) Receive
func (s *SubscriberClient) Receive(ctx context.Context, f MessageReceiverFunc) error
Receive calls f with the messages from the subscription. It blocks until ctx is done, or the service returns a non-retryable error.
The standard way to terminate a Receive is to cancel its context:
	cctx, cancel := context.WithCancel(ctx)
	err := sub.Receive(cctx, callback)
	// Call cancel from callback, or another goroutine.
If there is a fatal service error, Receive returns that error after all of the outstanding calls to f have returned. If ctx is done, Receive returns nil after all of the outstanding calls to f have returned and all messages have been acknowledged.
Receive calls f concurrently from multiple goroutines if the SubscriberClient is connected to multiple partitions. All messages received by f must be ACKed or NACKed. Failure to do so can prevent Receive from returning.
The context passed to f will be canceled when ctx is Done or there is a fatal service error.
Each SubscriberClient may have only one invocation of Receive active at a time.
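The cancel-from-callback pattern can be tried without a Pub/Sub Lite project by swapping in a stand-in for Receive. In the sketch below, receive is a hypothetical function that only imitates the documented behavior (it blocks, calls f, and returns nil once the context is done); it delivers synthetic strings on a single goroutine, which the real client does not guarantee.

```go
package main

import (
	"context"
	"fmt"
)

// receive is a hypothetical stand-in for SubscriberClient.Receive. It calls f
// with synthetic messages until ctx is done and then returns nil.
func receive(ctx context.Context, f func(context.Context, string)) error {
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return nil
		default:
			f(ctx, fmt.Sprintf("msg-%d", i))
		}
	}
}

// handleN receives until n messages have been handled, then cancels the
// context from inside the callback so that receive returns.
func handleN(n int) (int, error) {
	cctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	handled := 0
	err := receive(cctx, func(ctx context.Context, m string) {
		// Safe without locking only because this stand-in calls f from a
		// single goroutine; a real callback must synchronize shared state.
		handled++
		if handled >= n {
			cancel()
		}
	})
	return handled, err
}

func main() {
	handled, err := handleN(3)
	fmt.Println(handled, err)
}
```

With the real SubscriberClient the callback would also call Ack or Nack on each message before returning, since unacknowledged messages can prevent Receive from returning.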
Examples
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite"
	"cloud.google.com/go/pubsublite/ps"
)

func main() {
	ctx := context.Background()
	subscription := pubsublite.SubscriptionPath{
		Project:        "project-id",
		Zone:           "zone",
		SubscriptionID: "subscription-id",
	}
	// NOTE: DefaultReceiveSettings and empty ReceiveSettings{} are equivalent.
	subscriber, err := ps.NewSubscriberClient(ctx, ps.DefaultReceiveSettings, subscription)
	if err != nil {
		// TODO: Handle error.
	}
	cctx, cancel := context.WithCancel(ctx)
	err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		// NOTE: May be called concurrently; synchronize access to shared memory.
		m.Ack()
	})
	if err != nil {
		// TODO: Handle error.
	}

	// Call cancel from callback, or another goroutine.
	cancel()
}
maxOutstanding
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite"
	"cloud.google.com/go/pubsublite/ps"
)

func main() {
	ctx := context.Background()
	subscription := pubsublite.SubscriptionPath{
		Project:        "project-id",
		Zone:           "zone",
		SubscriptionID: "subscription-id",
	}
	settings := ps.DefaultReceiveSettings
	settings.MaxOutstandingMessages = 5
	settings.MaxOutstandingBytes = 10e6
	subscriber, err := ps.NewSubscriberClient(ctx, settings, subscription)
	if err != nil {
		// TODO: Handle error.
	}
	cctx, cancel := context.WithCancel(ctx)
	err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		// NOTE: May be called concurrently; synchronize access to shared memory.
		m.Ack()
	})
	if err != nil {
		// TODO: Handle error.
	}

	// Call cancel from callback, or another goroutine.
	cancel()
}