A Subscription object will give you access to your Cloud Pub/Sub subscription.
Subscriptions are sometimes retrieved when using various methods:
Subscription objects may be created directly with:
All Subscription objects are instances of an [EventEmitter](http://nodejs.org/api/events.html). The subscription will pull for messages automatically as long as there is at least one listener assigned for the message event. Available events:
Upon receipt of a message: on(event: 'message', listener: (message: Message) => void): this;
Upon receipt of an error: on(event: 'error', listener: (error: Error) => void): this;
Upon receipt of a (non-fatal) debug warning: on(event: 'debug', listener: (msg: DebugMessage) => void): this;
Upon the closing of the subscriber: on(event: 'close', listener: Function): this;
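The listener-driven behaviour described above can be illustrated with Node's built-in events module alone. The sketch below is not the library's implementation: `DemoSubscription` and its `deliver` method are hypothetical stand-ins that only mimic how attaching the first message listener opens the subscription and removing the last one closes it.

```javascript
const {EventEmitter} = require('node:events');

// Hypothetical stand-in for a Subscription; not the real client class.
class DemoSubscription extends EventEmitter {
  constructor() {
    super();
    this.isOpen = false;
    // EventEmitter emits 'newListener' just before a listener is added.
    this.on('newListener', event => {
      if (event === 'message') this.isOpen = true;
    });
    // 'removeListener' is emitted after the listener has been removed.
    this.on('removeListener', event => {
      if (event === 'message' && this.listenerCount('message') === 0) {
        this.isOpen = false;
      }
    });
  }

  // Simulates the arrival of one message; ignored while closed.
  deliver(message) {
    return this.isOpen ? this.emit('message', message) : false;
  }
}

const demo = new DemoSubscription();
const received = [];
const onMessage = message => received.push(message.data);

demo.on('error', err => console.error(err));
demo.on('message', onMessage); // opens the demo subscription
demo.deliver({data: 'hello'});
demo.removeListener('message', onMessage); // closes it again
demo.deliver({data: 'dropped'});
console.log(received);
```

Registering the error listener first mirrors the usual EventEmitter advice: an emitted 'error' event with no listener is thrown.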
By default Subscription objects allow you to process 100 messages at the same time. You can fine tune this value by adjusting the options.flowControl.maxMessages option.
If your subscription is seeing more re-deliveries than preferable, you might try increasing your options.ackDeadline value or decreasing the options.streamingOptions.maxStreams value.
Subscription objects handle ack management by automatically extending the ack deadline while a message is being processed, then issuing the ack or nack for that message once processing is done. **Note:** message redelivery is still possible.
By default each PubSub instance can handle 100 open streams; with default options this translates to fewer than 20 Subscriptions per PubSub instance. If you wish to create more Subscriptions than that, you can either create multiple PubSub instances or lower the options.streamingOptions.maxStreams value on each Subscription object.
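The arithmetic behind that limit can be sketched as follows. The figure of 5 streams per Subscription is an assumption about the default maxStreams value (the text above only implies it through the 100-stream and 20-Subscription figures), and `maxSubscriptions` is a hypothetical helper, not part of the library. Treat the result as an upper bound.

```javascript
// Hypothetical helper: how many Subscriptions fit within a stream budget.
function maxSubscriptions(openStreamBudget, maxStreamsPerSubscription) {
  if (maxStreamsPerSubscription < 1) {
    throw new RangeError('maxStreams must be at least 1');
  }
  return Math.floor(openStreamBudget / maxStreamsPerSubscription);
}

// Assumed default of 5 streams per Subscription against the 100-stream budget.
console.log(maxSubscriptions(100, 5));
// Lowering maxStreams to 1 leaves room for more Subscriptions per instance.
console.log(maxSubscriptions(100, 1));
```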
Inheritance
EventEmitter > Subscription
Package
@google-cloud/pubsub
Examples
From PubSub#getSubscriptions:
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

pubsub.getSubscriptions((err, subscriptions) => {
  // `subscriptions` is an array of Subscription objects.
});
From Topic#getSubscriptions:
const topic = pubsub.topic('my-topic');
topic.getSubscriptions((err, subscriptions) => {
  // `subscriptions` is an array of Subscription objects.
});
const topic = pubsub.topic('my-topic');
topic.createSubscription('new-subscription', (err, subscription) => {
  // `subscription` is a Subscription object.
});
const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');
// `subscription` is a Subscription object.
Once you have obtained a subscription object, you may begin to register listeners. This will automatically trigger pulling for messages.
// Register an error handler.
subscription.on('error', (err) => {});

// Register a debug handler, to catch non-fatal errors and other messages.
subscription.on('debug', msg => {
  console.log(msg.message);
});

// Register a close handler in case the subscriber closes unexpectedly
subscription.on('close', () => {});

// Register a listener for `message` events.
function onMessage(message) {
  // Called every time a message is received.

  // message.id = ID of the message.
  // message.ackId = ID used to acknowledge receipt of the message.
  // message.data = Contents of the message.
  // message.attributes = Attributes of the message.
  // message.publishTime = Date when Pub/Sub received the message.

  // Ack the message:
  // message.ack();

  // This doesn't ack the message, but allows more messages to be retrieved
  // if your limit was hit or if you don't want to ack the message.
  // message.nack();
}
subscription.on('message', onMessage);

// Remove the listener from receiving `message` events.
subscription.removeListener('message', onMessage);
To apply a fine level of flow control, consider the following configuration:
const subscription = topic.subscription('my-sub', {
  flowControl: {
    maxMessages: 1,
    // this tells the client to manage and lock any excess messages
    allowExcessMessages: false
  }
});
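To make the effect of maxMessages concrete, the sketch below models flow control as a plain in-flight counter. `FlowLimiter` is a hypothetical class written for illustration only; it considers nothing but the message count and is not how the client manages messages internally.

```javascript
// Hypothetical illustration of message-count flow control.
class FlowLimiter {
  constructor(maxMessages) {
    this.maxMessages = maxMessages;
    this.inFlight = 0;
    this.waiting = [];
  }

  // Hands the message to `handler` now, or queues it when at capacity.
  add(message, handler) {
    if (this.inFlight < this.maxMessages) {
      this.inFlight++;
      handler(message);
    } else {
      this.waiting.push({message, handler});
    }
  }

  // Called when a message has been acked or nacked; releases the next one.
  done() {
    this.inFlight--;
    const next = this.waiting.shift();
    if (next) this.add(next.message, next.handler);
  }
}

const limiter = new FlowLimiter(1);
const processed = [];
limiter.add('a', m => processed.push(m));
limiter.add('b', m => processed.push(m)); // held back: limit of 1 reached
console.log(processed);
limiter.done(); // 'a' finished, so 'b' is released
console.log(processed);
```

With a limit of 1, the second message is only handed to its handler after the first one is marked as done, which is the behaviour the configuration above asks for.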
Constructors
(constructor)(pubsub, name, options)
constructor(pubsub: PubSub, name: string, options?: SubscriptionOptions);
Constructs a new instance of the Subscription class
Properties
iam
iam: IAM;
isOpen
get isOpen(): boolean;
Indicates if the Subscription is open and receiving messages.
{boolean}
metadata
metadata?: google.pubsub.v1.ISubscription;
name
name: string;
projectId
get projectId(): string;
{string}
pubsub
pubsub: PubSub;
request
request: typeof PubSub.prototype.request;
topic
topic?: Topic | string;
Methods
close()
close(): Promise<void>;
Closes the Subscription. Once this is called you will no longer receive message events unless you call Subscription#open or add new message listeners.
Returns: Promise<void>
subscription.close(err => {
  if (err) {
    // Error handling omitted.
  }
});

// If the callback is omitted a Promise will be returned.
subscription.close().then(() => {});
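Most methods on this page accept an optional callback and return a Promise when it is omitted. The sketch below shows one common way such a dual interface can be built. `callbackOrPromise` and `fakeClose` are hypothetical names, and this is not necessarily how the library implements it.

```javascript
// Hypothetical wrapper: run `work`, reporting through the callback if one is
// given, otherwise through the returned Promise.
function callbackOrPromise(work, callback) {
  const promise = Promise.resolve().then(work);
  if (typeof callback !== 'function') {
    return promise;
  }
  promise.then(
    result => callback(null, result),
    err => callback(err)
  );
  return undefined;
}

// Stand-in for an operation such as closing a subscriber.
const fakeClose = callback => callbackOrPromise(() => 'closed', callback);

// Callback style: nothing is returned, the result arrives in the callback.
fakeClose((err, result) => {
  if (err) {
    console.error(err);
    return;
  }
  console.log('callback:', result);
});

// Promise style: omit the callback and chain on the returned Promise.
fakeClose().then(result => console.log('promise:', result));
```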
close(callback)
close(callback: SubscriptionCloseCallback): void;
Returns: void
create(options)
create(options?: CreateSubscriptionOptions): Promise<CreateSubscriptionResponse>;
Create a subscription.
Parameters:
options (CreateSubscriptionOptions): See a [Subscription resource](https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions).
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('newMessages');
const callback = function(err, subscription, apiResponse) {};

subscription.create(callback);

With options
subscription.create({
  ackDeadlineSeconds: 90
}, callback);

If the callback is omitted, we'll return a Promise.
const [sub, apiResponse] = await subscription.create();
create(callback)
create(callback: CreateSubscriptionCallback): void;
Returns: void
create(options, callback)
create(options: CreateSubscriptionOptions, callback: CreateSubscriptionCallback): void;
Returns: void
createSnapshot(name, gaxOpts)
createSnapshot(name: string, gaxOpts?: CallOptions): Promise<CreateSnapshotResponse>;
Create a snapshot with the given name.
Parameters:
name (string): Name of the snapshot.
gaxOpts (CallOptions): Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

const callback = (err, snapshot, apiResponse) => {
  if (!err) {
    // The snapshot was created successfully.
  }
};

subscription.createSnapshot('my-snapshot', callback);

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.createSnapshot('my-snapshot').then((data) => {
  const snapshot = data[0];
  const apiResponse = data[1];
});
createSnapshot(name, callback)
createSnapshot(name: string, callback: CreateSnapshotCallback): void;
Returns: void
createSnapshot(name, gaxOpts, callback)
createSnapshot(name: string, gaxOpts: CallOptions, callback: CreateSnapshotCallback): void;
Returns: void
delete(gaxOpts)
delete(gaxOpts?: CallOptions): Promise<EmptyResponse>;
Delete the subscription. Pull requests from the current subscription will fail with an error once unsubscription is complete.
Parameters:
gaxOpts (CallOptions): Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.delete((err, apiResponse) => {});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.delete().then((data) => {
  const apiResponse = data[0];
});
delete(callback)
delete(callback: EmptyCallback): void;
Returns: void
delete(gaxOpts, callback)
delete(gaxOpts: CallOptions, callback: EmptyCallback): void;
Returns: void
detached()
detached(): Promise<DetachedResponse>;
Check if a subscription is detached.
Returns: Promise<DetachedResponse>
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.detached((err, exists) => {});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.detached().then((data) => {
  const detached = data[0];
});
detached(callback)
detached(callback: DetachedCallback): void;
Parameters:
callback (DetachedCallback)
Returns: void
exists()
exists(): Promise<ExistsResponse>;
Check if a subscription exists.
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.exists((err, exists) => {});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.exists().then((data) => {
  const exists = data[0];
});
exists(callback)
exists(callback: ExistsCallback): void;
Returns: void
formatMetadata_(metadata)
static formatMetadata_(metadata: SubscriptionMetadata): google.pubsub.v1.ISubscription;
formatName_(projectId, name)
static formatName_(projectId: string, name: string): string;
Parameters:
projectId (string)
name (string)
Returns: string
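The reference gives no description for formatName_. Pub/Sub subscription resource names follow the pattern projects/{project}/subscriptions/{subscription}, so a helper of this kind plausibly behaves like the sketch below. `formatSubscriptionName` is a hypothetical reimplementation, and the pass-through of names that already contain a slash is an assumption rather than documented behaviour.

```javascript
// Hypothetical sketch of a subscription-name formatter.
function formatSubscriptionName(projectId, name) {
  // Assumption: a name that already contains a slash is fully qualified.
  if (name.includes('/')) {
    return name;
  }
  return `projects/${projectId}/subscriptions/${name}`;
}

console.log(formatSubscriptionName('my-project', 'my-subscription'));
console.log(
  formatSubscriptionName('ignored', 'projects/p/subscriptions/already-full')
);
```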
get(gaxOpts)
get(gaxOpts?: GetSubscriptionOptions): Promise<GetSubscriptionResponse>;
Get a subscription if it exists.
Parameters:
gaxOpts (GetSubscriptionOptions): Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.get((err, subscription, apiResponse) => {
  // The `subscription` data has been populated.
});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.get().then((data) => {
  const subscription = data[0];
  const apiResponse = data[1];
});
get(callback)
get(callback: GetSubscriptionCallback): void;
Returns: void
get(gaxOpts, callback)
get(gaxOpts: GetSubscriptionOptions, callback: GetSubscriptionCallback): void;
Returns: void
getMetadata(gaxOpts)
getMetadata(gaxOpts?: CallOptions): Promise<GetSubscriptionMetadataResponse>;
Fetches the subscription's metadata.
Parameters:
gaxOpts (CallOptions): Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.getMetadata((err, apiResponse) => {
  if (err) {
    // Error handling omitted.
  }
});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.getMetadata().then((data) => {
  const apiResponse = data[0];
});
getMetadata(callback)
getMetadata(callback: GetSubscriptionMetadataCallback): void;
Returns: void
getMetadata(gaxOpts, callback)
getMetadata(gaxOpts: CallOptions, callback: GetSubscriptionMetadataCallback): void;
Returns: void
modifyPushConfig(config, gaxOpts)
modifyPushConfig(config: PushConfig, gaxOpts?: CallOptions): Promise<EmptyResponse>;
Modify the push config for the subscription.
Parameters:
config (PushConfig)
gaxOpts (CallOptions): Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

const pushConfig = {
  pushEndpoint: 'https://mydomain.com/push',
  attributes: {
    key: 'value'
  },
  oidcToken: {
    serviceAccountEmail: 'myproject@appspot.gserviceaccount.com',
    audience: 'myaudience'
  }
};

subscription.modifyPushConfig(pushConfig, (err, apiResponse) => {
  if (err) {
    // Error handling omitted.
  }
});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.modifyPushConfig(pushConfig).then((data) => {
  const apiResponse = data[0];
});
modifyPushConfig(config, callback)
modifyPushConfig(config: PushConfig, callback: EmptyCallback): void;
Returns: void
modifyPushConfig(config, gaxOpts, callback)
modifyPushConfig(config: PushConfig, gaxOpts: CallOptions, callback: EmptyCallback): void;
Returns: void
open()
open(): void;
Opens the Subscription to receive messages. In general this method shouldn't need to be called, unless you wish to receive messages after calling Subscription#close. Alternatively, one could just assign a new message event listener, which will also re-open the Subscription.
Returns: void
subscription.on('message', message => message.ack());

// Close the subscription.
subscription.close(err => {
  if (err) {
    // Error handling omitted.
  }

  // The subscription has been closed and messages will no longer be received.
});

// Resume receiving messages.
subscription.open();
seek(snapshot, gaxOpts)
seek(snapshot: string | Date, gaxOpts?: CallOptions): Promise<SeekResponse>;
Seeks an existing subscription to a point in time or a given snapshot.
Parameters:
snapshot (string | Date): The point to seek to. This will accept the name of the snapshot or a Date object.
gaxOpts (CallOptions): Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
const callback = (err, resp) => {
  if (!err) {
    // Seek was successful.
  }
};

subscription.seek('my-snapshot', callback);

//-
// Alternatively, to specify a certain point in time, you can provide a Date
// object.
//-
const date = new Date('October 21 2015');

subscription.seek(date, callback);
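The two accepted argument types can be told apart with a simple type check. The sketch below is a hypothetical helper (`describeSeekTarget` is not a library function) that shows one way to branch between a snapshot name and a Date. The shape of the returned object is an assumption made purely for illustration.

```javascript
// Hypothetical helper: classify the first argument passed to seek().
function describeSeekTarget(target) {
  if (typeof target === 'string') {
    return {kind: 'snapshot', snapshot: target};
  }
  if (target instanceof Date && !Number.isNaN(target.getTime())) {
    return {kind: 'time', time: target.toISOString()};
  }
  throw new TypeError('Expected a snapshot name or a valid Date.');
}

console.log(describeSeekTarget('my-snapshot'));
// Date.UTC avoids the time-zone ambiguity of parsing a free-form date string.
console.log(describeSeekTarget(new Date(Date.UTC(2015, 9, 21))));
```

Validating the Date up front catches values such as `new Date('not a date')`, which are Date instances but carry no usable timestamp.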
seek(snapshot, callback)
seek(snapshot: string | Date, callback: SeekCallback): void;
Returns: void
seek(snapshot, gaxOpts, callback)
seek(snapshot: string | Date, gaxOpts: CallOptions, callback: SeekCallback): void;
Returns: void
setMetadata(metadata, gaxOpts)
setMetadata(metadata: SubscriptionMetadata, gaxOpts?: CallOptions): Promise<SetSubscriptionMetadataResponse>;
Update the subscription object.
Parameters:
metadata (SubscriptionMetadata)
gaxOpts (CallOptions): Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
const metadata = {
  key: 'value'
};

subscription.setMetadata(metadata, (err, apiResponse) => {
  if (err) {
    // Error handling omitted.
  }
});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.setMetadata(metadata).then((data) => {
  const apiResponse = data[0];
});
setMetadata(metadata, callback)
setMetadata(metadata: SubscriptionMetadata, callback: SetSubscriptionMetadataCallback): void;
Returns: void
setMetadata(metadata, gaxOpts, callback)
setMetadata(metadata: SubscriptionMetadata, gaxOpts: CallOptions, callback: SetSubscriptionMetadataCallback): void;
Parameters:
metadata (SubscriptionMetadata)
gaxOpts (CallOptions)
callback (SetSubscriptionMetadataCallback)
Returns: void
setOptions(options)
setOptions(options: SubscriberOptions): void;
Sets the Subscription options.
Returns: void
snapshot(name)
snapshot(name: string): Snapshot;
Create a Snapshot object. See Subscription#createSnapshot to create a snapshot.
Parameters:
name (string): The name of the snapshot.
const snapshot = subscription.snapshot('my-snapshot');