Receive messages of Avro schema type

Receive a message of Avro schema type, convert the message data to an object of a generated Avro class, and acknowledge the message.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

C++

Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub C++ API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  namespace 
  
 pubsub 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 future 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 StatusOr 
 ; 
 return 
  
 []( 
 pubsub 
 :: 
 Subscriber 
  
 subscriber 
 ) 
  
 { 
  
 auto 
  
 session 
  
 = 
  
 subscriber 
 . 
 Subscribe 
 ( 
  
 []( 
 pubsub 
 :: 
 Message 
  
 const 
&  
 m 
 , 
  
 pubsub 
 :: 
 AckHandler 
  
 h 
 ) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "Message contents: " 
 << 
 m 
 . 
 data 
 () 
 << 
 " 
 \n 
 " 
 ; 
  
 std 
 :: 
 move 
 ( 
 h 
 ). 
 ack 
 (); 
  
 }); 
  
 return 
  
 session 
 ; 
 } 
 

C#

Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub C# API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  using 
  
 Avro.IO 
 ; 
 using 
  
 Avro.Specific 
 ; 
 using 
  
  Google.Api.Gax 
 
 ; 
 using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
 Newtonsoft.Json 
 ; 
 using 
  
 System 
 ; 
 using 
  
 System.IO 
 ; 
 using 
  
 System.Threading 
 ; 
 using 
  
 System.Threading.Tasks 
 ; 
 public 
  
 class 
  
 PullAvroMessagesAsyncSample 
 { 
  
 public 
  
 async 
  
 Task<int> 
  
 PullAvroMessagesAsync 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 subscriptionId 
 , 
  
 bool 
  
 acknowledge 
 ) 
  
 { 
  
  SubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  SubscriptionName 
 
 . 
  FromProjectSubscription 
 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 int 
  
 messageCount 
  
 = 
  
 0 
 ; 
  
  SubscriberClient 
 
  
 subscriber 
  
 = 
  
 await 
  
 new 
  
  SubscriberClientBuilder 
 
  
 { 
  
 SubscriptionName 
  
 = 
  
 subscriptionName 
 , 
  
 Settings 
  
 = 
  
 new 
  
 SubscriberClient 
 . 
 Settings 
  
 { 
  
 AckExtensionWindow 
  
 = 
  
 TimeSpan 
 . 
 FromSeconds 
 ( 
 4 
 ), 
  
 AckDeadline 
  
 = 
  
 TimeSpan 
 . 
 FromSeconds 
 ( 
 10 
 ), 
  
 FlowControlSettings 
  
 = 
  
 new 
  
  FlowControlSettings 
 
 ( 
 maxOutstandingElementCount 
 : 
  
 100 
 , 
  
 maxOutstandingByteCount 
 : 
  
 10240 
 ) 
  
 } 
  
 }. 
 BuildAsync 
 (); 
  
 // SubscriberClient runs your message handle function on multiple 
  
 // threads to maximize throughput. 
  
 Task 
  
 startTask 
  
 = 
  
 subscriber 
 . 
  StartAsync 
 
 (( 
  PubsubMessage 
 
  
 message 
 , 
  
  CancellationToken 
 
  
 cancel 
 ) 
  
 = 
>  
 { 
  
 string 
  
 encoding 
  
 = 
  
 message 
 . 
 Attributes 
 [ 
 "googclient_schemaencoding" 
 ]; 
  
 // AvroUtilities is a namespace. Below are files using the namespace. 
  
 // https://github.com/GoogleCloudPlatform/dotnet-docs-samples/blob/main/pubsub/api/Pubsub.Samples/Utilities/State.cs 
  
 // https://github.com/GoogleCloudPlatform/dotnet-docs-samples/blob/main/pubsub/api/Pubsub.Samples/Utilities/StateUtils.cs 
  
 AvroUtilities 
 . 
 State 
  
 state 
  
 = 
  
 new 
  
 AvroUtilities 
 . 
 State 
 (); 
  
 switch 
  
 ( 
 encoding 
 ) 
  
 { 
  
 case 
  
 "BINARY" 
 : 
  
 using 
  
 ( 
 var 
  
 ms 
  
 = 
  
 new 
  
 MemoryStream 
 ( 
 message 
 . 
 Data 
 . 
 ToByteArray 
 ())) 
  
 { 
  
 var 
  
 decoder 
  
 = 
  
 new 
  
 BinaryDecoder 
 ( 
 ms 
 ); 
  
 var 
  
 reader 
  
 = 
  
 new 
  
 SpecificDefaultReader 
 ( 
 state 
 . 
 Schema 
 , 
  
 state 
 . 
 Schema 
 ); 
  
 reader 
 . 
 Read<AvroUtilities 
 . 
 State 
> ( 
 state 
 , 
  
 decoder 
 ); 
  
 } 
  
 break 
 ; 
  
 case 
  
 "JSON" 
 : 
  
 state 
  
 = 
  
 JsonConvert 
 . 
 DeserializeObject<AvroUtilities 
 . 
 State 
> ( 
 message 
 . 
  Data 
 
 . 
 ToStringUtf8 
 ()); 
  
 break 
 ; 
  
 default 
 : 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Encoding not provided in message." 
 ); 
  
 break 
 ; 
  
 } 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Message {message.MessageId}: {state}" 
 ); 
  
 Interlocked 
 . 
 Increment 
 ( 
 ref 
  
 messageCount 
 ); 
  
 return 
  
 Task 
 . 
 FromResult 
 ( 
 acknowledge 
  
 ? 
  
  SubscriberClient 
 
 . 
  Reply 
 
 . 
  Ack 
 
  
 : 
  
  SubscriberClient 
 
 . 
  Reply 
 
 . 
  Nack 
 
 ); 
  
 }); 
  
 // Run for 5 seconds. 
  
 await 
  
 Task 
 . 
 Delay 
 ( 
 5000 
 ); 
  
 await 
  
 subscriber 
 . 
  StopAsync 
 
 ( 
  CancellationToken 
 
 . 
 None 
 ); 
  
 // Lets make sure that the start task finished successfully after the call to stop. 
  
 await 
  
 startTask 
 ; 
  
 return 
  
 messageCount 
 ; 
  
 } 
 } 
 

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Go API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "os" 
  
 "sync" 
  
 "time" 
  
 "cloud.google.com/go/pubsub/v2" 
  
 "github.com/linkedin/goavro/v2" 
 ) 
 func 
  
 subscribeWithAvroSchema 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 subID 
 , 
  
 avscFile 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topicID := "my-topic" 
  
 // avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 avroSchema 
 , 
  
 err 
  
 := 
  
 os 
 . 
 ReadFile 
 ( 
 avscFile 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "os.ReadFile err: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 codec 
 , 
  
 err 
  
 := 
  
 goavro 
 . 
 NewCodec 
 ( 
 string 
 ( 
 avroSchema 
 )) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "goavro.NewCodec err: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 sub 
  
 := 
  
 client 
 . 
 Subscriber 
 ( 
 subID 
 ) 
  
 ctx2 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 10 
 * 
 time 
 . 
 Second 
 ) 
  
 defer 
  
 cancel 
 () 
  
 var 
  
 mu 
  
 sync 
 . 
 Mutex 
  
 sub 
 . 
 Receive 
 ( 
 ctx2 
 , 
  
 func 
 ( 
 ctx 
  
 context 
 . 
 Context 
 , 
  
 msg 
  
 * 
 pubsub 
 . 
 Message 
 ) 
  
 { 
  
 mu 
 . 
 Lock 
 () 
  
 defer 
  
 mu 
 . 
 Unlock 
 () 
  
 encoding 
  
 := 
  
 msg 
 . 
 Attributes 
 [ 
 "googclient_schemaencoding" 
 ] 
  
 var 
  
 state 
  
 map 
 [ 
 string 
 ] 
 interface 
 {} 
  
 if 
  
 encoding 
  
 == 
  
 "BINARY" 
  
 { 
  
 data 
 , 
  
 _ 
 , 
  
 err 
  
 := 
  
 codec 
 . 
 NativeFromBinary 
 ( 
 msg 
 . 
 Data 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "codec.NativeFromBinary err: %v\n" 
 , 
  
 err 
 ) 
  
 msg 
 . 
 Nack 
 () 
  
 return 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Received a binary-encoded message:\n%#v\n" 
 , 
  
 data 
 ) 
  
 state 
  
 = 
  
 data 
 .( 
 map 
 [ 
 string 
 ] 
 interface 
 {}) 
  
 } 
  
 else 
  
 if 
  
 encoding 
  
 == 
  
 "JSON" 
  
 { 
  
 data 
 , 
  
 _ 
 , 
  
 err 
  
 := 
  
 codec 
 . 
 NativeFromTextual 
 ( 
 msg 
 . 
 Data 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "codec.NativeFromTextual err: %v\n" 
 , 
  
 err 
 ) 
  
 msg 
 . 
 Nack 
 () 
  
 return 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Received a JSON-encoded message:\n%#v\n" 
 , 
  
 data 
 ) 
  
 state 
  
 = 
  
 data 
 .( 
 map 
 [ 
 string 
 ] 
 interface 
 {}) 
  
 } 
  
 else 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Unknown message type(%s), nacking\n" 
 , 
  
 encoding 
 ) 
  
 msg 
 . 
 Nack 
 () 
  
 return 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "%s is abbreviated as %s\n" 
 , 
  
 state 
 [ 
 "name" 
 ], 
  
 state 
 [ 
 "post_abbr" 
 ]) 
  
 msg 
 . 
 Ack 
 () 
  
 }) 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Java API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  import 
  
 com.google.cloud.pubsub.v1. AckReplyConsumer 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. MessageReceiver 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Subscriber 
 
 ; 
 import 
  
 com.google.protobuf. ByteString 
 
 ; 
 import 
  
 com.google.pubsub.v1. ProjectSubscriptionName 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 java.io.ByteArrayInputStream 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.io.InputStream 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 import 
  
 java.util.concurrent.TimeoutException 
 ; 
 import 
  
 org.apache.avro.io.Decoder 
 ; 
 import 
  
 org.apache.avro.io.DecoderFactory 
 ; 
 import 
  
 org.apache.avro.specific.SpecificDatumReader 
 ; 
 import 
  
 utilities.State 
 ; 
 public 
  
 class 
 SubscribeWithAvroSchemaExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 // Use an existing subscription. 
  
 String 
  
 subscriptionId 
  
 = 
  
 "your-subscription-id" 
 ; 
  
 subscribeWithAvroSchemaExample 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 subscribeWithAvroSchemaExample 
 ( 
 String 
  
 projectId 
 , 
  
 String 
  
 subscriptionId 
 ) 
  
 { 
  
  ProjectSubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  ProjectSubscriptionName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 // Prepare a reader for the encoded Avro records. 
  
 SpecificDatumReader<State> 
  
 reader 
  
 = 
  
 new 
  
 SpecificDatumReader 
<> ( 
 State 
 . 
 getClassSchema 
 ()); 
  
 // Instantiate an asynchronous message receiver. 
  
  MessageReceiver 
 
  
 receiver 
  
 = 
  
 ( 
 PubsubMessage 
  
 message 
 , 
  
 AckReplyConsumer 
  
 consumer 
 ) 
  
 - 
>  
 { 
  
 ByteString 
  
 data 
  
 = 
  
 message 
 . 
 getData 
 (); 
  
 // Get the schema encoding type. 
  
 String 
  
 encoding 
  
 = 
  
 message 
 . 
 getAttributesMap 
 (). 
 get 
 ( 
 "googclient_schemaencoding" 
 ); 
  
 // Send the message data to a byte[] input stream. 
  
 InputStream 
  
 inputStream 
  
 = 
  
 new 
  
 ByteArrayInputStream 
 ( 
 data 
 . 
 toByteArray 
 ()); 
  
 Decoder 
  
 decoder 
  
 = 
  
 null 
 ; 
  
 // Prepare an appropriate decoder for the message data in the input stream 
  
 // based on the schema encoding type. 
  
 block 
 : 
  
 try 
  
 { 
  
 switch 
  
 ( 
 encoding 
 ) 
  
 { 
  
 case 
  
 "BINARY" 
 : 
  
 decoder 
  
 = 
  
 DecoderFactory 
 . 
 get 
 (). 
 directBinaryDecoder 
 ( 
 inputStream 
 , 
  
 /* reuse= */ 
  
 null 
 ); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Receiving a binary-encoded message:" 
 ); 
  
 break 
 ; 
  
 case 
  
 "JSON" 
 : 
  
 decoder 
  
 = 
  
 DecoderFactory 
 . 
 get 
 (). 
 jsonDecoder 
 ( 
 State 
 . 
 getClassSchema 
 (), 
  
 inputStream 
 ); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Receiving a JSON-encoded message:" 
 ); 
  
 break 
 ; 
  
 default 
 : 
  
 break 
  
 block 
 ; 
  
 } 
  
 // Obtain an object of the generated Avro class using the decoder. 
  
 State 
  
 state 
  
 = 
  
 reader 
 . 
 read 
 ( 
 null 
 , 
  
 decoder 
 ); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 state 
 . 
 getName 
 () 
  
 + 
  
 " is abbreviated as " 
  
 + 
  
 state 
 . 
 getPostAbbr 
 ()); 
  
 } 
  
 catch 
  
 ( 
 IOException 
  
 e 
 ) 
  
 { 
  
 System 
 . 
 err 
 . 
 println 
 ( 
 e 
 ); 
  
 } 
  
 // Ack the message. 
  
 consumer 
 . 
 ack 
 (); 
  
 }; 
  
  Subscriber 
 
  
 subscriber 
  
 = 
  
 null 
 ; 
  
 try 
  
 { 
  
 subscriber 
  
 = 
  
  Subscriber 
 
 . 
 newBuilder 
 ( 
 subscriptionName 
 , 
  
 receiver 
 ). 
 build 
 (); 
  
 subscriber 
 . 
  startAsync 
 
 (). 
 awaitRunning 
 (); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
 "Listening for messages on %s:\n" 
 , 
  
 subscriptionName 
 . 
  toString 
 
 ()); 
  
 subscriber 
 . 
 awaitTerminated 
 ( 
 30 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 ); 
  
 } 
  
 catch 
  
 ( 
 TimeoutException 
  
 timeoutException 
 ) 
  
 { 
  
 subscriber 
 . 
 stopAsync 
 (); 
  
 } 
  
 } 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Node.js API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

 ] 

Node.js

Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Node.js API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  /** 
 * 
 TODO 
 ( 
 developer 
 ): 
 Uncomment 
 these 
 variables 
 before 
 running 
 the 
 sample 
 . 
 */ 
 // 
 const 
 subscriptionNameOrId 
 = 
 'YOUR_SUBSCRIPTION_NAME_OR_ID' 
 ; 
 // 
 const 
 timeout 
 = 
 60 
 ; 
 // 
 Imports 
 the 
 Google 
 Cloud 
 client 
 library 
 import 
  
 { 
 PubSub 
 , 
 Schema 
 , 
 Encodings 
 , 
 Message 
 } 
 from 
  
 '@google-cloud/pubsub' 
 ; 
 // 
 Node 
 FS 
 library 
 , 
 to 
 load 
 definitions 
 import 
  
 * 
 as 
 fs 
 from 
  
 'fs' 
 ; 
 // 
 And 
 the 
 Apache 
 Avro 
 library 
 import 
  
 * 
 as 
 avro 
 from 
  
 'avro-js' 
 ; 
 // 
 Creates 
 a 
 client 
 ; 
 cache 
 this 
 for 
 further 
 use 
 const 
 pubSubClient 
 = 
 new 
 PubSub 
 (); 
 function 
 listenForAvroRecords 
 ( 
 subscriptionNameOrId 
 : 
 string 
 , 
 timeout 
 : 
 number 
 ) 
 { 
 // 
 References 
 an 
 existing 
 subscription 
 const 
 subscription 
 = 
 pubSubClient 
 . 
 subscription 
 ( 
 subscriptionNameOrId 
 ); 
 // 
 Make 
 an 
 encoder 
 using 
 the 
 official 
 avro 
 - 
 js 
 library 
 . 
 const 
 definition 
 = 
 fs 
 . 
 readFileSync 
 ( 
 'system-test/fixtures/provinces.avsc' 
 ) 
 . 
 toString 
 (); 
 const 
 type 
 = 
 avro 
 . 
 parse 
 ( 
 definition 
 ); 
 // 
 Create 
 an 
 event 
 handler 
 to 
 handle 
 messages 
 let 
 messageCount 
 = 
 0 
 ; 
 const 
 messageHandler 
 = 
 async 
 ( 
 message 
 : 
 Message 
 ) 
 = 
> { 
 // 
 "Ack" 
 ( 
 acknowledge 
 receipt 
 of 
 ) 
 the 
 message 
 message 
 . 
 ack 
 (); 
 // 
 Get 
 the 
 schema 
 metadata 
 from 
  
 the 
 message 
 . 
 const 
 schemaMetadata 
 = 
 Schema 
 . 
 metadataFromMessage 
 ( 
 message 
 . 
 attributes 
 ); 
 let 
 result 
 : 
 object 
 | 
 undefined 
 ; 
 switch 
 ( 
 schemaMetadata 
 . 
 encoding 
 ) 
 { 
 case 
 Encodings 
 . 
 Binary 
 : 
 result 
 = 
 type 
 . 
 fromBuffer 
 ( 
 message 
 . 
 data 
 ); 
 break 
 ; 
 case 
 Encodings 
 . 
 Json 
 : 
 result 
 = 
 type 
 . 
 fromString 
 ( 
 message 
 . 
 data 
 . 
 toString 
 ()); 
 break 
 ; 
 default 
 : 
 console 
 . 
 log 
 ( 
 ` 
 Unknown 
 schema 
 encoding 
 : 
 $ 
 { 
 schemaMetadata 
 . 
 encoding 
 } 
 ` 
 ); 
 break 
 ; 
 } 
 console 
 . 
 log 
 ( 
 ` 
 Received 
 message 
 $ 
 { 
 message 
 . 
 id 
 }: 
 ` 
 ); 
 console 
 . 
 log 
 ( 
 ` 
\ tData 
 : 
 $ 
 { 
 JSON 
 . 
 stringify 
 ( 
 result 
 , 
 null 
 , 
 4 
 )} 
 ` 
 ); 
 console 
 . 
 log 
 ( 
 ` 
\ tAttributes 
 : 
 $ 
 { 
 message 
 . 
 attributes 
 } 
 ` 
 ); 
 messageCount 
 += 
 1 
 ; 
 }; 
 // 
 Listen 
 for 
 new 
 messages 
 until 
 timeout 
 is 
 hit 
 subscription 
 . 
 on 
 ( 
 'message' 
 , 
 messageHandler 
 ); 
 setTimeout 
 (() 
 = 
> { 
 subscription 
 . 
 removeListener 
 ( 
 'message' 
 , 
 messageHandler 
 ); 
 console 
 . 
 log 
 ( 
 `$ 
 { 
 messageCount 
 } 
 message 
 ( 
 s 
 ) 
 received 
 . 
 ` 
 ); 
 }, 
 timeout 
 * 
 1000 
 ); 
 use 
 ? 
 . 
 optional 
 ? 
 . 
 chaining 
 ? 
 . 
 (); 
 } 
 

PHP

Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub PHP API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  use Google\Cloud\PubSub\PubSubClient; 
 /** 
 * Subscribe and pull messages using an AVRO schema. 
 * 
 * @param string $projectId 
 * @param string $subscriptionId 
 */ 
 function subscribe_avro_records($projectId, $subscriptionId, $definitionFile) 
 { 
 $pubsub = new PubSubClient([ 
 'projectId' => $projectId, 
 ]); 
 $subscription = $pubsub->subscription($subscriptionId); 
 $definition = file_get_contents($definitionFile); 
 $messages = $subscription->pull(); 
 foreach ($messages as $message) { 
 $decodedMessageData = ''; 
 $encoding = $message->attribute('googclient_schemaencoding'); 
 switch ($encoding) { 
 case 'BINARY': 
 $io = new \AvroStringIO($message->data()); 
 $schema = \AvroSchema::parse($definition); 
 $reader = new \AvroIODatumReader($schema); 
 $decoder = new \AvroIOBinaryDecoder($io); 
 $decodedMessageData = json_encode($reader->read($decoder)); 
 break; 
 case 'JSON': 
 $decodedMessageData = $message->data(); 
 break; 
 } 
 printf('Received a %d-encoded message %s', $encoding, $decodedMessageData); 
 } 
 } 
 

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Ruby API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  # subscription_id = "your-subscription-id" 
 # avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  PubSub 
 
 . 
  new 
 
 subscriber 
  
 = 
  
 pubsub 
 . 
  subscriber 
 
  
 subscription_id 
 listener 
  
 = 
  
 subscriber 
 . 
  listen 
 
  
 do 
  
 | 
 received_message 
 | 
  
 encoding 
  
 = 
  
 received_message 
 . 
 attributes 
 [ 
 "googclient_schemaencoding" 
 ] 
  
 case 
  
 encoding 
  
 when 
  
 "BINARY" 
  
 require 
  
 "avro" 
  
 avro_schema 
  
 = 
  
 Avro 
 :: 
 Schema 
 . 
 parse 
  
 File 
 . 
 read 
 ( 
 avsc_file 
 ) 
  
 buffer 
  
 = 
  
 StringIO 
 . 
  new 
 
  
 received_message 
 . 
 data 
  
 decoder 
  
 = 
  
 Avro 
 :: 
 IO 
 :: 
 BinaryDecoder 
 . 
  new 
 
  
 buffer 
  
 reader 
  
 = 
  
 Avro 
 :: 
 IO 
 :: 
 DatumReader 
 . 
  new 
 
  
 avro_schema 
  
 message_data 
  
 = 
  
 reader 
 . 
 read 
  
 decoder 
  
 puts 
  
 "Received a binary-encoded message: 
 \n 
 #{ 
 message_data 
 } 
 " 
  
 when 
  
 "JSON" 
  
 require 
  
 "json" 
  
 message_data 
  
 = 
  
 JSON 
 . 
 parse 
  
 received_message 
 . 
 data 
  
 puts 
  
 "Received a JSON-encoded message: 
 \n 
 #{ 
 message_data 
 } 
 " 
  
 else 
  
 "Received a message with no encoding: 
 \n 
 #{ 
 received_message 
 . 
 message_id 
 } 
 " 
  
 end 
  
 received_message 
 . 
  acknowledge! 
 
 end 
 listener 
 . 
  start 
 
 # Let the main thread sleep for 60 seconds so the thread for listening 
 # messages does not quit 
 sleep 
  
 60 
 listener 
 . 
 stop 
 . 
 wait! 
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser .

Create a Mobile Website
View Site in Mobile | Classic
Share by: