[Cloud Pub/Sub]( https://developers.google.com/pubsub/overview ) is a reliable, many-to-many, asynchronous messaging service from Cloud Platform.
Package
@google-cloud/pubsubExamples
Import the client library
const
{
PubSub
}
=
require
(
' @google-cloud/pubsub
'
);
Create a client that uses Application Default Credentials (ADC) :
const
pubsub
=
new
PubSub
();
Create a client with explicit credentials :
const
pubsub
=
new
PubSub
({
projectId
:
'your-project-id'
,
keyFilename
:
'/path/to/keyfile.json'
});
Full quickstart example:
// Imports the Google Cloud client library
import
{
PubSub
}
from
'@google-cloud/pubsub'
;
async
function
quickstart
(
projectId
=
'your-project-id'
,
// Your Google Cloud Platform project ID
topicNameOrId
=
'my-topic'
,
// Name for the new topic to create
subscriptionName
=
'my-sub'
// Name for the new subscription to create
)
{
// Instantiates a client
const
pubsub
=
new
PubSub
({
projectId
});
// Creates a new topic
const
[
topic
]
=
await
pubsub
.
createTopic
(
topicNameOrId
);
console
.
log
(
`Topic
${
topic
.
name
}
created.`
);
// Creates a subscription on that new topic
const
[
subscription
]
=
await
topic
.
createSubscription
(
subscriptionName
);
// Receive callbacks for new messages on the subscription
subscription
.
on
(
'message'
,
message
=
>
{
console
.
log
(
'Received message:'
,
message
.
data
.
toString
());
process
.
exit
(
0
);
});
// Receive callbacks for errors on the subscription
subscription
.
on
(
'error'
,
error
=
>
{
console
.
error
(
'Received error:'
,
error
);
process
.
exit
(
1
);
});
// Send a message to the topic
topic
.
publishMessage
({
data
:
Buffer
.
from
(
'Test message!'
)});
}
Constructors
(constructor)(options)
constructor
(
options
?:
ClientConfig
);
Constructs a new instance of the PubSub
class
Properties
api
api
:
{
[
key
:
string
]
:
gax
.
ClientStub
;
};
auth
auth
:
GoogleAuth
;
getSnapshotsStream
getSnapshotsStream
:
()
=
>
ObjectStream<Snapshot>
;
getSubscriptionsStream
getSubscriptionsStream
:
()
=
>
ObjectStream<Subscription>
;
getTopicsStream
getTopicsStream
:
()
=
>
ObjectStream<Topic>
;
isEmulator
isEmulator
:
boolean
;
isIdResolved
get
isIdResolved
()
:
boolean
;
Returns true if we have actually resolved the full project name.
isOpen
isOpen
:
boolean
;
name
name
?:
string
;
options
options
:
ClientConfig
;
projectId
projectId
:
string
;
Promise
Promise
?:
PromiseConstructor
;
Methods
close()
close
()
:
Promise<void>
;
Closes out this object, releasing any server connections. Note that once you close a PubSub object, it may not be used again. Any pending operations (e.g. queued publish messages) will fail. If you have topic or subscription objects that may have pending operations, you should call close() on those first if you want any pending messages to be delivered correctly. The PubSub class doesn't track those.
EmptyCallback
Promise
<void>
{Promise
close(callback)
close
(
callback
:
EmptyCallback
)
:
void
;
void
closeAllClients_()
closeAllClients_
()
:
Promise<void>
;
Close all open client objects.
Promise
<void>
{Promise}
createSchema(schemaId, type, definition, gaxOpts)
createSchema
(
schemaId
:
string
,
type
:
SchemaType
,
definition
:
string
,
gaxOpts
?:
CallOptions
)
:
Promise<Schema>
;
Create a schema in the project.
schemaId
string
The name or ID of the subscription.
type
definition
string
The text describing the schema in terms of the type.
gaxOpts
CallOptions
Create a schema.
const
{
PubSub
}
=
require
(
' @google-cloud/pubsub
'
);
const
pubsub
=
new
PubSub
();
await
pubsub
.
createSchema
(
'messageType'
,
SchemaTypes
.
Avro
,
'{...avro definition...}'
);
createSubscription(topic, name, options)
createSubscription
(
topic
:
Topic
|
string
,
name
:
string
,
options
?:
CreateSubscriptionOptions
)
:
Promise<CreateSubscriptionResponse>
;
Create a subscription to a topic.
topic
name
string
The name of the subscription.
options
CreateSubscriptionOptions
See a [Subscription resource]( https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions ).
Subscribe to a topic.
const
{
PubSub
}
=
require
(
' @google-cloud/pubsub
'
);
const
pubsub
=
new
PubSub
();
const
topic
=
'messageCenter'
;
const
name
=
'newMessages'
;
const
callback
=
function
(
err
,
subscription
,
apiResponse
)
{};
pubsub
.
createSubscription
(
topic
,
name
,
callback
);
If the callback is omitted, we'll return a Promise.
pubsub
.
createSubscription
(
topic
,
name
)
.
then
(
function
(
data
)
{
const
subscription
=
data
[
0
];
const
apiResponse
=
data
[
1
];
});
createSubscription(topic, name, callback)
createSubscription
(
topic
:
Topic
|
string
,
name
:
string
,
callback
:
CreateSubscriptionCallback
)
:
void
;
void
createSubscription(topic, name, options, callback)
createSubscription
(
topic
:
Topic
|
string
,
name
:
string
,
options
:
CreateSubscriptionOptions
,
callback
:
CreateSubscriptionCallback
)
:
void
;
void
createTopic(name, gaxOpts)
createTopic
(
name
:
string
|
TopicMetadata
,
gaxOpts
?:
CallOptions
)
:
Promise<CreateTopicResponse>
;
Create a topic with the given name.
name
gaxOpts
CallOptions
Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html .
const
{
PubSub
}
=
require
(
' @google-cloud/pubsub
'
);
const
pubsub
=
new
PubSub
();
pubsub
.
createTopic
(
'my-new-topic'
,
function
(
err
,
topic
,
apiResponse
)
{
if
(
!
err
)
{
// The topic was created successfully.
}
});
//-
// If the callback is omitted, we'll return a Promise.
//-
pubsub
.
createTopic
(
'my-new-topic'
).
then
(
function
(
data
)
{
const
topic
=
data
[
0
];
const
apiResponse
=
data
[
1
];
});
createTopic(name, callback)
createTopic
(
name
:
string
|
TopicMetadata
,
callback
:
CreateTopicCallback
)
:
void
;
void
createTopic(name, gaxOpts, callback)
createTopic
(
name
:
string
|
TopicMetadata
,
gaxOpts
:
CallOptions
,
callback
:
CreateTopicCallback
)
:
void
;
void
detachSubscription(name, gaxOpts)
detachSubscription
(
name
:
string
,
gaxOpts
?:
CallOptions
)
:
Promise<DetachSubscriptionResponse>
;
Detach a subscription with the given name.
name
string
Name of the subscription.
gaxOpts
CallOptions
Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html .
Promise
< DetachSubscriptionResponse
>
{Promise
const
{
PubSub
}
=
require
(
' @google-cloud/pubsub
'
);
const
pubsub
=
new
PubSub
();
pubsub
.
detachSubscription
(
'my-sub'
,
(
err
,
topic
,
apiResponse
)
=
>
{
if
(
!
err
)
{
// The topic was created successfully.
}
});
//-
// If the callback is omitted, we'll return a Promise.
//-
pubsub
.
detachSubscription
(
'my-sub'
).
then
(
data
=
>
{
const
apiResponse
=
data
[
0
];
});
detachSubscription(name, callback)
detachSubscription
(
name
:
string
,
callback
:
DetachSubscriptionCallback
)
:
void
;
name
string
callback
DetachSubscriptionCallback
void
detachSubscription(name, gaxOpts, callback)
detachSubscription
(
name
:
string
,
gaxOpts
:
CallOptions
,
callback
:
DetachSubscriptionCallback
)
:
void
;
name
string
gaxOpts
CallOptions
callback
DetachSubscriptionCallback
void
determineBaseUrl_()
determineBaseUrl_
()
:
void
;
Determine the appropriate endpoint to use for API requests, first trying the apiEndpoint
parameter. If that isn't set, we try the Pub/Sub emulator environment variable (PUBSUB_EMULATOR_HOST). If that is also null, we try the standard gcloud alpha pubsub
environment variable (CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB). Otherwise the default production API is used.
Note that if the URL doesn't end in '.googleapis.com', we will assume that it's an emulator and disable strict SSL checks.
void
formatName_(name)
static
formatName_
(
name
:
string
)
:
string
;
name
string
string
getClient_(config, callback)
getClient_
(
config
:
GetClientConfig
,
callback
:
GetClientCallback
)
:
void
;
Get the PubSub client object.
config
GetClientConfig
Configuration object.
callback
GetClientCallback
The callback function.
void
getClientAsync_(config)
getClientAsync_
(
config
:
GetClientConfig
)
:
Promise<gax
.
ClientStub
> ;
Get the PubSub client object.
config
GetClientConfig
Configuration object.
Promise
< ClientStub
>
{Promise}
getClientConfig()
getClientConfig
()
:
Promise<ClientConfig>
;
Retrieve a client configuration, suitable for passing into a GAPIC 'v1' class constructor. This will fill out projectId, emulator URLs, and so forth.
getSchemaClient()
getSchemaClient
()
:
Promise<SchemaServiceClient>
;
Gets a schema client, creating one if needed. This is a shortcut for new v1.SchemaServiceClient(await pubsub.getClientConfig())
.
getSnapshots(options)
getSnapshots
(
options
?:
PageOptions
)
:
Promise<GetSnapshotsResponse>
;
Get a list of snapshots.
const
{
PubSub
}
=
require
(
' @google-cloud/pubsub
'
);
const
pubsub
=
new
PubSub
();
pubsub
.
getSnapshots
(
function
(
err
,
snapshots
)
{
if
(
!
err
)
{
// snapshots is an array of Snapshot objects.
}
});
//-
// If the callback is omitted, we'll return a Promise.
//-
pubsub
.
getSnapshots
().
then
(
function
(
data
)
{
const
snapshots
=
data
[
0
];
});
getSnapshots(callback)
getSnapshots
(
callback
:
GetSnapshotsCallback
)
:
void
;
void
getSnapshots(options, callback)
getSnapshots
(
options
:
PageOptions
,
callback
:
GetSnapshotsCallback
)
:
void
;
void
getSubscriptions(options)
getSubscriptions
(
options
?:
GetSubscriptionsOptions
)
:
Promise<GetSubscriptionsResponse>
;
Get a list of the subscriptions registered to all of your project's topics. You may optionally provide a query object as the first argument to customize the response.
Your provided callback will be invoked with an error object if an API error occurred or an array of objects.
To get subscriptions for a topic, see Topic .
options
GetSubscriptionsOptions
const
{
PubSub
}
=
require
(
' @google-cloud/pubsub
'
);
const
pubsub
=
new
PubSub
();
pubsub
.
getSubscriptions
(
function
(
err
,
subscriptions
)
{
if
(
!
err
)
{
// subscriptions is an array of Subscription objects.
}
});
//-
// If the callback is omitted, we'll return a Promise.
//-
pubsub
.
getSubscriptions
().
then
(
function
(
data
)
{
const
subscriptions
=
data
[
0
];
});
getSubscriptions(callback)
getSubscriptions
(
callback
:
GetSubscriptionsCallback
)
:
void
;
void
getSubscriptions(options, callback)
getSubscriptions
(
options
:
GetSubscriptionsOptions
,
callback
:
GetSubscriptionsCallback
)
:
void
;
void
getTopics(options)
getTopics
(
options
?:
PageOptions
)
:
Promise<GetTopicsResponse>
;
Get a list of the topics registered to your project. You may optionally provide a query object as the first argument to customize the response.
const
{
PubSub
}
=
require
(
' @google-cloud/pubsub
'
);
const
pubsub
=
new
PubSub
();
pubsub
.
getTopics
(
function
(
err
,
topics
)
{
if
(
!
err
)
{
// topics is an array of Topic objects.
}
});
//-
// Customize the query.
//-
pubsub
.
getTopics
({
pageSize
:
3
},
function
(
err
,
topics
)
{});
//-
// If the callback is omitted, we'll return a Promise.
//-
pubsub
.
getTopics
().
then
(
function
(
data
)
{
const
topics
=
data
[
0
];
});
getTopics(callback)
getTopics
(
callback
:
GetTopicsCallback
)
:
void
;
void
getTopics(options, callback)
getTopics
(
options
:
PageOptions
,
callback
:
GetTopicsCallback
)
:
void
;
void
listSchemas(view, options)
listSchemas
(
view
?:
SchemaView
,
options
?:
CallOptions
)
:
AsyncIterable<google
.
pubsub
.
v1
.
ISchema
> ;
Get a list of schemas associated with your project.
The returned AsyncIterable will resolve to objects.
This method returns an async iterable. These objects can be adapted to work in a Promise/then framework, as well as with callbacks, but this discussion is considered out of scope for these docs.
view
SchemaView
The type of schema objects requested, which should be an enum value from SchemaViews . Defaults to Full.
options
CallOptions
Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html .
for
await
(
const
s
of
pubsub
.
listSchemas
())
{
const
moreInfo
=
await
s
.
get
();
}
request(config, callback)
request<T
,
R
=
void
> (
config
:
RequestConfig
,
callback
:
RequestCallback<T
,
R
> )
:
void
;
Funnel all API requests through this method, to be sure we have a project ID.
config
RequestConfig
Configuration object.
callback
RequestCallback
<T, R>
The callback function.
void
T
R
schema(idOrName)
schema
(
idOrName
:
string
)
:
Schema
;
Create a Schema object, representing a schema within the project. See or to create a schema.
idOrName
string
const
{
PubSub
}
=
require
(
' @google-cloud/pubsub
'
);
const
pubsub
=
new
PubSub
();
const
schema
=
pubsub
.
schema
(
'my-schema'
);
snapshot(name)
snapshot
(
name
:
string
)
:
Snapshot
;
Create a Snapshot object. See to create a snapshot.
name
string
The name of the snapshot.
const
{
PubSub
}
=
require
(
' @google-cloud/pubsub
'
);
const
pubsub
=
new
PubSub
();
const
snapshot
=
pubsub
.
snapshot
(
'my-snapshot'
);
subscription(name, options)
subscription
(
name
:
string
,
options
?:
SubscriptionOptions
)
:
Subscription
;
Create a Subscription object. This command by itself will not run any API requests. You will receive a object, which will allow you to interact with a subscription.
name
string
Name of the subscription.
options
const
{
PubSub
}
=
require
(
' @google-cloud/pubsub
'
);
const
pubsub
=
new
PubSub
();
const
subscription
=
pubsub
.
subscription
(
'my-subscription'
);
// Register a listener for `message` events.
subscripti on
.
on
(
'message'
,
function
(
message
)
{
// Called every time a message is received.
// message.id = ID of the message.
// message.ackId = ID used to acknowledge the message receival.
// message.data = Contents of the message.
// message.attributes = Attributes of the message.
// message.publishTime = Date when Pub/Sub received the message.
});
topic(name, options)
topic
(
name
:
string
,
options
?:
PublishOptions
)
:
Topic
;
Create a Topic object. See to create a topic.
name
string
The name of the topic.
options
const
{
PubSub
}
=
require
(
' @google-cloud/pubsub
'
);
const
pubsub
=
new
PubSub
();
const
topic
=
pubsub
.
topic
(
'my-topic'
);
validateSchema(schema, gaxOpts)
validateSchema
(
schema
:
ISchema
,
gaxOpts
?:
CallOptions
)
:
Promise<void>
;
Validate a schema definition.
schema
gaxOpts
CallOptions
Promise
<void>
{Promise