Receive messages that could be for different schema revisions

Receive messages that may have been published with different revisions of a schema, and decode each message with the schema revision named in its attributes.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

C++

Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  auto 
  
 subscriber 
  
 = 
  
 pubsub 
 :: 
 Subscriber 
 ( 
 pubsub 
 :: 
 MakeSubscriberConnection 
 ( 
  
 pubsub 
 :: 
 Subscription 
 ( 
 project_id 
 , 
  
 subscription_id 
 ))); 
 // Create a schema client. 
 auto 
  
 schema_client 
  
 = 
  
 pubsub 
 :: 
 SchemaServiceClient 
 ( 
 pubsub 
 :: 
 MakeSchemaServiceConnection 
 ()); 
 // Read the reader schema. This is the schema you want the messages to be 
 // evaluated using. 
 std 
 :: 
 ifstream 
  
 ifs 
 ( 
 avro_file 
 ); 
 avro 
 :: 
 ValidSchema 
  
 reader_schema 
 ; 
 avro 
 :: 
 compileJsonSchema 
 ( 
 ifs 
 , 
  
 reader_schema 
 ); 
 std 
 :: 
 unordered_map<std 
 :: 
 string 
 , 
  
 avro 
 :: 
 ValidSchema 
>  
 revisions_to_schemas 
 ; 
 auto 
  
 session 
  
 = 
  
 subscriber 
 . 
 Subscribe 
 ( 
  
 [&]( 
 pubsub 
 :: 
 Message 
  
 const 
&  
 message 
 , 
  
 pubsub 
 :: 
 AckHandler 
  
 h 
 ) 
  
 { 
  
 // Get the reader schema revision for the message. 
  
 auto 
  
 schema_name 
  
 = 
  
 message 
 . 
 attributes 
 ()[ 
 "googclient_schemaname" 
 ]; 
  
 auto 
  
 schema_revision_id 
  
 = 
  
 message 
 . 
 attributes 
 ()[ 
 "googclient_schemarevisionid" 
 ]; 
  
 // If we haven't received a message with this schema, look it up. 
  
 if 
  
 ( 
 revisions_to_schemas 
 . 
 find 
 ( 
 schema_revision_id 
 ) 
  
 == 
  
 revisions_to_schemas 
 . 
 end 
 ()) 
  
 { 
  
 auto 
  
 schema_path 
  
 = 
  
 schema_name 
  
 + 
  
 "@" 
  
 + 
  
 schema_revision_id 
 ; 
  
 // Use the schema client to get the path. 
  
 auto 
  
 schema 
  
 = 
  
 schema_client 
 . 
 GetSchema 
 ( 
 schema_path 
 ); 
  
 if 
  
 ( 
 ! 
 schema 
 ) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "Schema not found:" 
 << 
 schema_path 
 << 
 " 
 \n 
 " 
 ; 
  
 return 
 ; 
  
 } 
  
 avro 
 :: 
 ValidSchema 
  
 writer_schema 
 ; 
  
 std 
 :: 
 stringstream 
  
 in 
 ; 
  
 in 
 << 
 schema 
 . 
 value 
 (). 
 definition 
 (); 
  
 avro 
 :: 
 compileJsonSchema 
 ( 
 in 
 , 
  
 writer_schema 
 ); 
  
 revisions_to_schemas 
 [ 
 schema_revision_id 
 ] 
  
 = 
  
 writer_schema 
 ; 
  
 } 
  
 auto 
  
 writer_schema 
  
 = 
  
 revisions_to_schemas 
 [ 
 schema_revision_id 
 ]; 
  
 auto 
  
 encoding 
  
 = 
  
 message 
 . 
 attributes 
 ()[ 
 "googclient_schemaencoding" 
 ]; 
  
 if 
  
 ( 
 encoding 
  
 == 
  
 "JSON" 
 ) 
  
 { 
  
 std 
 :: 
 stringstream 
  
 in 
 ; 
  
 in 
 << 
 message 
 . 
 data 
 (); 
  
 auto 
  
 avro_in 
  
 = 
  
 avro 
 :: 
 istreamInputStream 
 ( 
 in 
 ); 
  
 avro 
 :: 
 DecoderPtr 
  
 decoder 
  
 = 
  
 avro 
 :: 
 resolvingDecoder 
 ( 
  
 writer_schema 
 , 
  
 reader_schema 
 , 
  
 avro 
 :: 
 jsonDecoder 
 ( 
 writer_schema 
 )); 
  
 decoder 
 - 
> init 
 ( 
 * 
 avro_in 
 ); 
  
 v2 
 :: 
 State 
  
 state 
 ; 
  
 avro 
 :: 
 decode 
 ( 
 * 
 decoder 
 , 
  
 state 
 ); 
  
 std 
 :: 
 cout 
 << 
 "Name: " 
 << 
 state 
 . 
 name 
 << 
 " 
 \n 
 " 
 ; 
  
 std 
 :: 
 cout 
 << 
 "Postal Abbreviation: " 
 << 
 state 
 . 
 post_abbr 
 << 
 " 
 \n 
 " 
 ; 
  
 std 
 :: 
 cout 
 << 
 "Population: " 
 << 
 state 
 . 
 population 
 << 
 " 
 \n 
 " 
 ; 
  
 } 
  
 else 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "Unable to decode. Received message using encoding" 
 << 
 encoding 
 << 
 " 
 \n 
 " 
 ; 
  
 } 
  
 std 
 :: 
 move 
 ( 
 h 
 ). 
 ack 
 (); 
  
 }); 
 

C#

Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C# API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  using 
  
 Avro.Generic 
 ; 
 using 
  
 Avro.IO 
 ; 
 using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
 System 
 ; 
 using 
  
 System.Collections.Concurrent 
 ; 
 using 
  
 System.IO 
 ; 
 using 
  
 System.Threading 
 ; 
 using 
  
 System.Threading.Tasks 
 ; 
 public 
  
 class 
  
 SubscribeAvroRecordsWithRevisionsSample 
 { 
  
 public 
  
 async 
  
 Task 
< ( 
 int 
 , 
  
 int 
 ) 
>  
 SubscribeAvroRecordsWithRevisions 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 subscriptionId 
 ) 
  
 { 
  
 SchemaServiceClient 
  
 schemaService 
  
 = 
  
 SchemaServiceClient 
 . 
 Create 
 (); 
  
 var 
  
 schemaCache 
  
 = 
  
 new 
  
 ConcurrentDictionary 
< ( 
 string 
 , 
  
 string 
 ), 
  
 Avro 
 . 
 Schema 
> (); 
  
 SubscriptionName 
  
 subscriptionName 
  
 = 
  
 SubscriptionName 
 . 
 FromProjectSubscription 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 SubscriberClient 
  
 subscriber 
  
 = 
  
 await 
  
 SubscriberClient 
 . 
 CreateAsync 
 ( 
 subscriptionName 
 ); 
  
 int 
  
 messageCount 
  
 = 
  
 0 
 ; 
  
 Task 
  
 startTask 
  
 = 
  
 subscriber 
 . 
 StartAsync 
 (( 
 PubsubMessage 
  
 message 
 , 
  
 CancellationToken 
  
 cancel 
 ) 
  
 = 
>  
 { 
  
 // Get the schema name, revision ID and encoding type from the message. 
  
 string 
  
 encoding 
  
 = 
  
 message 
 . 
 Attributes 
 [ 
 "googclient_schemaencoding" 
 ]; 
  
 string 
  
 schemaName 
  
 = 
  
 message 
 . 
 Attributes 
 [ 
 "googclient_schemaname" 
 ]; 
  
 string 
  
 revision 
  
 = 
  
 message 
 . 
 Attributes 
 [ 
 "googclient_schemarevisionid" 
 ]; 
  
 // Fetch the schema if we don't already have it. 
  
 var 
  
 avroSchema 
  
 = 
  
 schemaCache 
 . 
 GetOrAdd 
 (( 
 schemaName 
 , 
  
 revision 
 ), 
  
 key 
  
 = 
>  
 { 
  
 var 
  
 pubSubSchema 
  
 = 
  
 schemaService 
 . 
 GetSchema 
 ( 
 $"{schemaName}@{revision}" 
 ); 
  
 return 
  
 Avro 
 . 
 Schema 
 . 
 Parse 
 ( 
 pubSubSchema 
 . 
 Definition 
 ); 
  
 }); 
  
 // Read the message. 
  
 if 
  
 ( 
 encoding 
  
 == 
  
 "BINARY" 
 ) 
  
 { 
  
 using 
  
 var 
  
 ms 
  
 = 
  
 new 
  
 MemoryStream 
 ( 
 message 
 . 
 Data 
 . 
 ToByteArray 
 ()); 
  
 var 
  
 decoder 
  
 = 
  
 new 
  
 BinaryDecoder 
 ( 
 ms 
 ); 
  
 var 
  
 reader 
  
 = 
  
 new 
  
 DefaultReader 
 ( 
 avroSchema 
 , 
  
 avroSchema 
 ); 
  
 var 
  
 record 
  
 = 
  
 reader 
 . 
 Read<GenericRecord> 
 ( 
 null 
 , 
  
 decoder 
 ); 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Message {message.MessageId}: {record.GetValue(0)}" 
 ); 
  
 Interlocked 
 . 
 Increment 
 ( 
 ref 
  
 messageCount 
 ); 
  
 } 
  
 else 
  
 { 
  
 Console 
 . 
 WriteLine 
 ( 
 "Expected only binary messages in this sample" 
 ); 
  
 } 
  
 return 
  
 Task 
 . 
 FromResult 
 ( 
  SubscriberClient 
 
 . 
  Reply 
 
 . 
  Ack 
 
 ); 
  
 }); 
  
 // Run for 10 seconds. 
  
 await 
  
 Task 
 . 
 Delay 
 ( 
 10 
 _000 
 ); 
  
 await 
  
 subscriber 
 . 
  StopAsync 
 
 ( 
 CancellationToken 
 . 
 None 
 ); 
  
 // Lets make sure that the start task finished successfully after the call to stop. 
  
 await 
  
 startTask 
 ; 
  
 return 
  
 ( 
 messageCount 
 , 
  
 schemaCache 
 . 
 Count 
 ); 
  
 } 
 } 
 

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "sync" 
  
 "time" 
  
 "cloud.google.com/go/pubsub/v2" 
  
 schema 
  
 "cloud.google.com/go/pubsub/v2/apiv1" 
  
 "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" 
  
 "github.com/linkedin/goavro/v2" 
 ) 
 func 
  
 subscribeWithAvroSchemaRevisions 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 subID 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topicID := "my-topic" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 schemaClient 
 , 
  
 err 
  
 := 
  
 schema 
 . 
 NewSchemaClient 
 ( 
 ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewSchemaClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 // Create the cache for the codecs for different revision IDs. 
  
 revisionCodecs 
  
 := 
  
 make 
 ( 
 map 
 [ 
 string 
 ] 
 * 
 goavro 
 . 
 Codec 
 ) 
  
 sub 
  
 := 
  
 client 
 . 
 Subscriber 
 ( 
 subID 
 ) 
  
 ctx2 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 10 
 * 
 time 
 . 
 Second 
 ) 
  
 defer 
  
 cancel 
 () 
  
 var 
  
 mu 
  
 sync 
 . 
 Mutex 
  
 sub 
 . 
 Receive 
 ( 
 ctx2 
 , 
  
 func 
 ( 
 ctx 
  
 context 
 . 
 Context 
 , 
  
 msg 
  
 * 
 pubsub 
 . 
 Message 
 ) 
  
 { 
  
 mu 
 . 
 Lock 
 () 
  
 defer 
  
 mu 
 . 
 Unlock 
 () 
  
 name 
  
 := 
  
 msg 
 . 
 Attributes 
 [ 
 "googclient_schemaname" 
 ] 
  
 revision 
  
 := 
  
 msg 
 . 
 Attributes 
 [ 
 "googclient_schemarevisionid" 
 ] 
  
 codec 
 , 
  
 ok 
  
 := 
  
 revisionCodecs 
 [ 
 revision 
 ] 
  
 // If the codec doesn't exist in the map, this is the first time we 
  
 // are seeing this revision. We need to fetch the schema and cache the 
  
 // codec. It would be more typical to do this asynchronously, but is 
  
 // shown here in a synchronous way to ease readability. 
  
 if 
  
 ! 
 ok 
  
 { 
  
 s 
  
 := 
  
& pubsubpb 
 . 
 GetSchemaRequest 
 { 
  
 Name 
 : 
  
 fmt 
 . 
 Sprintf 
 ( 
 "%s@%s" 
 , 
  
 name 
 , 
  
 revision 
 ), 
  
 View 
 : 
  
 pubsubpb 
 . 
 SchemaView_FULL 
 , 
  
 } 
  
 schema 
 , 
  
 err 
  
 := 
  
 schemaClient 
 . 
 GetSchema 
 ( 
 ctx 
 , 
  
 s 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Nacking, cannot read message without schema: %v\n" 
 , 
  
 err 
 ) 
  
 msg 
 . 
 Nack 
 () 
  
 return 
  
 } 
  
 codec 
 , 
  
 err 
  
 = 
  
 goavro 
 . 
 NewCodec 
 ( 
 schema 
 . 
 Definition 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 msg 
 . 
 Nack 
 () 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "goavro.NewCodec err: %v\n" 
 , 
  
 err 
 ) 
  
 } 
  
 revisionCodecs 
 [ 
 revision 
 ] 
  
 = 
  
 codec 
  
 } 
  
 encoding 
  
 := 
  
 msg 
 . 
 Attributes 
 [ 
 "googclient_schemaencoding" 
 ] 
  
 var 
  
 state 
  
 map 
 [ 
 string 
 ] 
 interface 
 {} 
  
 if 
  
 encoding 
  
 == 
  
 "BINARY" 
  
 { 
  
 data 
 , 
  
 _ 
 , 
  
 err 
  
 := 
  
 codec 
 . 
 NativeFromBinary 
 ( 
 msg 
 . 
 Data 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "codec.NativeFromBinary err: %v\n" 
 , 
  
 err 
 ) 
  
 msg 
 . 
 Nack 
 () 
  
 return 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Received a binary-encoded message:\n%#v\n" 
 , 
  
 data 
 ) 
  
 state 
  
 = 
  
 data 
 .( 
 map 
 [ 
 string 
 ] 
 interface 
 {}) 
  
 } 
  
 else 
  
 if 
  
 encoding 
  
 == 
  
 "JSON" 
  
 { 
  
 data 
 , 
  
 _ 
 , 
  
 err 
  
 := 
  
 codec 
 . 
 NativeFromTextual 
 ( 
 msg 
 . 
 Data 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "codec.NativeFromTextual err: %v\n" 
 , 
  
 err 
 ) 
  
 msg 
 . 
 Nack 
 () 
  
 return 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Received a JSON-encoded message:\n%#v\n" 
 , 
  
 data 
 ) 
  
 state 
  
 = 
  
 data 
 .( 
 map 
 [ 
 string 
 ] 
 interface 
 {}) 
  
 } 
  
 else 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Unknown message type(%s), nacking\n" 
 , 
  
 encoding 
 ) 
  
 msg 
 . 
 Nack 
 () 
  
 return 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "%s is abbreviated as %s\n" 
 , 
  
 state 
 [ 
 "name" 
 ], 
  
 state 
 [ 
 "post_abbr" 
 ]) 
  
 msg 
 . 
 Ack 
 () 
  
 }) 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 com.google.cloud.pubsub.v1. AckReplyConsumer 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. MessageReceiver 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. SchemaServiceClient 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Subscriber 
 
 ; 
 import 
  
 com.google.protobuf. ByteString 
 
 ; 
 import 
  
 com.google.pubsub.v1. ProjectSubscriptionName 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 com.google.pubsub.v1. Schema 
 
 ; 
 import 
  
 java.io.ByteArrayInputStream 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.io.InputStream 
 ; 
 import 
  
 java.util.HashMap 
 ; 
 import 
  
 java.util.Map 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 import 
  
 java.util.concurrent.TimeoutException 
 ; 
 import 
  
 org.apache.avro.io.Decoder 
 ; 
 import 
  
 org.apache.avro.io.DecoderFactory 
 ; 
 import 
  
 org.apache.avro.specific.SpecificDatumReader 
 ; 
 import 
  
 utilities.State 
 ; 
 public 
  
 class 
 SubscribeWithAvroSchemaRevisionsExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 // Use an existing subscription. 
  
 String 
  
 subscriptionId 
  
 = 
  
 "your-subscription-id" 
 ; 
  
 subscribeWithAvroSchemaRevisionsExample 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 } 
  
 static 
  
  SchemaServiceClient 
 
  
 getSchemaServiceClient 
 () 
  
 { 
  
 try 
  
 { 
  
 return 
  
  SchemaServiceClient 
 
 . 
 create 
 (); 
  
 } 
  
 catch 
  
 ( 
 IOException 
  
 e 
 ) 
  
 { 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Could not get schema client: " 
  
 + 
  
 e 
 ); 
  
 return 
  
 null 
 ; 
  
 } 
  
 } 
  
 public 
  
 static 
  
 void 
  
 subscribeWithAvroSchemaRevisionsExample 
 ( 
  
 String 
  
 projectId 
 , 
  
 String 
  
 subscriptionId 
 ) 
  
 { 
  
 // Used to get the schemas for revsions. 
  
 final 
  
  SchemaServiceClient 
 
  
 schemaServiceClient 
  
 = 
  
 getSchemaServiceClient 
 (); 
  
 if 
  
 ( 
 schemaServiceClient 
  
 == 
  
 null 
 ) 
  
 { 
  
 return 
 ; 
  
 } 
  
 // Cache for the readers for different revision IDs. 
  
 Map<String 
 , 
  
 SpecificDatumReader<State> 
>  
 revisionReaders 
  
 = 
  
 new 
  
 HashMap<String 
 , 
  
 SpecificDatumReader<State> 
> (); 
  
  ProjectSubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  ProjectSubscriptionName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 // Instantiate an asynchronous message receiver. 
  
  MessageReceiver 
 
  
 receiver 
  
 = 
  
 ( 
 PubsubMessage 
  
 message 
 , 
  
 AckReplyConsumer 
  
 consumer 
 ) 
  
 - 
>  
 { 
  
 // Get the schema encoding type. 
  
 String 
  
 name 
  
 = 
  
 message 
 . 
 getAttributesMap 
 (). 
 get 
 ( 
 "googclient_schemaname" 
 ); 
  
 String 
  
 revision 
  
 = 
  
 message 
 . 
 getAttributesMap 
 (). 
 get 
 ( 
 "googclient_schemarevisionid" 
 ); 
  
 SpecificDatumReader<State> 
  
 reader 
  
 = 
  
 null 
 ; 
  
 synchronized 
  
 ( 
 revisionReaders 
 ) 
  
 { 
  
 reader 
  
 = 
  
 revisionReaders 
 . 
 get 
 ( 
 revision 
 ); 
  
 } 
  
 if 
  
 ( 
 reader 
  
 == 
  
 null 
 ) 
  
 { 
  
 // This is the first time we are seeing this revision. We need to 
  
 // fetch the schema and cache its decoder. It would be more typical 
  
 // to do this asynchronously, but is shown here in a synchronous 
  
 // way to ease readability. 
  
 try 
  
 { 
  
  Schema 
 
  
 schema 
  
 = 
  
 schemaServiceClient 
 . 
  getSchema 
 
 ( 
 name 
  
 + 
  
 "@" 
  
 + 
  
 revision 
 ); 
  
 org 
 . 
 apache 
 . 
 avro 
 . 
  Schema 
 
  
 avroSchema 
  
 = 
  
 new 
  
 org 
 . 
 apache 
 . 
 avro 
 . 
  Schema 
 
 . 
  Parser 
 
 (). 
 parse 
 ( 
 schema 
 . 
  getDefinition 
 
 ()); 
  
 reader 
  
 = 
  
 new 
  
 SpecificDatumReader<State> 
 ( 
 avroSchema 
 , 
  
 State 
 . 
 getClassSchema 
 ()); 
  
 synchronized 
  
 ( 
 revisionReaders 
 ) 
  
 { 
  
 revisionReaders 
 . 
 put 
 ( 
 revision 
 , 
  
 reader 
 ); 
  
 } 
  
 } 
  
 catch 
  
 ( 
 Exception 
  
 e 
 ) 
  
 { 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Could not get schema: " 
  
 + 
  
 e 
 ); 
  
 // Without the schema, we cannot read the message, so nack it. 
  
 consumer 
 . 
 nack 
 (); 
  
 return 
 ; 
  
 } 
  
 } 
  
  ByteString 
 
  
 data 
  
 = 
  
 message 
 . 
 getData 
 (); 
  
 // Send the message data to a byte[] input stream. 
  
 InputStream 
  
 inputStream 
  
 = 
  
 new 
  
 ByteArrayInputStream 
 ( 
 data 
 . 
  toByteArray 
 
 ()); 
  
 String 
  
 encoding 
  
 = 
  
 message 
 . 
 getAttributesMap 
 (). 
 get 
 ( 
 "googclient_schemaencoding" 
 ); 
  
 Decoder 
  
 decoder 
  
 = 
  
 null 
 ; 
  
 // Prepare an appropriate decoder for the message data in the input stream 
  
 // based on the schema encoding type. 
  
 try 
  
 { 
  
 switch 
  
 ( 
 encoding 
 ) 
  
 { 
  
 case 
  
 "BINARY" 
 : 
  
 decoder 
  
 = 
  
 DecoderFactory 
 . 
 get 
 (). 
 directBinaryDecoder 
 ( 
 inputStream 
 , 
  
 /* reuse= */ 
  
 null 
 ); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Receiving a binary-encoded message:" 
 ); 
  
 break 
 ; 
  
 case 
  
 "JSON" 
 : 
  
 decoder 
  
 = 
  
 DecoderFactory 
 . 
 get 
 (). 
 jsonDecoder 
 ( 
 State 
 . 
 getClassSchema 
 (), 
  
 inputStream 
 ); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Receiving a JSON-encoded message:" 
 ); 
  
 break 
 ; 
  
 default 
 : 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Unknown message type; nacking." 
 ); 
  
 consumer 
 . 
 nack 
 (); 
  
 break 
 ; 
  
 } 
  
 // Obtain an object of the generated Avro class using the decoder. 
  
 State 
  
 state 
  
 = 
  
 reader 
 . 
 read 
 ( 
 null 
 , 
  
 decoder 
 ); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 state 
 . 
 getName 
 () 
  
 + 
  
 " is abbreviated as " 
  
 + 
  
 state 
 . 
 getPostAbbr 
 ()); 
  
 // Ack the message. 
  
 consumer 
 . 
 ack 
 (); 
  
 } 
  
 catch 
  
 ( 
 IOException 
  
 e 
 ) 
  
 { 
  
 System 
 . 
 err 
 . 
 println 
 ( 
 e 
 ); 
  
 // If we failed to process the message, nack it. 
  
 consumer 
 . 
 nack 
 (); 
  
 } 
  
 }; 
  
  Subscriber 
 
  
 subscriber 
  
 = 
  
 null 
 ; 
  
 try 
  
 { 
  
 subscriber 
  
 = 
  
  Subscriber 
 
 . 
 newBuilder 
 ( 
 subscriptionName 
 , 
  
 receiver 
 ). 
 build 
 (); 
  
 subscriber 
 . 
  startAsync 
 
 (). 
 awaitRunning 
 (); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
 "Listening for messages on %s:\n" 
 , 
  
 subscriptionName 
 . 
  toString 
 
 ()); 
  
 subscriber 
 . 
 awaitTerminated 
 ( 
 30 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 ); 
  
 } 
  
 catch 
  
 ( 
 TimeoutException 
  
 timeoutException 
 ) 
  
 { 
  
 subscriber 
 . 
 stopAsync 
 (); 
  
 } 
  
 } 
 } 
 

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  # subscription_id = "your-subscription-id"

  pubsub = Google::Cloud::PubSub.new
  subscriber = pubsub.subscriber(subscription_id)

  # Parsed Avro schemas, keyed by revision ID. The mutex guards each read and
  # write of the hash; the schema download itself happens outside the lock.
  schema_cache = {}
  cache_mutex = Mutex.new

  listener = subscriber.listen do |received_message|
    # The publisher's schema name, revision and encoding arrive as attributes.
    schema_name = received_message.attributes["googclient_schemaname"]
    revision_id = received_message.attributes["googclient_schemarevisionid"]
    encoding = received_message.attributes["googclient_schemaencoding"]

    avro_schema = cache_mutex.synchronize do
      schema_cache[revision_id]
    end

    if avro_schema.nil?
      # First message seen for this revision: download and parse its schema.
      begin
        require "avro"

        # The resource name format is projects/{project}/schemas/{schema}@{revision}.
        resource_name = "#{schema_name}@#{revision_id}"
        schema_resource = pubsub.schemas.get_schema(name: resource_name)
        avro_schema = Avro::Schema.parse(schema_resource.definition)

        cache_mutex.synchronize do
          schema_cache[revision_id] = avro_schema
        end
      rescue StandardError => e
        # Without the schema the message cannot be read, so reject it.
        puts "Could not get schema for revision #{revision_id}: #{e.message}"
        received_message.reject!
        next
      end
    end

    begin
      if encoding == "BINARY"
        require "avro"

        decoder = Avro::IO::BinaryDecoder.new(StringIO.new(received_message.data))
        message_data = Avro::IO::DatumReader.new(avro_schema).read(decoder)
        puts "Received a binary-encoded message:\n#{message_data}"
      elsif encoding == "JSON"
        require "json"

        message_data = JSON.parse(received_message.data)
        puts "Received a JSON-encoded message:\n#{message_data}"
      else
        puts "Unknown message encoding: #{encoding}. Rejecting message."
        received_message.reject!
        next
      end

      received_message.acknowledge!
    rescue StandardError => e
      # Any decoding failure rejects the message.
      puts "Failed to process message: #{e.message}"
      received_message.reject!
    end
  end

  listener.start

  # Keep the main thread alive for 60 seconds so the listener has time to
  # receive messages, then stop it and wait for it to finish.
  sleep 60
  listener.stop.wait!
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.

Create a Mobile Website
View Site in Mobile | Classic
Share by: