Package cloud.google.com/go/pubsub/pstest (v1.31.0)

Package pstest provides a fake Cloud PubSub service for testing. It implements a simplified form of the service, suitable for unit tests. It may behave differently from the actual service in ways in which the service is non-deterministic or unspecified: timing, delivery order, etc.

This package is EXPERIMENTAL and is subject to change without notice.

See the example for usage.

Functions

func ResetMinAckDeadline

  func 
  
 ResetMinAckDeadline 
 () 
 

ResetMinAckDeadline resets the minack deadline to the default.

func SetMinAckDeadline

  func 
  
 SetMinAckDeadline 
 ( 
 n 
  
  time 
 
 . 
  Duration 
 
 ) 
 

SetMinAckDeadline changes the minack deadline to n. Must be greater than or equal to 1 second. Remember to reset this value to the default after your test changes it. Example usage:

 pstest.SetMinAckDeadlineSecs(1)
defer pstest.ResetMinAckDeadlineSecs() 

GServer

GServer is the underlying service implementor. It is not intended to be used directly.

func (*GServer) Acknowledge

func (*GServer) CommitSchema

func (*GServer) CreateSchema

func (*GServer) CreateSubscription

  func 
  
 ( 
 s 
  
 * 
  GServer 
 
 ) 
  
 CreateSubscription 
 ( 
 _ 
  
  context 
 
 . 
  Context 
 
 , 
  
 ps 
  
 * 
  pb 
 
 . 
  Subscription 
 
 ) 
  
 ( 
 * 
  pb 
 
 . 
  Subscription 
 
 , 
  
  error 
 
 ) 
 

func (*GServer) CreateTopic

  func 
  
 ( 
 s 
  
 * 
  GServer 
 
 ) 
  
 CreateTopic 
 ( 
 _ 
  
  context 
 
 . 
  Context 
 
 , 
  
 t 
  
 * 
  pb 
 
 . 
  Topic 
 
 ) 
  
 ( 
 * 
  pb 
 
 . 
  Topic 
 
 , 
  
  error 
 
 ) 
 

func (*GServer) DeleteSchema

func (*GServer) DeleteSchemaRevision

  func 
  
 ( 
 s 
  
 * 
  GServer 
 
 ) 
  
 DeleteSchemaRevision 
 ( 
 _ 
  
  context 
 
 . 
  Context 
 
 , 
  
 req 
  
 * 
  pb 
 
 . 
  DeleteSchemaRevisionRequest 
 
 ) 
  
 ( 
 * 
  pb 
 
 . 
  Schema 
 
 , 
  
  error 
 
 ) 
 

func (*GServer) DeleteSubscription

func (*GServer) DeleteTopic

func (*GServer) DetachSubscription

func (*GServer) GetSchema

func (*GServer) GetSubscription

func (*GServer) GetTopic

func (*GServer) ListSchemaRevisions

func (*GServer) ListSchemas

func (*GServer) ListSubscriptions

func (*GServer) ListTopicSubscriptions

func (*GServer) ListTopics

func (*GServer) ModifyAckDeadline

func (*GServer) Publish

func (*GServer) Pull

func (*GServer) RollbackSchema

  func 
  
 ( 
 s 
  
 * 
  GServer 
 
 ) 
  
 RollbackSchema 
 ( 
 _ 
  
  context 
 
 . 
  Context 
 
 , 
  
 req 
  
 * 
  pb 
 
 . 
  RollbackSchemaRequest 
 
 ) 
  
 ( 
 * 
  pb 
 
 . 
  Schema 
 
 , 
  
  error 
 
 ) 
 

RollbackSchema rolls back the current schema to a previous revision by copying and creating a new revision.

func (*GServer) Seek

func (*GServer) StreamingPull

func (*GServer) UpdateSubscription

func (*GServer) UpdateTopic

  func 
  
 ( 
 s 
  
 * 
  GServer 
 
 ) 
  
 UpdateTopic 
 ( 
 _ 
  
  context 
 
 . 
  Context 
 
 , 
  
 req 
  
 * 
  pb 
 
 . 
  UpdateTopicRequest 
 
 ) 
  
 ( 
 * 
  pb 
 
 . 
  Topic 
 
 , 
  
  error 
 
 ) 
 

func (*GServer) ValidateMessage

ValidateMessage mocks the ValidateMessage call but only checks that the schema definition to validate the message against is not empty.

func (*GServer) ValidateSchema

ValidateSchema mocks the ValidateSchema call but only checks that the schema definition is not empty.

Message

  type 
  
 Message 
  
 struct 
  
 { 
  
 ID 
  
  string 
 
  
 Data 
  
 [] 
  byte 
 
  
 Attributes 
  
 map 
 [ 
  string 
 
 ] 
  string 
 
  
 PublishTime 
  
  time 
 
 . 
  Time 
 
  
 Deliveries 
  
  int 
 
  
 // number of times delivery of the message was attempted 
  
 Acks 
  
  int 
 
  
 // number of acks received from clients 
  
 Modacks 
  
 [] 
  Modack 
 
  
 // modacks received by server for this message 
  
 OrderingKey 
  
  string 
 
  
 // contains filtered or unexported fields 
 } 
 

A Message is a message that was published to the server.

Modack

  type 
  
 Modack 
  
 struct 
  
 { 
  
 AckID 
  
  string 
 
  
 AckDeadline 
  
  int32 
 
  
 ReceivedAt 
  
  time 
 
 . 
  Time 
 
 } 
 

Modack represents a modack sent to the server.

Reactor

  type 
  
 Reactor 
  
 interface 
  
 { 
  
 // React handles the message types and returns results.  If "handled" is false, 
  
 // then the test server will ignore the results and continue to the next reactor 
  
 // or the original handler. 
  
 React 
 ( 
 _ 
  
 interface 
 {}) 
  
 ( 
 handled 
  
  bool 
 
 , 
  
 ret 
  
 interface 
 {}, 
  
 err 
  
  error 
 
 ) 
 } 
 

Reactor is an interface to allow reaction function to a certain call.

ReactorOptions

  type 
  
 ReactorOptions 
  
 map 
 [ 
  string 
 
 ][] 
  Reactor 
 
 

ReactorOptions is a map that Server uses to look up reactors. Key is the function name, value is array of reactor for the function.

Server

  type 
  
 Server 
  
 struct 
  
 { 
  
 Addr 
  
  string 
 
  
 // The address that the server is listening on. 
  
 GServer 
  
  GServer 
 
  
 // Not intended to be used directly. 
  
 // contains filtered or unexported fields 
 } 
 

Server is a fake Pub/Sub server.

func NewServer

  func 
  
 NewServer 
 ( 
 opts 
  
 ... 
  ServerReactorOption 
 
 ) 
  
 * 
  Server 
 
 

NewServer creates a new fake server running in the current process.

Example

  package 
  
 main 
 import 
  
 ( 
  
 "context" 
  
 "cloud.google.com/go/pubsub" 
  
 "cloud.google.com/go/pubsub/pstest" 
  
 "google.golang.org/api/option" 
  
 "google.golang.org/grpc" 
 ) 
 func 
  
 main 
 () 
  
 { 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 // Start a fake server running locally. 
  
 srv 
  
 := 
  
 pstest 
 . 
 NewServer 
 () 
  
 defer 
  
 srv 
 . 
 Close 
 () 
  
 // Connect to the server without using TLS. 
  
 conn 
 , 
  
 err 
  
 := 
  
 grpc 
 . 
 Dial 
 ( 
 srv 
 . 
 Addr 
 , 
  
 grpc 
 . 
 WithInsecure 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 // TODO: Handle error. 
  
 } 
  
 defer 
  
 conn 
 . 
 Close 
 () 
  
 // Use the connection when creating a pubsub client. 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
  NewClient 
 
 ( 
 ctx 
 , 
  
 "project" 
 , 
  
 option 
 . 
 WithGRPCConn 
 ( 
 conn 
 )) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 // TODO: Handle error. 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 _ 
  
 = 
  
 client 
  
 // TODO: Use the client. 
 } 
 

func NewServerWithPort

  func 
  
 NewServerWithPort 
 ( 
 port 
  
  int 
 
 , 
  
 opts 
  
 ... 
  ServerReactorOption 
 
 ) 
  
 * 
  Server 
 
 

NewServerWithPort creates a new fake server running in the current process at the specified port.

Example

  package 
  
 main 
 import 
  
 ( 
  
 "context" 
  
 "cloud.google.com/go/pubsub" 
  
 "cloud.google.com/go/pubsub/pstest" 
  
 "google.golang.org/api/option" 
  
 "google.golang.org/grpc" 
 ) 
 func 
  
 main 
 () 
  
 { 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 // Start a fake server running locally at 9001. 
  
 srv 
  
 := 
  
 pstest 
 . 
 NewServerWithPort 
 ( 
 9001 
 ) 
  
 defer 
  
 srv 
 . 
 Close 
 () 
  
 // Connect to the server without using TLS. 
  
 conn 
 , 
  
 err 
  
 := 
  
 grpc 
 . 
 Dial 
 ( 
 srv 
 . 
 Addr 
 , 
  
 grpc 
 . 
 WithInsecure 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 // TODO: Handle error. 
  
 } 
  
 defer 
  
 conn 
 . 
 Close 
 () 
  
 // Use the connection when creating a pubsub client. 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
  NewClient 
 
 ( 
 ctx 
 , 
  
 "project" 
 , 
  
 option 
 . 
 WithGRPCConn 
 ( 
 conn 
 )) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 // TODO: Handle error. 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 _ 
  
 = 
  
 client 
  
 // TODO: Use the client. 
 } 
 

func (*Server) AddPublishResponse

  func 
  
 ( 
 s 
  
 * 
  Server 
 
 ) 
  
 AddPublishResponse 
 ( 
 pbr 
  
 * 
  pb 
 
 . 
  PublishResponse 
 
 , 
  
 err 
  
  error 
 
 ) 
 

AddPublishResponse adds a new publish response to the channel used for responding to publish requests.

func (*Server) ClearMessages

  func 
  
 ( 
 s 
  
 * 
  Server 
 
 ) 
  
 ClearMessages 
 () 
 

ClearMessages removes all published messages from internal containers.

func (*Server) Close

  func 
  
 ( 
 s 
  
 * 
  Server 
 
 ) 
  
 Close 
 () 
  
  error 
 
 

Close shuts down the server and releases all resources.

func (*Server) Message

  func 
  
 ( 
 s 
  
 * 
  Server 
 
 ) 
  
 Message 
 ( 
 id 
  
  string 
 
 ) 
  
 * 
  Message 
 
 

Message returns the message with the given ID, or nil if no message with that ID was published.

func (*Server) Messages

  func 
  
 ( 
 s 
  
 * 
  Server 
 
 ) 
  
 Messages 
 () 
  
 [] 
 * 
  Message 
 
 

Messages returns information about all messages ever published.

func (*Server) Publish

  func 
  
 ( 
 s 
  
 * 
  Server 
 
 ) 
  
 Publish 
 ( 
 topic 
  
  string 
 
 , 
  
 data 
  
 [] 
  byte 
 
 , 
  
 attrs 
  
 map 
 [ 
  string 
 
 ] 
  string 
 
 ) 
  
  string 
 
 

Publish behaves as if the Publish RPC was called with a message with the given data and attrs. It returns the ID of the message. The topic will be created if it doesn't exist.

Publish panics if there is an error, which is appropriate for testing.

func (*Server) PublishOrdered

  func 
  
 ( 
 s 
  
 * 
  Server 
 
 ) 
  
 PublishOrdered 
 ( 
 topic 
  
  string 
 
 , 
  
 data 
  
 [] 
  byte 
 
 , 
  
 attrs 
  
 map 
 [ 
  string 
 
 ] 
  string 
 
 , 
  
 orderingKey 
  
  string 
 
 ) 
  
  string 
 
 

PublishOrdered behaves as if the Publish RPC was called with a message with the given data, attrs and ordering key. It returns the ID of the message. The topic will be created if it doesn't exist.

PublishOrdered panics if there is an error, which is appropriate for testing.

func (*Server) ResetPublishResponses

  func 
  
 ( 
 s 
  
 * 
  Server 
 
 ) 
  
 ResetPublishResponses 
 ( 
 size 
  
  int 
 
 ) 
 

ResetPublishResponses resets the buffered publishResponses channel with a new buffered channel with the given size.

func (*Server) SetAutoPublishResponse

  func 
  
 ( 
 s 
  
 * 
  Server 
 
 ) 
  
 SetAutoPublishResponse 
 ( 
 autoPublishResponse 
  
  bool 
 
 ) 
 

SetAutoPublishResponse controls whether to automatically respond to messages published or to use user-added responses from the publishResponses channel.

func (*Server) SetStreamTimeout

  func 
  
 ( 
 s 
  
 * 
  Server 
 
 ) 
  
 SetStreamTimeout 
 ( 
 d 
  
  time 
 
 . 
  Duration 
 
 ) 
 

SetStreamTimeout sets the amount of time a stream will be active before it shuts itself down. This mimics the real service's behavior of closing streams after 30 minutes. If SetStreamTimeout is never called or is passed zero, streams never shut down.

func (*Server) SetTimeNowFunc

  func 
  
 ( 
 s 
  
 * 
  Server 
 
 ) 
  
 SetTimeNowFunc 
 ( 
 f 
  
 func 
 () 
  
  time 
 
 . 
  Time 
 
 ) 
 

SetTimeNowFunc registers f as a function to be used instead of time.Now for this server.

func (*Server) Wait

  func 
  
 ( 
 s 
  
 * 
  Server 
 
 ) 
  
 Wait 
 () 
 

Wait blocks until all server activity has completed.

ServerReactorOption

  type 
  
 ServerReactorOption 
  
 struct 
  
 { 
  
 FuncName 
  
  string 
 
  
 Reactor 
  
  Reactor 
 
 } 
 

ServerReactorOption is options passed to the server for reactor creation.

func WithErrorInjection

  func 
  
 WithErrorInjection 
 ( 
 funcName 
  
  string 
 
 , 
  
 code 
  
  codes 
 
 . 
  Code 
 
 , 
  
 msg 
  
  string 
 
 ) 
  
  ServerReactorOption 
 
 

WithErrorInjection creates a ServerReactorOption that injects error with defined status code and message for a certain function.

Create a Mobile Website
View Site in Mobile | Classic
Share by: