Package pstest provides a fake Cloud PubSub service for testing. It implements a simplified form of the service, suitable for unit tests. It may behave differently from the actual service in ways in which the service is non-deterministic or unspecified: timing, delivery order, etc.
This package is EXPERIMENTAL and is subject to change without notice.
See the example for usage.
Functions
func ResetMinAckDeadline
func
ResetMinAckDeadline
()
ResetMinAckDeadline resets the minack deadline to the default.
func SetMinAckDeadline
SetMinAckDeadline changes the minack deadline to n. Must be greater than or equal to 1 second. Remember to reset this value to the default after your test changes it. Example usage:
pstest.SetMinAckDeadlineSecs(1)
defer pstest.ResetMinAckDeadlineSecs()
func ValidateFilter
ValidateFilter validates if the filter string is parsable.
GServer
type
GServer
struct
{
pb
.
UnimplementedPublisherServer
pb
.
UnimplementedSubscriberServer
pb
.
UnimplementedSchemaServiceServer
// contains filtered or unexported fields
}
GServer is the underlying service implementor. It is not intended to be used directly.
func (*GServer) Acknowledge
func
(
s
*
GServer
)
Acknowledge
(
_
context
.
Context
,
req
*
pb
.
AcknowledgeRequest
)
(
*
emptypb
.
Empty
,
error
)
func (*GServer) CommitSchema
func
(
s
*
GServer
)
CommitSchema
(
_
context
.
Context
,
req
*
pb
.
CommitSchemaRequest
)
(
*
pb
.
Schema
,
error
)
func (*GServer) CreateSchema
func
(
s
*
GServer
)
CreateSchema
(
_
context
.
Context
,
req
*
pb
.
CreateSchemaRequest
)
(
*
pb
.
Schema
,
error
)
func (*GServer) CreateSubscription
func
(
s
*
GServer
)
CreateSubscription
(
_
context
.
Context
,
ps
*
pb
.
Subscription
)
(
*
pb
.
Subscription
,
error
)
func (*GServer) CreateTopic
func (*GServer) DeleteSchema
func
(
s
*
GServer
)
DeleteSchema
(
_
context
.
Context
,
req
*
pb
.
DeleteSchemaRequest
)
(
*
emptypb
.
Empty
,
error
)
func (*GServer) DeleteSchemaRevision
func
(
s
*
GServer
)
DeleteSchemaRevision
(
_
context
.
Context
,
req
*
pb
.
DeleteSchemaRevisionRequest
)
(
*
pb
.
Schema
,
error
)
func (*GServer) DeleteSubscription
func
(
s
*
GServer
)
DeleteSubscription
(
_
context
.
Context
,
req
*
pb
.
DeleteSubscriptionRequest
)
(
*
emptypb
.
Empty
,
error
)
func (*GServer) DeleteTopic
func
(
s
*
GServer
)
DeleteTopic
(
_
context
.
Context
,
req
*
pb
.
DeleteTopicRequest
)
(
*
emptypb
.
Empty
,
error
)
func (*GServer) DetachSubscription
func
(
s
*
GServer
)
DetachSubscription
(
_
context
.
Context
,
req
*
pb
.
DetachSubscriptionRequest
)
(
*
pb
.
DetachSubscriptionResponse
,
error
)
func (*GServer) GetSchema
func
(
s
*
GServer
)
GetSchema
(
_
context
.
Context
,
req
*
pb
.
GetSchemaRequest
)
(
*
pb
.
Schema
,
error
)
func (*GServer) GetSubscription
func
(
s
*
GServer
)
GetSubscription
(
_
context
.
Context
,
req
*
pb
.
GetSubscriptionRequest
)
(
*
pb
.
Subscription
,
error
)
func (*GServer) GetTopic
func
(
s
*
GServer
)
GetTopic
(
_
context
.
Context
,
req
*
pb
.
GetTopicRequest
)
(
*
pb
.
Topic
,
error
)
func (*GServer) ListSchemaRevisions
func
(
s
*
GServer
)
ListSchemaRevisions
(
_
context
.
Context
,
req
*
pb
.
ListSchemaRevisionsRequest
)
(
*
pb
.
ListSchemaRevisionsResponse
,
error
)
func (*GServer) ListSchemas
func
(
s
*
GServer
)
ListSchemas
(
_
context
.
Context
,
req
*
pb
.
ListSchemasRequest
)
(
*
pb
.
ListSchemasResponse
,
error
)
func (*GServer) ListSubscriptions
func
(
s
*
GServer
)
ListSubscriptions
(
_
context
.
Context
,
req
*
pb
.
ListSubscriptionsRequest
)
(
*
pb
.
ListSubscriptionsResponse
,
error
)
func (*GServer) ListTopicSubscriptions
func
(
s
*
GServer
)
ListTopicSubscriptions
(
_
context
.
Context
,
req
*
pb
.
ListTopicSubscriptionsRequest
)
(
*
pb
.
ListTopicSubscriptionsResponse
,
error
)
func (*GServer) ListTopics
func
(
s
*
GServer
)
ListTopics
(
_
context
.
Context
,
req
*
pb
.
ListTopicsRequest
)
(
*
pb
.
ListTopicsResponse
,
error
)
func (*GServer) ModifyAckDeadline
func
(
s
*
GServer
)
ModifyAckDeadline
(
_
context
.
Context
,
req
*
pb
.
ModifyAckDeadlineRequest
)
(
*
emptypb
.
Empty
,
error
)
func (*GServer) Publish
func
(
s
*
GServer
)
Publish
(
_
context
.
Context
,
req
*
pb
.
PublishRequest
)
(
*
pb
.
PublishResponse
,
error
)
func (*GServer) Pull
func
(
s
*
GServer
)
Pull
(
ctx
context
.
Context
,
req
*
pb
.
PullRequest
)
(
*
pb
.
PullResponse
,
error
)
func (*GServer) RollbackSchema
func
(
s
*
GServer
)
RollbackSchema
(
_
context
.
Context
,
req
*
pb
.
RollbackSchemaRequest
)
(
*
pb
.
Schema
,
error
)
RollbackSchema rolls back the current schema to a previous revision by copying and creating a new revision.
func (*GServer) Seek
func
(
s
*
GServer
)
Seek
(
ctx
context
.
Context
,
req
*
pb
.
SeekRequest
)
(
*
pb
.
SeekResponse
,
error
)
func (*GServer) StreamingPull
func
(
s
*
GServer
)
StreamingPull
(
sps
pb
.
Subscriber_StreamingPullServer
)
error
func (*GServer) UpdateSubscription
func
(
s
*
GServer
)
UpdateSubscription
(
_
context
.
Context
,
req
*
pb
.
UpdateSubscriptionRequest
)
(
*
pb
.
Subscription
,
error
)
func (*GServer) UpdateTopic
func
(
s
*
GServer
)
UpdateTopic
(
_
context
.
Context
,
req
*
pb
.
UpdateTopicRequest
)
(
*
pb
.
Topic
,
error
)
func (*GServer) ValidateMessage
func
(
s
*
GServer
)
ValidateMessage
(
_
context
.
Context
,
req
*
pb
.
ValidateMessageRequest
)
(
*
pb
.
ValidateMessageResponse
,
error
)
ValidateMessage mocks the ValidateMessage call but only checks that the schema definition to validate the message against is not empty.
func (*GServer) ValidateSchema
func
(
s
*
GServer
)
ValidateSchema
(
_
context
.
Context
,
req
*
pb
.
ValidateSchemaRequest
)
(
*
pb
.
ValidateSchemaResponse
,
error
)
ValidateSchema mocks the ValidateSchema call but only checks that the schema definition is not empty.
Message
type
Message
struct
{
ID
string
Data
[]
byte
Attributes
map
[
string
]
string
PublishTime
time
.
Time
Deliveries
int
// number of times delivery of the message was attempted
Acks
int
// number of acks received from clients
Modacks
[]
Modack
// modacks received by server for this message
OrderingKey
string
// contains filtered or unexported fields
}
A Message is a message that was published to the server.
Modack
Modack represents a modack sent to the server.
Reactor
type
Reactor
interface
{
// React handles the message types and returns results. If "handled" is false,
// then the test server will ignore the results and continue to the next reactor
// or the original handler.
React
(
_
interface
{})
(
handled
bool
,
ret
interface
{},
err
error
)
}
Reactor is an interface to allow reaction function to a certain call.
ReactorOptions
ReactorOptions is a map that Server uses to look up reactors. Key is the function name, value is array of reactor for the function.
Server
type
Server
struct
{
Addr
string
// The address that the server is listening on.
GServer
GServer
// Not intended to be used directly.
// contains filtered or unexported fields
}
Server is a fake Pub/Sub server.
func NewServer
func
NewServer
(
opts
...
ServerReactorOption
)
*
Server
NewServer creates a new fake server running in the current process.
Example
package
main
import
(
"context"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/pstest"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
func
main
()
{
ctx
:=
context
.
Background
()
// Start a fake server running locally.
srv
:=
pstest
.
NewServer
()
defer
srv
.
Close
()
// Connect to the server without using TLS.
conn
,
err
:=
grpc
.
Dial
(
srv
.
Addr
,
grpc
.
WithInsecure
())
if
err
!=
nil
{
// TODO: Handle error.
}
defer
conn
.
Close
()
// Use the connection when creating a pubsub client.
client
,
err
:=
pubsub
.
NewClient
(
ctx
,
"project"
,
option
.
WithGRPCConn
(
conn
))
if
err
!=
nil
{
// TODO: Handle error.
}
defer
client
.
Close
()
_
=
client
// TODO: Use the client.
}
func NewServerWithPort
func
NewServerWithPort
(
port
int
,
opts
...
ServerReactorOption
)
*
Server
NewServerWithPort creates a new fake server running in the current process at the specified port.
Example
package
main
import
(
"context"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/pstest"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
func
main
()
{
ctx
:=
context
.
Background
()
// Start a fake server running locally at 9001.
srv
:=
pstest
.
NewServerWithPort
(
9001
)
defer
srv
.
Close
()
// Connect to the server without using TLS.
conn
,
err
:=
grpc
.
Dial
(
srv
.
Addr
,
grpc
.
WithInsecure
())
if
err
!=
nil
{
// TODO: Handle error.
}
defer
conn
.
Close
()
// Use the connection when creating a pubsub client.
client
,
err
:=
pubsub
.
NewClient
(
ctx
,
"project"
,
option
.
WithGRPCConn
(
conn
))
if
err
!=
nil
{
// TODO: Handle error.
}
defer
client
.
Close
()
_
=
client
// TODO: Use the client.
}
func (*Server) AddPublishResponse
func
(
s
*
Server
)
AddPublishResponse
(
pbr
*
pb
.
PublishResponse
,
err
error
)
AddPublishResponse adds a new publish response to the channel used for responding to publish requests.
func (*Server) ClearMessages
func
(
s
*
Server
)
ClearMessages
()
ClearMessages removes all published messages from internal containers.
func (*Server) Close
Close shuts down the server and releases all resources.
func (*Server) Message
Message returns the message with the given ID, or nil if no message with that ID was published.
func (*Server) Messages
Messages returns information about all messages ever published.
func (*Server) Publish
Publish behaves as if the Publish RPC was called with a message with the given data and attrs. It returns the ID of the message. The topic will be created if it doesn't exist.
Publish panics if there is an error, which is appropriate for testing.
func (*Server) PublishOrdered
func
(
s
*
Server
)
PublishOrdered
(
topic
string
,
data
[]
byte
,
attrs
map
[
string
]
string
,
orderingKey
string
)
string
PublishOrdered behaves as if the Publish RPC was called with a message with the given data, attrs and ordering key. It returns the ID of the message. The topic will be created if it doesn't exist.
PublishOrdered panics if there is an error, which is appropriate for testing.
func (*Server) ResetPublishResponses
ResetPublishResponses resets the buffered publishResponses channel with a new buffered channel with the given size.
func (*Server) SetAutoPublishResponse
SetAutoPublishResponse controls whether to automatically respond to messages published or to use user-added responses from the publishResponses channel.
func (*Server) SetStreamTimeout
SetStreamTimeout sets the amount of time a stream will be active before it shuts itself down. This mimics the real service's behavior of closing streams after 30 minutes. If SetStreamTimeout is never called or is passed zero, streams never shut down.
func (*Server) SetTimeNowFunc
SetTimeNowFunc registers f as a function to be used instead of time.Now for this server.
func (*Server) Wait
func
(
s
*
Server
)
Wait
()
Wait blocks until all server activity has completed.
ServerReactorOption
ServerReactorOption is options passed to the server for reactor creation.
func WithErrorInjection
func
WithErrorInjection
(
funcName
string
,
code
codes
.
Code
,
msg
string
)
ServerReactorOption
WithErrorInjection creates a ServerReactorOption that injects error with defined status code and message for a certain function.