Topics may use schemas to define a format that their messages must follow. When subscribing to a topic with a schema, the messages sent to the subscriber are guaranteed to be valid messages. These messages conform to the type and encoding specified in the schema settings associated with the topic.
The subscriber can determine the schema settings associated with a topic by looking at the following attributes:
- `googclient_schemaname`: The name of the schema used for validation. If the schema is deleted, the name is `_deleted-schema_`.
- `googclient_schemaencoding`: The encoding of the message, either JSON or BINARY.
- `googclient_schemarevisionid`: The revision ID of the schema used to parse and validate the message. Each revision has a unique revision ID associated with it. The revision ID is an auto-generated eight-character UUID. If the revision is deleted, the ID is `_deleted-schema-revision_`.
To learn more about schemas, see Schema overview .
Code samples for subscribing to topics associated with a schema
These samples show how to process messages when subscribing to topics configured with a schema.
C++
Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C++ API reference documentation .
Avro namespace
pubsub
=
::
google
::
cloud
::
pubsub
;
using
::
google
::
cloud
::
future
;
using
::
google
::
cloud
::
StatusOr
;
// Returns a callable that starts a streaming subscription on `subscriber`.
// Each received message has its raw payload printed to stdout and is then
// acknowledged. The callable hands back the session object returned by
// Subscribe().
return [](pubsub::Subscriber subscriber) {
  // Per-message handler: print the payload as-is, then ack.
  auto on_message = [](pubsub::Message const& message,
                       pubsub::AckHandler handler) {
    std::cout << "Message contents: " << message.data() << "\n";
    std::move(handler).ack();
  };
  return subscriber.Subscribe(on_message);
}
namespace
pubsub
=
::
google
::
cloud
::
pubsub
;
using
::
google
::
cloud
::
future
;
using
::
google
::
cloud
::
StatusOr
;
// Returns a callable that starts a streaming subscription on `subscriber`.
// Each received message is parsed as a binary-serialized `State` proto and
// printed to stdout. The callable hands back the session object returned by
// Subscribe().
return [](pubsub::Subscriber subscriber) {
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        google::cloud::pubsub::samples::State state;
        // ParseFromString() reports a malformed payload through its return
        // value. Check it so that an unparseable message is not printed as
        // an empty State and acknowledged as if it had been handled.
        if (!state.ParseFromString(std::string{m.data()})) {
          std::cerr << "Failed to parse message " << m.message_id()
                    << " as a State proto\n";
          std::move(h).nack();
          return;
        }
        std::cout << "Message contents: " << state.DebugString() << "\n";
        std::move(h).ack();
      });
  return session;
}
C#
Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C# API reference documentation .
Avro using
Avro.IO
;
using
Avro.Specific
;
using
Google.Api.Gax
;
using
Google.Cloud.PubSub.V1
;
using
Newtonsoft.Json
;
using
System
;
using
System.IO
;
using
System.Threading
;
using
System.Threading.Tasks
;
/// <summary>
/// Sample that pulls messages from a subscription and decodes them as Avro
/// records, using the encoding announced in the message attributes.
/// </summary>
public class PullAvroMessagesAsyncSample
{
    /// <summary>
    /// Listens on the subscription for five seconds, decodes every received message
    /// into an <c>AvroUtilities.State</c> and prints it to the console.
    /// </summary>
    /// <param name="projectId">ID of the project that owns the subscription.</param>
    /// <param name="subscriptionId">ID of the subscription to pull from.</param>
    /// <param name="acknowledge">
    /// <c>true</c> to ack every handled message, <c>false</c> to nack it.
    /// </param>
    /// <returns>The number of messages passed to the handler.</returns>
    public async Task<int> PullAvroMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        int messageCount = 0;
        SubscriberClient subscriber = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            Settings = new SubscriberClient.Settings
            {
                AckExtensionWindow = TimeSpan.FromSeconds(4),
                AckDeadline = TimeSpan.FromSeconds(10),
                FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
            }
        }.BuildAsync();

        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // Use TryGetValue rather than the indexer: the indexer throws when the
            // attribute is absent, which would make the "not provided" branch below
            // unreachable. A missing attribute leaves encoding null.
            message.Attributes.TryGetValue("googclient_schemaencoding", out string encoding);

            // AvroUtilities is a namespace. Below are files using the namespace.
            // https://github.com/GoogleCloudPlatform/dotnet-docs-samples/blob/main/pubsub/api/Pubsub.Samples/Utilities/State.cs
            // https://github.com/GoogleCloudPlatform/dotnet-docs-samples/blob/main/pubsub/api/Pubsub.Samples/Utilities/StateUtils.cs
            AvroUtilities.State state = new AvroUtilities.State();
            switch (encoding)
            {
                case "BINARY":
                    using (var ms = new MemoryStream(message.Data.ToByteArray()))
                    {
                        var decoder = new BinaryDecoder(ms);
                        // The same schema is passed as both writer and reader schema.
                        var reader = new SpecificDefaultReader(state.Schema, state.Schema);
                        reader.Read<AvroUtilities.State>(state, decoder);
                    }
                    break;
                case "JSON":
                    state = JsonConvert.DeserializeObject<AvroUtilities.State>(message.Data.ToStringUtf8());
                    break;
                default:
                    Console.WriteLine("Encoding not provided in message.");
                    break;
            }
            Console.WriteLine($"Message {message.MessageId}: {state}");
            // The handler may run concurrently, so count atomically.
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });

        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}
using
Google.Api.Gax
;
using
Google.Cloud.PubSub.V1
;
using
System
;
using
System.Threading
;
using
System.Threading.Tasks
;
/// <summary>
/// Sample that pulls messages from a subscription and decodes them as protocol
/// buffer messages, using the encoding announced in the message attributes.
/// </summary>
public class PullProtoMessagesAsyncSample
{
    /// <summary>
    /// Listens on the subscription for five seconds, decodes every received message
    /// into a <c>Utilities.State</c> and prints it to the console.
    /// </summary>
    /// <param name="projectId">ID of the project that owns the subscription.</param>
    /// <param name="subscriptionId">ID of the subscription to pull from.</param>
    /// <param name="acknowledge">
    /// <c>true</c> to ack every handled message, <c>false</c> to nack it.
    /// </param>
    /// <returns>The number of messages passed to the handler.</returns>
    public async Task<int> PullProtoMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        int messageCount = 0;
        SubscriberClient subscriber = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            Settings = new SubscriberClient.Settings
            {
                AckExtensionWindow = TimeSpan.FromSeconds(4),
                AckDeadline = TimeSpan.FromSeconds(10),
                FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
            }
        }.BuildAsync();

        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // Use TryGetValue rather than the indexer: the indexer throws when the
            // attribute is absent, which would make the "not provided" branch below
            // unreachable. A missing attribute leaves encoding null.
            message.Attributes.TryGetValue("googclient_schemaencoding", out string encoding);

            // Stays null when the encoding is missing or unrecognized.
            Utilities.State state = null;
            switch (encoding)
            {
                case "BINARY":
                    state = Utilities.State.Parser.ParseFrom(message.Data.ToByteArray());
                    break;
                case "JSON":
                    state = Utilities.State.Parser.ParseJson(message.Data.ToStringUtf8());
                    break;
                default:
                    Console.WriteLine("Encoding not provided in message.");
                    break;
            }
            Console.WriteLine($"Message {message.MessageId}: {state}");
            // The handler may run concurrently, so count atomically.
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });

        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}
Go
The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2 . To see a list of v1 code samples, see the deprecated code samples .
Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Go API reference documentation .
Avro import
(
"context"
"fmt"
"io"
"os"
"sync"
"time"
"cloud.google.com/go/pubsub/v2"
"github.com/linkedin/goavro/v2"
)
func
subscribeWithAvroSchema
(
w
io
.
Writer
,
projectID
,
subID
,
avscFile
string
)
error
{
// projectID := "my-project-id"
// topicID := "my-topic"
// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
ctx
:=
context
.
Background
()
client
,
err
:=
pubsub
.
NewClient
(
ctx
,
projectID
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"pubsub.NewClient: %w"
,
err
)
}
avroSchema
,
err
:=
os
.
ReadFile
(
avscFile
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"os.ReadFile err: %w"
,
err
)
}
codec
,
err
:=
goavro
.
NewCodec
(
string
(
avroSchema
))
if
err
!=
nil
{
return
fmt
.
Errorf
(
"goavro.NewCodec err: %w"
,
err
)
}
sub
:=
client
.
Subscriber
(
subID
)
ctx2
,
cancel
:=
context
.
WithTimeout
(
ctx
,
10
*
time
.
Second
)
defer
cancel
()
var
mu
sync
.
Mutex
sub
.
Receive
(
ctx2
,
func
(
ctx
context
.
Context
,
msg
*
pubsub
.
Message
)
{
mu
.
Lock
()
defer
mu
.
Unlock
()
encoding
:=
msg
.
Attributes
[
"googclient_schemaencoding"
]
var
state
map
[
string
]
interface
{}
if
encoding
==
"BINARY"
{
data
,
_
,
err
:=
codec
.
NativeFromBinary
(
msg
.
Data
)
if
err
!=
nil
{
fmt
.
Fprintf
(
w
,
"codec.NativeFromBinary err: %v\n"
,
err
)
msg
.
Nack
()
return
}
fmt
.
Fprintf
(
w
,
"Received a binary-encoded message:\n%#v\n"
,
data
)
state
=
data
.(
map
[
string
]
interface
{})
}
else
if
encoding
==
"JSON"
{
data
,
_
,
err
:=
codec
.
NativeFromTextual
(
msg
.
Data
)
if
err
!=
nil
{
fmt
.
Fprintf
(
w
,
"codec.NativeFromTextual err: %v\n"
,
err
)
msg
.
Nack
()
return
}
fmt
.
Fprintf
(
w
,
"Received a JSON-encoded message:\n%#v\n"
,
data
)
state
=
data
.(
map
[
string
]
interface
{})
}
else
{
fmt
.
Fprintf
(
w
,
"Unknown message type(%s), nacking\n"
,
encoding
)
msg
.
Nack
()
return
}
fmt
.
Fprintf
(
w
,
"%s is abbreviated as %s\n"
,
state
[
"name"
],
state
[
"post_abbr"
])
msg
.
Ack
()
})
return
nil
}
import
(
"context"
"fmt"
"io"
"sync"
"time"
"cloud.google.com/go/pubsub/v2"
statepb
"github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
func
subscribeWithProtoSchema
(
w
io
.
Writer
,
projectID
,
subID
string
)
error
{
// projectID := "my-project-id"
// subID := "my-sub"
ctx
:=
context
.
Background
()
client
,
err
:=
pubsub
.
NewClient
(
ctx
,
projectID
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"pubsub.NewClient: %w"
,
err
)
}
// Create an instance of the message to be decoded (a single U.S. state).
state
:=
& statepb
.
State
{}
sub
:=
client
.
Subscriber
(
subID
)
ctx2
,
cancel
:=
context
.
WithTimeout
(
ctx
,
10
*
time
.
Second
)
defer
cancel
()
var
mu
sync
.
Mutex
sub
.
Receive
(
ctx2
,
func
(
ctx
context
.
Context
,
msg
*
pubsub
.
Message
)
{
mu
.
Lock
()
defer
mu
.
Unlock
()
encoding
:=
msg
.
Attributes
[
"googclient_schemaencoding"
]
if
encoding
==
"BINARY"
{
if
err
:=
proto
.
Unmarshal
(
msg
.
Data
,
state
);
err
!=
nil
{
fmt
.
Fprintf
(
w
,
"proto.Unmarshal err: %v\n"
,
err
)
msg
.
Nack
()
return
}
fmt
.
Printf
(
"Received a binary-encoded message:\n%#v\n"
,
state
)
}
else
if
encoding
==
"JSON"
{
if
err
:=
protojson
.
Unmarshal
(
msg
.
Data
,
state
);
err
!=
nil
{
fmt
.
Fprintf
(
w
,
"proto.Unmarshal err: %v\n"
,
err
)
msg
.
Nack
()
return
}
fmt
.
Fprintf
(
w
,
"Received a JSON-encoded message:\n%#v\n"
,
state
)
}
else
{
fmt
.
Fprintf
(
w
,
"Unknown message type(%s), nacking\n"
,
encoding
)
msg
.
Nack
()
return
}
fmt
.
Fprintf
(
w
,
"%s is abbreviated as %s\n"
,
state
.
Name
,
state
.
PostAbbr
)
msg
.
Ack
()
})
return
nil
}
Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Java API reference documentation .
Avro import
com.google.cloud.pubsub.v1. AckReplyConsumer
;
import
com.google.cloud.pubsub.v1. MessageReceiver
;
import
com.google.cloud.pubsub.v1. Subscriber
;
import
com.google.protobuf. ByteString
;
import
com.google.pubsub.v1. ProjectSubscriptionName
;
import
com.google.pubsub.v1. PubsubMessage
;
import
java.io.ByteArrayInputStream
;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
import
org.apache.avro.io.Decoder
;
import
org.apache.avro.io.DecoderFactory
;
import
org.apache.avro.specific.SpecificDatumReader
;
import
utilities.State
;
public
class
SubscribeWithAvroSchemaExample
{
public
static
void
main
(
String
...
args
)
throws
Exception
{
// TODO(developer): Replace these variables before running the sample.
String
projectId
=
"your-project-id"
;
// Use an existing subscription.
String
subscriptionId
=
"your-subscription-id"
;
subscribeWithAvroSchemaExample
(
projectId
,
subscriptionId
);
}
public
static
void
subscribeWithAvroSchemaExample
(
String
projectId
,
String
subscriptionId
)
{
ProjectSubscriptionName
subscriptionName
=
ProjectSubscriptionName
.
of
(
projectId
,
subscriptionId
);
// Prepare a reader for the encoded Avro records.
SpecificDatumReader<State>
reader
=
new
SpecificDatumReader
<> (
State
.
getClassSchema
());
// Instantiate an asynchronous message receiver.
MessageReceiver
receiver
=
(
PubsubMessage
message
,
AckReplyConsumer
consumer
)
-
>
{
ByteString
data
=
message
.
getData
();
// Get the schema encoding type.
String
encoding
=
message
.
getAttributesMap
().
get
(
"googclient_schemaencoding"
);
// Send the message data to a byte[] input stream.
InputStream
inputStream
=
new
ByteArrayInputStream
(
data
.
toByteArray
());
Decoder
decoder
=
null
;
// Prepare an appropriate decoder for the message data in the input stream
// based on the schema encoding type.
block
:
try
{
switch
(
encoding
)
{
case
"BINARY"
:
decoder
=
DecoderFactory
.
get
().
directBinaryDecoder
(
inputStream
,
/* reuse= */
null
);
System
.
out
.
println
(
"Receiving a binary-encoded message:"
);
break
;
case
"JSON"
:
decoder
=
DecoderFactory
.
get
().
jsonDecoder
(
State
.
getClassSchema
(),
inputStream
);
System
.
out
.
println
(
"Receiving a JSON-encoded message:"
);
break
;
default
:
break
block
;
}
// Obtain an object of the generated Avro class using the decoder.
State
state
=
reader
.
read
(
null
,
decoder
);
System
.
out
.
println
(
state
.
getName
()
+
" is abbreviated as "
+
state
.
getPostAbbr
());
}
catch
(
IOException
e
)
{
System
.
err
.
println
(
e
);
}
// Ack the message.
consumer
.
ack
();
};
Subscriber
subscriber
=
null
;
try
{
subscriber
=
Subscriber
.
newBuilder
(
subscriptionName
,
receiver
).
build
();
subscriber
.
startAsync
().
awaitRunning
();
System
.
out
.
printf
(
"Listening for messages on %s:\n"
,
subscriptionName
.
toString
());
subscriber
.
awaitTerminated
(
30
,
TimeUnit
.
SECONDS
);
}
catch
(
TimeoutException
timeoutException
)
{
subscriber
.
stopAsync
();
}
}
}
import
com.google.cloud.pubsub.v1. AckReplyConsumer
;
import
com.google.cloud.pubsub.v1. MessageReceiver
;
import
com.google.cloud.pubsub.v1. Subscriber
;
import
com.google.protobuf. ByteString
;
import
com.google.protobuf. InvalidProtocolBufferException
;
import
com.google.protobuf.util. JsonFormat
;
import
com.google.pubsub.v1. ProjectSubscriptionName
;
import
com.google.pubsub.v1. PubsubMessage
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
import
utilities.StateProto.State
;
public
class
SubscribeWithProtoSchemaExample
{
public
static
void
main
(
String
...
args
)
throws
Exception
{
// TODO(developer): Replace these variables before running the sample.
String
projectId
=
"your-project-id"
;
// Use an existing subscription.
String
subscriptionId
=
"your-subscription-id"
;
subscribeWithProtoSchemaExample
(
projectId
,
subscriptionId
);
}
public
static
void
subscribeWithProtoSchemaExample
(
String
projectId
,
String
subscriptionId
)
{
ProjectSubscriptionName
subscriptionName
=
ProjectSubscriptionName
.
of
(
projectId
,
subscriptionId
);
MessageReceiver
receiver
=
(
PubsubMessage
message
,
AckReplyConsumer
consumer
)
-
>
{
ByteString
data
=
message
.
getData
();
// Get the schema encoding type.
String
encoding
=
message
.
getAttributesMap
().
get
(
"googclient_schemaencoding"
);
block
:
try
{
switch
(
encoding
)
{
case
"BINARY"
:
// Obtain an object of the generated proto class.
State
state
=
State
.
parseFrom
(
data
);
System
.
out
.
println
(
"Received a BINARY-formatted message: "
+
state
);
break
;
case
"JSON"
:
State
.
Builder
stateBuilder
=
State
.
newBuilder
();
JsonFormat
.
parser
().
merge
(
data
.
toStringUtf8
(),
stateBuilder
);
System
.
out
.
println
(
"Received a JSON-formatted message:"
+
stateBuilder
.
build
());
break
;
default
:
break
block
;
}
}
catch
(
InvalidProtocolBufferException
e
)
{
e
.
printStackTrace
();
}
consumer
.
ack
();
System
.
out
.
println
(
"Ack'ed the message"
);
};
// Create subscriber client.
Subscriber
subscriber
=
Subscriber
.
newBuilder
(
subscriptionName
,
receiver
).
build
();
try
{
subscriber
.
startAsync
().
awaitRunning
();
System
.
out
.
printf
(
"Listening for messages on %s:\n"
,
subscriptionName
);
subscriber
.
awaitTerminated
(
30
,
TimeUnit
.
SECONDS
);
}
catch
(
TimeoutException
timeoutException
)
{
subscriber
.
stopAsync
();
}
}
}
Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .
Avro /**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
// The module specifier must be exactly '@google-cloud/pubsub' (no padding
// or line break inside the quotes).
const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub');

// Node FS library, to load definitions
const fs = require('fs');

// And the Apache Avro library
const avro = require('avro-js');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();
/**
 * Listens on a subscription for `timeout` seconds and logs every message,
 * decoding its payload with the Avro definition in
 * system-test/fixtures/provinces.avsc.
 *
 * @param {string} subscriptionNameOrId Name or ID of an existing subscription.
 * @param {number} timeout Seconds to listen before removing the handler.
 */
function listenForAvroRecords(subscriptionNameOrId, timeout) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Make a decoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    // "Ack" (acknowledge receipt of) the message
    message.ack();

    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    // Stays undefined when the encoding is not recognized.
    let result;
    switch (schemaMetadata.encoding) {
      case Encodings.Binary:
        result = type.fromBuffer(message.data);
        break;
      case Encodings.Json:
        result = type.fromString(message.data.toString());
        break;
      default:
        console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
        break;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    // Stringify the attributes; interpolating the object directly would
    // print "[object Object]".
    console.log(
      `\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`
    );
    messageCount += 1;
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
// The module specifier must be exactly '@google-cloud/pubsub' (no padding
// or line break inside the quotes).
const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub');

// And the protobufjs library
const protobuf = require('protobufjs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();
/**
 * Listens on a subscription for `timeout` seconds and logs every message,
 * decoding its payload with the `utilities.Province` type from
 * system-test/fixtures/provinces.proto.
 *
 * @param {string} subscriptionNameOrId Name or ID of an existing subscription.
 * @param {number} timeout Seconds to listen before removing the handler.
 */
async function listenForProtobufMessages(subscriptionNameOrId, timeout) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Make a decoder using the protobufjs library.
  //
  // Since we're providing the test message for a specific schema here, we'll
  // also code in the path to a sample proto definition.
  const root = protobuf.loadSync('system-test/fixtures/provinces.proto');
  const Province = root.lookupType('utilities.Province');

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    // "Ack" (acknowledge receipt of) the message
    message.ack();

    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    // Stays undefined when the encoding is not recognized.
    let result;
    switch (schemaMetadata.encoding) {
      case Encodings.Binary:
        result = Province.decode(message.data);
        break;
      case Encodings.Json:
        // This doesn't require decoding with the protobuf library,
        // since it's plain JSON. But you can still validate it against
        // your schema.
        result = JSON.parse(message.data.toString());
        console.log(`Validation of JSON: ${Province.verify(result)}`);
        break;
      default:
        console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
        break;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    console.log(
      `\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`
    );
    messageCount += 1;
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}
Node.js (TypeScript)
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .
Avro /**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
import
{
PubSub
,
Schema
,
Encodings
,
Message
}
from
'@google-cloud/pubsub'
;
// Node FS library, to load definitions
import
*
as
fs
from
'fs'
;
// And the Apache Avro library
import
*
as
avro
from
'avro-js'
;
// Creates a client; cache this for further use
const
pubSubClient
=
new
PubSub
();
/**
 * Listens on a subscription for `timeout` seconds and logs every message,
 * decoding its payload with the Avro definition in
 * system-test/fixtures/provinces.avsc.
 *
 * @param subscriptionNameOrId Name or ID of an existing subscription.
 * @param timeout Seconds to listen before removing the handler.
 */
function listenForAvroRecords(subscriptionNameOrId: string, timeout: number) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Make a decoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    // "Ack" (acknowledge receipt of) the message
    message.ack();

    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    // Stays undefined when the encoding is not recognized.
    let result: object | undefined;
    switch (schemaMetadata.encoding) {
      case Encodings.Binary:
        result = type.fromBuffer(message.data);
        break;
      case Encodings.Json:
        result = type.fromString(message.data.toString());
        break;
      default:
        console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
        break;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    // Stringify the attributes; interpolating the object directly would
    // print "[object Object]".
    console.log(
      `\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`,
    );
    messageCount += 1;
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
import
{
PubSub
,
Schema
,
Encodings
,
Message
}
from
'@google-cloud/pubsub'
;
// And the protobufjs library
import
*
as
protobuf
from
'protobufjs'
;
// Creates a client; cache this for further use
const
pubSubClient
=
new
PubSub
();
/**
 * Attaches a message handler to the given subscription and detaches it again
 * after `timeout` seconds. Each message is acked, decoded according to its
 * schema encoding with the `utilities.Province` protobuf type, and logged.
 *
 * @param subscriptionNameOrId Name or ID of an existing subscription.
 * @param timeout Seconds to listen before removing the handler.
 */
async function listenForProtobufMessages(
  subscriptionNameOrId: string,
  timeout: number,
) {
  const sub = pubSubClient.subscription(subscriptionNameOrId);

  // The decoder comes from protobufjs. The path to the sample proto
  // definition is coded in because this sample targets one specific schema.
  const Province = protobuf
    .loadSync('system-test/fixtures/provinces.proto')
    .lookupType('utilities.Province');

  let received = 0;

  const onMessage = async (message: Message) => {
    // Acknowledge receipt first, then inspect the payload.
    message.ack();

    const {encoding} = Schema.metadataFromMessage(message.attributes);

    let decoded;
    if (encoding === Encodings.Binary) {
      decoded = Province.decode(message.data);
    } else if (encoding === Encodings.Json) {
      // Plain JSON needs no protobuf decoding, but it can still be
      // validated against the schema.
      decoded = JSON.parse(message.data.toString());
      console.log(`Validation of JSON: ${Province.verify(decoded)}`);
    } else {
      console.log(`Unknown schema encoding: ${encoding}`);
    }

    const dataText = JSON.stringify(decoded, null, 4);
    const attributesText = JSON.stringify(message.attributes, null, 4);
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${dataText}`);
    console.log(`\tAttributes: ${attributesText}`);
    received += 1;
  };

  // Handle messages until the timeout fires.
  sub.on('message', onMessage);

  const stopListening = () => {
    sub.removeListener('message', onMessage);
    console.log(`${received} message(s) received.`);
  };
  setTimeout(stopListening, timeout * 1000);
}
PHP
Before trying this sample, follow the PHP setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub PHP API reference documentation .
Avro use Google\Cloud\PubSub\PubSubClient;
/**
 * Subscribe and pull messages using an AVRO schema.
 *
 * Pulls one batch of messages from the subscription and prints each one.
 * Binary-encoded payloads are decoded with the Avro definition and printed
 * as JSON; JSON-encoded payloads are printed as received.
 *
 * @param string $projectId
 * @param string $subscriptionId
 * @param string $definitionFile Path to the file holding the Avro schema definition.
 */
function subscribe_avro_records($projectId, $subscriptionId, $definitionFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $definition = file_get_contents($definitionFile);

    // NOTE: the pulled messages are not acknowledged in this sample.
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        // Stays empty when the encoding is neither BINARY nor JSON.
        $decodedMessageData = '';
        $encoding = $message->attribute('googclient_schemaencoding');
        switch ($encoding) {
            case 'BINARY':
                $io = new \AvroStringIO($message->data());
                $schema = \AvroSchema::parse($definition);
                $reader = new \AvroIODatumReader($schema);
                $decoder = new \AvroIOBinaryDecoder($io);
                $decodedMessageData = json_encode($reader->read($decoder));
                break;
            case 'JSON':
                $decodedMessageData = $message->data();
                break;
        }

        // $encoding is a string ('BINARY' / 'JSON'), so it needs %s; with %d
        // it would be cast to an integer and printed as 0.
        printf('Received a %s-encoded message %s', $encoding, $decodedMessageData);
    }
}
use Google\Cloud\PubSub\PubSubClient;
/**
 * Subscribe and pull messages using a protocol buffer schema.
 *
 * Pulls one batch of messages from the subscription and prints each one.
 * Binary-encoded payloads are decoded into a StateProto and printed as JSON;
 * JSON-encoded payloads are printed as received.
 *
 * Relies on a proto message of the following form:
 * ```
 * syntax = "proto3";
 *
 * package utilities;
 *
 * message StateProto {
 *   string name = 1;
 *   string post_abbr = 2;
 * }
 * ```
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_proto_messages($projectId, $subscriptionId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);

    // NOTE: the pulled messages are not acknowledged in this sample.
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        // Stays empty when the encoding is neither BINARY nor JSON.
        $decodedMessageData = '';
        $encoding = $message->attribute('googclient_schemaencoding');
        switch ($encoding) {
            case 'BINARY':
                $protobufMessage = new \Utilities\StateProto();
                $protobufMessage->mergeFromString($message->data());

                $decodedMessageData = $protobufMessage->serializeToJsonString();
                break;
            case 'JSON':
                $decodedMessageData = $message->data();
                break;
        }

        // $encoding is a string ('BINARY' / 'JSON'), so it needs %s; with %d
        // it would be cast to an integer and printed as 0.
        printf('Received a %s-encoded message %s', $encoding, $decodedMessageData);
    }
}
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Python API reference documentation .
Avro import
avro.schema
as
schema
from
avro.io
import
BinaryDecoder
,
DatumReader
from
concurrent.futures
import
TimeoutError
import
io
import
json
from
google.cloud.pubsub
import
SubscriberClient
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
# Number of seconds the subscriber listens for messages
# timeout = 5.0
# The callback's type annotation references pubsub_v1, and annotations are
# evaluated when the function is defined, so the name must be importable here.
from google.cloud import pubsub_v1

subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Parse the Avro schema once, up front; the callback reuses it for every message.
with open(avsc_file, "rb") as file:
    avro_schema = schema.parse(file.read())


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Decode one received message per its schema encoding, print it, and ack it."""
    # Get the message serialization type.
    encoding = message.attributes.get("googclient_schemaencoding")
    # Deserialize the message data accordingly.
    if encoding == "BINARY":
        bout = io.BytesIO(message.data)
        decoder = BinaryDecoder(bout)
        reader = DatumReader(avro_schema)
        message_data = reader.read(decoder)
        print(f"Received a binary-encoded message:\n{message_data}")
    elif encoding == "JSON":
        message_data = json.loads(message.data)
        print(f"Received a JSON-encoded message:\n{message_data}")
    else:
        print(f"Received a message with no encoding:\n{message}")

    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception occurs first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
from
concurrent.futures
import
TimeoutError
from
google.cloud.pubsub
import
SubscriberClient
from
google.protobuf.json_format
import
Parse
from
utilities
import
us_states_pb2
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber listens for messages
# timeout = 5.0
# The callback's type annotation references pubsub_v1, and annotations are
# evaluated when the function is defined, so the name must be importable here.
from google.cloud import pubsub_v1

subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Decode one received message per its schema encoding, print it, and ack it."""
    # Instantiate a protoc-generated class defined in `us-states.proto`.
    # A fresh instance per message keeps fields from an earlier message from
    # carrying over into this one, and avoids sharing one mutable object
    # between callback invocations.
    state = us_states_pb2.StateProto()

    # Get the message serialization type.
    encoding = message.attributes.get("googclient_schemaencoding")
    # Deserialize the message data accordingly.
    if encoding == "BINARY":
        state.ParseFromString(message.data)
        print(f"Received a binary-encoded message:\n{state}")
    elif encoding == "JSON":
        Parse(message.data, state)
        print(f"Received a JSON-encoded message:\n{state}")
    else:
        print(f"Received a message with no encoding:\n{message}")

    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception occurs first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
Ruby
The following sample uses Ruby Pub/Sub client library v3. If you are still using the v2 library, see the migration guide to v3 . To see a list of Ruby v2 code samples, see the deprecated code samples .
Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Ruby API reference documentation .
Avro # subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
pubsub = Google::Cloud::PubSub.new

subscriber = pubsub.subscriber subscription_id

# Decode each received message according to its schema encoding attribute,
# print it, and acknowledge it.
listener = subscriber.listen do |received_message|
  encoding = received_message.attributes["googclient_schemaencoding"]
  case encoding
  when "BINARY"
    require "avro"
    avro_schema = Avro::Schema.parse File.read(avsc_file)
    buffer = StringIO.new received_message.data
    decoder = Avro::IO::BinaryDecoder.new buffer
    reader = Avro::IO::DatumReader.new avro_schema
    message_data = reader.read decoder
    puts "Received a binary-encoded message:\n#{message_data}"
  when "JSON"
    require "json"
    message_data = JSON.parse received_message.data
    puts "Received a JSON-encoded message:\n#{message_data}"
  else
    # Without `puts` this string was built and discarded, so messages with
    # a missing or unknown encoding produced no output.
    puts "Received a message with no encoding:\n#{received_message.message_id}"
  end
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!
# subscription_id = "your-subscription-id"
pubsub = Google::Cloud::PubSub.new

subscriber = pubsub.subscriber subscription_id

# Decode each received message into a Utilities::StateProto according to its
# schema encoding attribute, print it, and acknowledge it.
listener = subscriber.listen do |received_message|
  encoding = received_message.attributes["googclient_schemaencoding"]
  case encoding
  when "BINARY"
    state = Utilities::StateProto.decode received_message.data
    puts "Received a binary-encoded message:\n#{state}"
  when "JSON"
    require "json"
    state = Utilities::StateProto.decode_json received_message.data
    puts "Received a JSON-encoded message:\n#{state}"
  else
    # Without `puts` this string was built and discarded, so messages with
    # a missing or unknown encoding produced no output.
    puts "Received a message with no encoding:\n#{received_message.message_id}"
  end
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!
Subscribe to a topic associated with an Avro schema with revisions
Avro requires messages to be parsed using the schema with which they are encoded. You can also translate messages to a different schema by using Avro schema resolution .
Pub/Sub ensures that all schema revisions are forward and backward compatible with all other revisions. This compatibility lets any revision be used as the reader or writer schema.
When parsing a message encoded with a different schema revision than the one your subscriber uses, you might need to get the original schema and pass it in as the writer schema.
It's best to cache the Avro reader object that can parse messages for each schema revision encountered, in order to minimize latency and the number of calls to the `GetSchema` API.
The following code shows these functions:
- Read the attributes discussed in the previous section to determine which schema revision is used to encode the message.
- Fetch the schema revision and cache a reader generated with it.
- Parse the message into the schema that your subscriber uses.
Go
The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2 . To see a list of v1 code samples, see the deprecated code samples .
Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Go API reference documentation .
import
(
"context"
"fmt"
"io"
"sync"
"time"
"cloud.google.com/go/pubsub/v2"
schema
"cloud.google.com/go/pubsub/v2/apiv1"
"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
"github.com/linkedin/goavro/v2"
)
func
subscribeWithAvroSchemaRevisions
(
w
io
.
Writer
,
projectID
,
subID
string
)
error
{
// projectID := "my-project-id"
// topicID := "my-topic"
ctx
:=
context
.
Background
()
client
,
err
:=
pubsub
.
NewClient
(
ctx
,
projectID
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"pubsub.NewClient: %w"
,
err
)
}
schemaClient
,
err
:=
schema
.
NewSchemaClient
(
ctx
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"pubsub.NewSchemaClient: %w"
,
err
)
}
// Create the cache for the codecs for different revision IDs.
revisionCodecs
:=
make
(
map
[
string
]
*
goavro
.
Codec
)
sub
:=
client
.
Subscriber
(
subID
)
ctx2
,
cancel
:=
context
.
WithTimeout
(
ctx
,
10
*
time
.
Second
)
defer
cancel
()
var
mu
sync
.
Mutex
sub
.
Receive
(
ctx2
,
func
(
ctx
context
.
Context
,
msg
*
pubsub
.
Message
)
{
mu
.
Lock
()
defer
mu
.
Unlock
()
name
:=
msg
.
Attributes
[
"googclient_schemaname"
]
revision
:=
msg
.
Attributes
[
"googclient_schemarevisionid"
]
codec
,
ok
:=
revisionCodecs
[
revision
]
// If the codec doesn't exist in the map, this is the first time we
// are seeing this revision. We need to fetch the schema and cache the
// codec. It would be more typical to do this asynchronously, but is
// shown here in a synchronous way to ease readability.
if
!
ok
{
s
:=
& pubsubpb
.
GetSchemaRequest
{
Name
:
fmt
.
Sprintf
(
"%s@%s"
,
name
,
revision
),
View
:
pubsubpb
.
SchemaView_FULL
,
}
schema
,
err
:=
schemaClient
.
GetSchema
(
ctx
,
s
)
if
err
!=
nil
{
fmt
.
Fprintf
(
w
,
"Nacking, cannot read message without schema: %v\n"
,
err
)
msg
.
Nack
()
return
}
codec
,
err
=
goavro
.
NewCodec
(
schema
.
Definition
)
if
err
!=
nil
{
msg
.
Nack
()
fmt
.
Fprintf
(
w
,
"goavro.NewCodec err: %v\n"
,
err
)
}
revisionCodecs
[
revision
]
=
codec
}
encoding
:=
msg
.
Attributes
[
"googclient_schemaencoding"
]
var
state
map
[
string
]
interface
{}
if
encoding
==
"BINARY"
{
data
,
_
,
err
:=
codec
.
NativeFromBinary
(
msg
.
Data
)
if
err
!=
nil
{
fmt
.
Fprintf
(
w
,
"codec.NativeFromBinary err: %v\n"
,
err
)
msg
.
Nack
()
return
}
fmt
.
Fprintf
(
w
,
"Received a binary-encoded message:\n%#v\n"
,
data
)
state
=
data
.(
map
[
string
]
interface
{})
}
else
if
encoding
==
"JSON"
{
data
,
_
,
err
:=
codec
.
NativeFromTextual
(
msg
.
Data
)
if
err
!=
nil
{
fmt
.
Fprintf
(
w
,
"codec.NativeFromTextual err: %v\n"
,
err
)
msg
.
Nack
()
return
}
fmt
.
Fprintf
(
w
,
"Received a JSON-encoded message:\n%#v\n"
,
data
)
state
=
data
.(
map
[
string
]
interface
{})
}
else
{
fmt
.
Fprintf
(
w
,
"Unknown message type(%s), nacking\n"
,
encoding
)
msg
.
Nack
()
return
}
fmt
.
Fprintf
(
w
,
"%s is abbreviated as %s\n"
,
state
[
"name"
],
state
[
"post_abbr"
])
msg
.
Ack
()
})
return
nil
}
Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Java API reference documentation .
import
com.google.cloud.pubsub.v1. AckReplyConsumer
;
import
com.google.cloud.pubsub.v1. MessageReceiver
;
import
com.google.cloud.pubsub.v1. SchemaServiceClient
;
import
com.google.cloud.pubsub.v1. Subscriber
;
import
com.google.protobuf. ByteString
;
import
com.google.pubsub.v1. ProjectSubscriptionName
;
import
com.google.pubsub.v1. PubsubMessage
;
import
com.google.pubsub.v1. Schema
;
import
java.io.ByteArrayInputStream
;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
import
org.apache.avro.io.Decoder
;
import
org.apache.avro.io.DecoderFactory
;
import
org.apache.avro.specific.SpecificDatumReader
;
import
utilities.State
;
public
class
SubscribeWithAvroSchemaRevisionsExample
{
public
static
void
main
(
String
...
args
)
throws
Exception
{
// TODO(developer): Replace these variables before running the sample.
String
projectId
=
"your-project-id"
;
// Use an existing subscription.
String
subscriptionId
=
"your-subscription-id"
;
subscribeWithAvroSchemaRevisionsExample
(
projectId
,
subscriptionId
);
}
static
SchemaServiceClient
getSchemaServiceClient
()
{
try
{
return
SchemaServiceClient
.
create
();
}
catch
(
IOException
e
)
{
System
.
out
.
println
(
"Could not get schema client: "
+
e
);
return
null
;
}
}
public
static
void
subscribeWithAvroSchemaRevisionsExample
(
String
projectId
,
String
subscriptionId
)
{
// Used to get the schemas for revsions.
final
SchemaServiceClient
schemaServiceClient
=
getSchemaServiceClient
();
if
(
schemaServiceClient
==
null
)
{
return
;
}
// Cache for the readers for different revision IDs.
Map<String
,
SpecificDatumReader<State>
>
revisionReaders
=
new
HashMap<String
,
SpecificDatumReader<State>
> ();
ProjectSubscriptionName
subscriptionName
=
ProjectSubscriptionName
.
of
(
projectId
,
subscriptionId
);
// Instantiate an asynchronous message receiver.
MessageReceiver
receiver
=
(
PubsubMessage
message
,
AckReplyConsumer
consumer
)
-
>
{
// Get the schema encoding type.
String
name
=
message
.
getAttributesMap
().
get
(
"googclient_schemaname"
);
String
revision
=
message
.
getAttributesMap
().
get
(
"googclient_schemarevisionid"
);
SpecificDatumReader<State>
reader
=
null
;
synchronized
(
revisionReaders
)
{
reader
=
revisionReaders
.
get
(
revision
);
}
if
(
reader
==
null
)
{
// This is the first time we are seeing this revision. We need to
// fetch the schema and cache its decoder. It would be more typical
// to do this asynchronously, but is shown here in a synchronous
// way to ease readability.
try
{
Schema
schema
=
schemaServiceClient
.
getSchema
(
name
+
"@"
+
revision
);
org
.
apache
.
avro
.
Schema
avroSchema
=
new
org
.
apache
.
avro
.
Schema
.
Parser
().
parse
(
schema
.
getDefinition
());
reader
=
new
SpecificDatumReader<State>
(
avroSchema
,
State
.
getClassSchema
());
synchronized
(
revisionReaders
)
{
revisionReaders
.
put
(
revision
,
reader
);
}
}
catch
(
Exception
e
)
{
System
.
out
.
println
(
"Could not get schema: "
+
e
);
// Without the schema, we cannot read the message, so nack it.
consumer
.
nack
();
return
;
}
}
ByteString
data
=
message
.
getData
();
// Send the message data to a byte[] input stream.
InputStream
inputStream
=
new
ByteArrayInputStream
(
data
.
toByteArray
());
String
encoding
=
message
.
getAttributesMap
().
get
(
"googclient_schemaencoding"
);
Decoder
decoder
=
null
;
// Prepare an appropriate decoder for the message data in the input stream
// based on the schema encoding type.
try
{
switch
(
encoding
)
{
case
"BINARY"
:
decoder
=
DecoderFactory
.
get
().
directBinaryDecoder
(
inputStream
,
/* reuse= */
null
);
System
.
out
.
println
(
"Receiving a binary-encoded message:"
);
break
;
case
"JSON"
:
decoder
=
DecoderFactory
.
get
().
jsonDecoder
(
State
.
getClassSchema
(),
inputStream
);
System
.
out
.
println
(
"Receiving a JSON-encoded message:"
);
break
;
default
:
System
.
out
.
println
(
"Unknown message type; nacking."
);
consumer
.
nack
();
break
;
}
// Obtain an object of the generated Avro class using the decoder.
State
state
=
reader
.
read
(
null
,
decoder
);
System
.
out
.
println
(
state
.
getName
()
+
" is abbreviated as "
+
state
.
getPostAbbr
());
// Ack the message.
consumer
.
ack
();
}
catch
(
IOException
e
)
{
System
.
err
.
println
(
e
);
// If we failed to process the message, nack it.
consumer
.
nack
();
}
};
Subscriber
subscriber
=
null
;
try
{
subscriber
=
Subscriber
.
newBuilder
(
subscriptionName
,
receiver
).
build
();
subscriber
.
startAsync
().
awaitRunning
();
System
.
out
.
printf
(
"Listening for messages on %s:\n"
,
subscriptionName
.
toString
());
subscriber
.
awaitTerminated
(
30
,
TimeUnit
.
SECONDS
);
}
catch
(
TimeoutException
timeoutException
)
{
subscriber
.
stopAsync
();
}
}
}
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Python API reference documentation .
import io
import json
from concurrent.futures import TimeoutError

import avro.schema as schema
from avro.io import BinaryDecoder, DatumReader
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from google.cloud.pubsub import SchemaServiceClient, SubscriberClient
schema_client = SchemaServiceClient()

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
# Number of seconds the subscriber listens for messages
# timeout = 5.0

subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# The schema this subscriber reads messages into (the Avro "reader" schema).
with open(avsc_file, "rb") as file:
    reader_avro_schema = schema.parse(file.read())

# Dict to keep readers for different schema revisions.
revisions_to_readers = {}


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Decode one message with the schema revision it was written with.

    The writer schema is fetched once per revision ID and cached in
    `revisions_to_readers`. Messages that lack schema attributes, whose
    schema revision is not found, or whose encoding is unknown are nacked;
    all others are acked after being printed.
    """
    # Get the message serialization type.
    schema_name = message.attributes.get("googclient_schemaname")
    schema_revision_id = message.attributes.get("googclient_schemarevisionid")
    encoding = message.attributes.get("googclient_schemaencoding")

    # Without these attributes the schema path cannot be built (concatenating
    # None would raise TypeError), so nack instead of crashing the callback.
    if schema_name is None or schema_revision_id is None:
        print(f"Message {message.message_id} has no schema attributes; nacking.")
        message.nack()
        return

    if schema_revision_id not in revisions_to_readers:
        schema_path = schema_name + "@" + schema_revision_id
        try:
            received_avro_schema = schema_client.get_schema(
                request={"name": schema_path}
            )
        except NotFound:
            print(f"{schema_path} not found.")
            message.nack()
            return
        writer_avro_schema = schema.parse(received_avro_schema.definition)
        revisions_to_readers[schema_revision_id] = DatumReader(
            writer_avro_schema, reader_avro_schema
        )
    reader = revisions_to_readers[schema_revision_id]

    # Deserialize the message data accordingly.
    if encoding == "BINARY":
        bout = io.BytesIO(message.data)
        decoder = BinaryDecoder(bout)
        message_data = reader.read(decoder)
        print(f"Received a binary-encoded message:\n{message_data}")
    elif encoding == "JSON":
        message_data = json.loads(message.data)
        print(f"Received a JSON-encoded message:\n{message_data}")
    else:
        print(f"Received a message with no encoding:\n{message}")
        message.nack()
        # Return so the nacked message is not also acked below.
        return

    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception occurs first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
Required roles
To get the permissions you need to validate a message against a schema, complete one of the following steps:
- Grant one of the following predefined roles to a service account: roles/pubsub.admin, roles/pubsub.editor, or roles/pubsub.viewer.
-
Create a custom role for a service account and add the following permissions: pubsub.schemas.validate and pubsub.schemas.get.
To learn more about custom roles, see Create and manage Custom IAM roles .