Receive messages that could be for different schema revisions
Explore further
For detailed documentation that includes this code sample, see the following:
Code sample
C++
Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
auto
subscriber
=
pubsub
::
Subscriber
(
pubsub
::
MakeSubscriberConnection
(
pubsub
::
Subscription
(
project_id
,
subscription_id
)));
// Create a schema client.
auto
schema_client
=
pubsub
::
SchemaServiceClient
(
pubsub
::
MakeSchemaServiceConnection
());
// Read the reader schema. This is the schema you want the messages to be
// evaluated using.
std
::
ifstream
ifs
(
avro_file
);
avro
::
ValidSchema
reader_schema
;
avro
::
compileJsonSchema
(
ifs
,
reader_schema
);
std
::
unordered_map<std
::
string
,
avro
::
ValidSchema
>
revisions_to_schemas
;
auto
session
=
subscriber
.
Subscribe
(
[&](
pubsub
::
Message
const
&
message
,
pubsub
::
AckHandler
h
)
{
// Get the reader schema revision for the message.
auto
schema_name
=
message
.
attributes
()[
"googclient_schemaname"
];
auto
schema_revision_id
=
message
.
attributes
()[
"googclient_schemarevisionid"
];
// If we haven't received a message with this schema, look it up.
if
(
revisions_to_schemas
.
find
(
schema_revision_id
)
==
revisions_to_schemas
.
end
())
{
auto
schema_path
=
schema_name
+
"@"
+
schema_revision_id
;
// Use the schema client to get the path.
auto
schema
=
schema_client
.
GetSchema
(
schema_path
);
if
(
!
schema
)
{
std
::
cout
<<
"Schema not found:"
<<
schema_path
<<
"
\n
"
;
return
;
}
avro
::
ValidSchema
writer_schema
;
std
::
stringstream
in
;
in
<<
schema
.
value
().
definition
();
avro
::
compileJsonSchema
(
in
,
writer_schema
);
revisions_to_schemas
[
schema_revision_id
]
=
writer_schema
;
}
auto
writer_schema
=
revisions_to_schemas
[
schema_revision_id
];
auto
encoding
=
message
.
attributes
()[
"googclient_schemaencoding"
];
if
(
encoding
==
"JSON"
)
{
std
::
stringstream
in
;
in
<<
message
.
data
();
auto
avro_in
=
avro
::
istreamInputStream
(
in
);
avro
::
DecoderPtr
decoder
=
avro
::
resolvingDecoder
(
writer_schema
,
reader_schema
,
avro
::
jsonDecoder
(
writer_schema
));
decoder
-
> init
(
*
avro_in
);
v2
::
State
state
;
avro
::
decode
(
*
decoder
,
state
);
std
::
cout
<<
"Name: "
<<
state
.
name
<<
"
\n
"
;
std
::
cout
<<
"Postal Abbreviation: "
<<
state
.
post_abbr
<<
"
\n
"
;
std
::
cout
<<
"Population: "
<<
state
.
population
<<
"
\n
"
;
}
else
{
std
::
cout
<<
"Unable to decode. Received message using encoding"
<<
encoding
<<
"
\n
"
;
}
std
::
move
(
h
).
ack
();
});
C#
Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C# API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
using
Avro.Generic
;
using
Avro.IO
;
using
Google.Cloud.PubSub.V1
;
using
System
;
using
System.Collections.Concurrent
;
using
System.IO
;
using
System.Threading
;
using
System.Threading.Tasks
;
public class SubscribeAvroRecordsWithRevisionsSample
{
    /// <summary>
    /// Pulls messages from a subscription for about 10 seconds and decodes each
    /// binary-encoded Avro message with the schema revision it was published with.
    /// </summary>
    /// <param name="projectId">The project that owns the subscription.</param>
    /// <param name="subscriptionId">An existing subscription.</param>
    /// <returns>
    /// The number of messages decoded, and the number of distinct
    /// (schema name, revision) pairs cached.
    /// </returns>
    public async Task<(int, int)> SubscribeAvroRecordsWithRevisions(string projectId, string subscriptionId)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        // Parsed Avro schemas keyed by (schema name, revision ID).
        var schemaCache = new ConcurrentDictionary<(string, string), Avro.Schema>();
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        int messageCount = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // Get the schema name, revision ID and encoding type from the message.
            // The Attributes indexer throws for missing keys, so look them up with
            // TryGetValue and nack messages that carry no schema information.
            if (!message.Attributes.TryGetValue("googclient_schemaname", out string schemaName) ||
                !message.Attributes.TryGetValue("googclient_schemarevisionid", out string revision))
            {
                Console.WriteLine($"Message {message.MessageId} has no schema attributes; nacking");
                return Task.FromResult(SubscriberClient.Reply.Nack);
            }
            message.Attributes.TryGetValue("googclient_schemaencoding", out string encoding);

            try
            {
                // Fetch the schema if we don't already have it. Under contention the
                // factory may run more than once, but only one result is stored.
                var avroSchema = schemaCache.GetOrAdd((schemaName, revision), key =>
                {
                    var pubSubSchema = schemaService.GetSchema($"{schemaName}@{revision}");
                    return Avro.Schema.Parse(pubSubSchema.Definition);
                });

                // Read the message.
                if (encoding == "BINARY")
                {
                    using var ms = new MemoryStream(message.Data.ToByteArray());
                    var decoder = new BinaryDecoder(ms);
                    var reader = new DefaultReader(avroSchema, avroSchema);
                    var record = reader.Read<GenericRecord>(null, decoder);
                    Console.WriteLine($"Message {message.MessageId}: {record.GetValue(0)}");
                    Interlocked.Increment(ref messageCount);
                }
                else
                {
                    Console.WriteLine("Expected only binary messages in this sample");
                }
                return Task.FromResult(SubscriberClient.Reply.Ack);
            }
            catch (Exception e)
            {
                // Schema lookup or decoding failed; nack explicitly rather than
                // letting the exception escape the handler.
                Console.WriteLine($"Could not process message {message.MessageId}: {e.Message}");
                return Task.FromResult(SubscriberClient.Reply.Nack);
            }
        });
        // Run for 10 seconds.
        await Task.Delay(10_000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return (messageCount, schemaCache.Count);
    }
}
Go
Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import
(
"context"
"fmt"
"io"
"sync"
"time"
"cloud.google.com/go/pubsub/v2"
schema
"cloud.google.com/go/pubsub/v2/apiv1"
"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
"github.com/linkedin/goavro/v2"
)
func
subscribeWithAvroSchemaRevisions
(
w
io
.
Writer
,
projectID
,
subID
string
)
error
{
// projectID := "my-project-id"
// topicID := "my-topic"
ctx
:=
context
.
Background
()
client
,
err
:=
pubsub
.
NewClient
(
ctx
,
projectID
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"pubsub.NewClient: %w"
,
err
)
}
schemaClient
,
err
:=
schema
.
NewSchemaClient
(
ctx
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"pubsub.NewSchemaClient: %w"
,
err
)
}
// Create the cache for the codecs for different revision IDs.
revisionCodecs
:=
make
(
map
[
string
]
*
goavro
.
Codec
)
sub
:=
client
.
Subscriber
(
subID
)
ctx2
,
cancel
:=
context
.
WithTimeout
(
ctx
,
10
*
time
.
Second
)
defer
cancel
()
var
mu
sync
.
Mutex
sub
.
Receive
(
ctx2
,
func
(
ctx
context
.
Context
,
msg
*
pubsub
.
Message
)
{
mu
.
Lock
()
defer
mu
.
Unlock
()
name
:=
msg
.
Attributes
[
"googclient_schemaname"
]
revision
:=
msg
.
Attributes
[
"googclient_schemarevisionid"
]
codec
,
ok
:=
revisionCodecs
[
revision
]
// If the codec doesn't exist in the map, this is the first time we
// are seeing this revision. We need to fetch the schema and cache the
// codec. It would be more typical to do this asynchronously, but is
// shown here in a synchronous way to ease readability.
if
!
ok
{
s
:=
& pubsubpb
.
GetSchemaRequest
{
Name
:
fmt
.
Sprintf
(
"%s@%s"
,
name
,
revision
),
View
:
pubsubpb
.
SchemaView_FULL
,
}
schema
,
err
:=
schemaClient
.
GetSchema
(
ctx
,
s
)
if
err
!=
nil
{
fmt
.
Fprintf
(
w
,
"Nacking, cannot read message without schema: %v\n"
,
err
)
msg
.
Nack
()
return
}
codec
,
err
=
goavro
.
NewCodec
(
schema
.
Definition
)
if
err
!=
nil
{
msg
.
Nack
()
fmt
.
Fprintf
(
w
,
"goavro.NewCodec err: %v\n"
,
err
)
}
revisionCodecs
[
revision
]
=
codec
}
encoding
:=
msg
.
Attributes
[
"googclient_schemaencoding"
]
var
state
map
[
string
]
interface
{}
if
encoding
==
"BINARY"
{
data
,
_
,
err
:=
codec
.
NativeFromBinary
(
msg
.
Data
)
if
err
!=
nil
{
fmt
.
Fprintf
(
w
,
"codec.NativeFromBinary err: %v\n"
,
err
)
msg
.
Nack
()
return
}
fmt
.
Fprintf
(
w
,
"Received a binary-encoded message:\n%#v\n"
,
data
)
state
=
data
.(
map
[
string
]
interface
{})
}
else
if
encoding
==
"JSON"
{
data
,
_
,
err
:=
codec
.
NativeFromTextual
(
msg
.
Data
)
if
err
!=
nil
{
fmt
.
Fprintf
(
w
,
"codec.NativeFromTextual err: %v\n"
,
err
)
msg
.
Nack
()
return
}
fmt
.
Fprintf
(
w
,
"Received a JSON-encoded message:\n%#v\n"
,
data
)
state
=
data
.(
map
[
string
]
interface
{})
}
else
{
fmt
.
Fprintf
(
w
,
"Unknown message type(%s), nacking\n"
,
encoding
)
msg
.
Nack
()
return
}
fmt
.
Fprintf
(
w
,
"%s is abbreviated as %s\n"
,
state
[
"name"
],
state
[
"post_abbr"
])
msg
.
Ack
()
})
return
nil
}
Java
Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import
com.google.cloud.pubsub.v1. AckReplyConsumer
;
import
com.google.cloud.pubsub.v1. MessageReceiver
;
import
com.google.cloud.pubsub.v1. SchemaServiceClient
;
import
com.google.cloud.pubsub.v1. Subscriber
;
import
com.google.protobuf. ByteString
;
import
com.google.pubsub.v1. ProjectSubscriptionName
;
import
com.google.pubsub.v1. PubsubMessage
;
import
com.google.pubsub.v1. Schema
;
import
java.io.ByteArrayInputStream
;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
import
org.apache.avro.io.Decoder
;
import
org.apache.avro.io.DecoderFactory
;
import
org.apache.avro.specific.SpecificDatumReader
;
import
utilities.State
;
public
class
SubscribeWithAvroSchemaRevisionsExample
{
public
static
void
main
(
String
...
args
)
throws
Exception
{
// TODO(developer): Replace these variables before running the sample.
String
projectId
=
"your-project-id"
;
// Use an existing subscription.
String
subscriptionId
=
"your-subscription-id"
;
subscribeWithAvroSchemaRevisionsExample
(
projectId
,
subscriptionId
);
}
static
SchemaServiceClient
getSchemaServiceClient
()
{
try
{
return
SchemaServiceClient
.
create
();
}
catch
(
IOException
e
)
{
System
.
out
.
println
(
"Could not get schema client: "
+
e
);
return
null
;
}
}
public
static
void
subscribeWithAvroSchemaRevisionsExample
(
String
projectId
,
String
subscriptionId
)
{
// Used to get the schemas for revsions.
final
SchemaServiceClient
schemaServiceClient
=
getSchemaServiceClient
();
if
(
schemaServiceClient
==
null
)
{
return
;
}
// Cache for the readers for different revision IDs.
Map<String
,
SpecificDatumReader<State>
>
revisionReaders
=
new
HashMap<String
,
SpecificDatumReader<State>
> ();
ProjectSubscriptionName
subscriptionName
=
ProjectSubscriptionName
.
of
(
projectId
,
subscriptionId
);
// Instantiate an asynchronous message receiver.
MessageReceiver
receiver
=
(
PubsubMessage
message
,
AckReplyConsumer
consumer
)
-
>
{
// Get the schema encoding type.
String
name
=
message
.
getAttributesMap
().
get
(
"googclient_schemaname"
);
String
revision
=
message
.
getAttributesMap
().
get
(
"googclient_schemarevisionid"
);
SpecificDatumReader<State>
reader
=
null
;
synchronized
(
revisionReaders
)
{
reader
=
revisionReaders
.
get
(
revision
);
}
if
(
reader
==
null
)
{
// This is the first time we are seeing this revision. We need to
// fetch the schema and cache its decoder. It would be more typical
// to do this asynchronously, but is shown here in a synchronous
// way to ease readability.
try
{
Schema
schema
=
schemaServiceClient
.
getSchema
(
name
+
"@"
+
revision
);
org
.
apache
.
avro
.
Schema
avroSchema
=
new
org
.
apache
.
avro
.
Schema
.
Parser
().
parse
(
schema
.
getDefinition
());
reader
=
new
SpecificDatumReader<State>
(
avroSchema
,
State
.
getClassSchema
());
synchronized
(
revisionReaders
)
{
revisionReaders
.
put
(
revision
,
reader
);
}
}
catch
(
Exception
e
)
{
System
.
out
.
println
(
"Could not get schema: "
+
e
);
// Without the schema, we cannot read the message, so nack it.
consumer
.
nack
();
return
;
}
}
ByteString
data
=
message
.
getData
();
// Send the message data to a byte[] input stream.
InputStream
inputStream
=
new
ByteArrayInputStream
(
data
.
toByteArray
());
String
encoding
=
message
.
getAttributesMap
().
get
(
"googclient_schemaencoding"
);
Decoder
decoder
=
null
;
// Prepare an appropriate decoder for the message data in the input stream
// based on the schema encoding type.
try
{
switch
(
encoding
)
{
case
"BINARY"
:
decoder
=
DecoderFactory
.
get
().
directBinaryDecoder
(
inputStream
,
/* reuse= */
null
);
System
.
out
.
println
(
"Receiving a binary-encoded message:"
);
break
;
case
"JSON"
:
decoder
=
DecoderFactory
.
get
().
jsonDecoder
(
State
.
getClassSchema
(),
inputStream
);
System
.
out
.
println
(
"Receiving a JSON-encoded message:"
);
break
;
default
:
System
.
out
.
println
(
"Unknown message type; nacking."
);
consumer
.
nack
();
break
;
}
// Obtain an object of the generated Avro class using the decoder.
State
state
=
reader
.
read
(
null
,
decoder
);
System
.
out
.
println
(
state
.
getName
()
+
" is abbreviated as "
+
state
.
getPostAbbr
());
// Ack the message.
consumer
.
ack
();
}
catch
(
IOException
e
)
{
System
.
err
.
println
(
e
);
// If we failed to process the message, nack it.
consumer
.
nack
();
}
};
Subscriber
subscriber
=
null
;
try
{
subscriber
=
Subscriber
.
newBuilder
(
subscriptionName
,
receiver
).
build
();
subscriber
.
startAsync
().
awaitRunning
();
System
.
out
.
printf
(
"Listening for messages on %s:\n"
,
subscriptionName
.
toString
());
subscriber
.
awaitTerminated
(
30
,
TimeUnit
.
SECONDS
);
}
catch
(
TimeoutException
timeoutException
)
{
subscriber
.
stopAsync
();
}
}
}
Ruby
Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
# subscription_id = "your-subscription-id"

# Load decoding libraries once, up front, rather than on every message.
require "avro"
require "json"
require "stringio"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

# Cache for the parsed Avro schemas mapped by revision ID.
schema_cache = {}
# Guards every read and write of schema_cache, because the listener block may
# run on several threads. The schema fetch itself happens outside the lock, so
# two threads may occasionally fetch the same revision; both get the same schema.
cache_mutex = Mutex.new

listener = subscriber.listen do |received_message|
  schema_name = received_message.attributes["googclient_schemaname"]
  revision_id = received_message.attributes["googclient_schemarevisionid"]
  encoding = received_message.attributes["googclient_schemaencoding"]

  avro_schema = cache_mutex.synchronize { schema_cache[revision_id] }

  if avro_schema.nil?
    begin
      # The resource name format is projects/{project}/schemas/{schema}@{revision}.
      schema_resource = pubsub.schemas.get_schema name: "#{schema_name}@#{revision_id}"
      avro_schema = Avro::Schema.parse schema_resource.definition
      cache_mutex.synchronize { schema_cache[revision_id] = avro_schema }
    rescue StandardError => e
      puts "Could not get schema for revision #{revision_id}: #{e.message}"
      received_message.reject!
      next
    end
  end

  begin
    case encoding
    when "BINARY"
      buffer = StringIO.new received_message.data
      decoder = Avro::IO::BinaryDecoder.new buffer
      reader = Avro::IO::DatumReader.new avro_schema
      message_data = reader.read decoder
      puts "Received a binary-encoded message:\n#{message_data}"
    when "JSON"
      message_data = JSON.parse received_message.data
      puts "Received a JSON-encoded message:\n#{message_data}"
    else
      puts "Unknown message encoding: #{encoding}. Rejecting message."
      received_message.reject!
      next
    end
    received_message.acknowledge!
  rescue StandardError => e
    puts "Failed to process message: #{e.message}"
    received_message.reject!
  end
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit.
sleep 60
listener.stop.wait!
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.

