Optimistically subscribe without creating a subscription first

Minimizes administrative operations by first trying to subscribe and then creating a subscription if getting a NotFound error.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

C++

Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub C++ API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  auto 
  
 process_response 
  
 = 
  
 []( 
 gc 
 :: 
 StatusOr<pubsub 
 :: 
 PullResponse 
>  
 response 
 ) 
  
 { 
  
 if 
  
 ( 
 response 
 ) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "Received message " 
 << 
 response 
 - 
> message 
 << 
 " 
 \n 
 " 
 ; 
  
 std 
 :: 
 move 
 ( 
 response 
 - 
> handler 
 ). 
 ack 
 (); 
  
 return 
  
 gc 
 :: 
 Status 
 (); 
  
 } 
  
 if 
  
 ( 
 response 
 . 
 status 
 (). 
 code 
 () 
  
 == 
  
 gc 
 :: 
 StatusCode 
 :: 
 kUnavailable 
  
&&  
 response 
 . 
 status 
 (). 
 message 
 () 
  
 == 
  
 "no messages returned" 
 ) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "No messages returned from Pull() 
 \n 
 " 
 ; 
  
 return 
  
 gc 
 :: 
 Status 
 (); 
  
 } 
  
 return 
  
 response 
 . 
 status 
 (); 
 }; 
 // Instead of checking if the subscription exists, optimistically try to 
 // consume from the subscription. 
 auto 
  
 status 
  
 = 
  
 process_response 
 ( 
 subscriber 
 . 
 Pull 
 ()); 
 if 
  
 ( 
 status 
 . 
 ok 
 ()) 
  
 return 
 ; 
 if 
  
 ( 
 status 
 . 
 code 
 () 
  
 != 
  
 gc 
 :: 
 StatusCode 
 :: 
 kNotFound 
 ) 
  
 throw 
  
 std 
 :: 
 move 
 ( 
 status 
 ); 
 // Since the subscription does not exist, create the subscription. 
 pubsub_admin 
 :: 
 SubscriptionAdminClient 
  
 subscription_admin_client 
 ( 
  
 pubsub_admin 
 :: 
 MakeSubscriptionAdminConnection 
 ()); 
 google 
 :: 
 pubsub 
 :: 
 v1 
 :: 
 Subscription 
  
 request 
 ; 
 request 
 . 
 set_name 
 ( 
  
 pubsub 
 :: 
 Subscription 
 ( 
 project_id 
 , 
  
 subscription_id 
 ). 
 FullName 
 ()); 
 request 
 . 
 set_topic 
 ( 
  
 pubsub 
 :: 
 Topic 
 ( 
 project_id 
 , 
  
 std 
 :: 
 move 
 ( 
 topic_id 
 )). 
 FullName 
 ()); 
 auto 
  
 sub 
  
 = 
  
 subscription_admin_client 
 . 
 CreateSubscription 
 ( 
 request 
 ); 
 if 
  
 ( 
 ! 
 sub 
 ) 
  
 throw 
  
 std 
 :: 
 move 
 ( 
 sub 
 ). 
 status 
 (); 
 // Consume from the new subscription. 
 status 
  
 = 
  
 process_response 
 ( 
 subscriber 
 . 
 Pull 
 ()); 
 if 
  
 ( 
 ! 
 status 
 . 
 ok 
 ()) 
  
 throw 
  
 std 
 :: 
 move 
 ( 
 status 
 ); 
 

C#

Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub C# API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
  Grpc.Core 
 
 ; 
 using 
  
 System 
 ; 
 using 
  
 System.Threading 
 ; 
 using 
  
 System.Threading.Tasks 
 ; 
 public 
  
 class 
  
 OptimisticSubscribeSample 
 { 
  
 public 
  
 async 
  
 Task<int> 
  
 OptimisticSubscribe 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 topicId 
 , 
  
 string 
  
 subscriptionId 
 ) 
  
 { 
  
  SubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  SubscriptionName 
 
 . 
  FromProjectSubscription 
 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
  SubscriberClient 
 
  
 subscriber 
  
 = 
  
 await 
  
  SubscriberClient 
 
 . 
  CreateAsync 
 
 ( 
 subscriptionName 
 ); 
  
 int 
  
 messageCount 
  
 = 
  
 0 
 ; 
  
 try 
  
 { 
  
 Task 
  
 startTask 
  
 = 
  
 subscriber 
 . 
  StartAsync 
 
 (( 
  PubsubMessage 
 
  
 message 
 , 
  
 CancellationToken 
  
 cancel 
 ) 
  
 = 
>  
 { 
  
 string 
  
 text 
  
 = 
  
 message 
 . 
 Data 
 . 
 ToStringUtf8 
 (); 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Message {message.MessageId}: {text}" 
 ); 
  
 Interlocked 
 . 
 Increment 
 ( 
 ref 
  
 messageCount 
 ); 
  
 return 
  
 Task 
 . 
 FromResult 
 ( 
  SubscriberClient 
 
 . 
  Reply 
 
 . 
  Ack 
 
 ); 
  
 }); 
  
 // Run for 5 seconds. 
  
 await 
  
 Task 
 . 
 Delay 
 ( 
 5000 
 ); 
  
 await 
  
 subscriber 
 . 
  StopAsync 
 
 ( 
 CancellationToken 
 . 
 None 
 ); 
  
 // Wait for the StartAsync call to finish 
  
 await 
  
 startTask 
 ; 
  
 } 
  
 // Catch exception when subscription is not found 
  
 catch 
  
 ( 
  RpcException 
 
  
 e 
 ) 
  
 when 
  
 ( 
 e 
 . 
 Status 
 . 
 StatusCode 
  
 == 
  
 StatusCode 
 . 
 NotFound 
 ) 
  
 { 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Message: {e.Message}" 
 ); 
  
 Console 
 . 
 WriteLine 
 ( 
 $"StackTrace: {e.StackTrace}" 
 ); 
  
  SubscriberServiceApiClient 
 
  
 subscriberServiceApiClient 
  
 = 
  
  SubscriberServiceApiClient 
 
 . 
  Create 
 
 (); 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
  FromProjectTopic 
 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
  Subscription 
 
  
 subscription 
  
 = 
  
 subscriberServiceApiClient 
 . 
  CreateSubscription 
 
 ( 
 subscriptionName 
 , 
  
 topicName 
 , 
  
 pushConfig 
 : 
  
 null 
 , 
  
 ackDeadlineSeconds 
 : 
  
 60 
 ); 
  
 // Recursively call OptimisticSubscribe to restart the subscriber 
  
 return 
  
 await 
  
 OptimisticSubscribe 
 ( 
 projectId 
 , 
  
 topicId 
 , 
  
 subscriptionId 
 ); 
  
 } 
  
 return 
  
 messageCount 
 ; 
  
 } 
 } 
 

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Go API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  import 
  
 ( 
  
 "context" 
  
 "errors" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/pubsub/v2" 
  
 "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" 
  
 "google.golang.org/grpc/codes" 
  
 "google.golang.org/grpc/status" 
 ) 
 // optimisticSubscribe shows the recommended pattern for optimistically 
 // assuming a subscription exists prior to receiving messages. 
 func 
  
 optimisticSubscribe 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 topic 
 , 
  
 subscriptionName 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topic := "projects/my-project-id/topics/my-topic" 
  
 // subscription := "projects/my-project/subscriptions/my-sub" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 // client.Subscriber can be passed a subscription ID (e.g. "my-sub") or 
  
 // a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub"). 
  
 // If a subscription ID is provided, the project ID from the client is used. 
  
 sub 
  
 := 
  
 client 
 . 
 Subscriber 
 ( 
 subscriptionName 
 ) 
  
 // Receive messages for 10 seconds, which simplifies testing. 
  
 // Comment this out in production, since `Receive` should 
  
 // be used as a long running operation. 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 10 
 * 
 time 
 . 
 Second 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Instead of checking if the subscription exists, optimistically try to 
  
 // receive from the subscription assuming it exists. 
  
 err 
  
 = 
  
 sub 
 . 
 Receive 
 ( 
 ctx 
 , 
  
 func 
 ( 
 _ 
  
 context 
 . 
 Context 
 , 
  
 msg 
  
 * 
 pubsub 
 . 
 Message 
 ) 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Got from existing subscription: %q\n" 
 , 
  
 string 
 ( 
 msg 
 . 
 Data 
 )) 
  
 msg 
 . 
 Ack 
 () 
  
 }) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 if 
  
 st 
 , 
  
 ok 
  
 := 
  
 status 
 . 
 FromError 
 ( 
 err 
 ); 
  
 ok 
  
 { 
  
 if 
  
 st 
 . 
 Code 
 () 
  
 == 
  
 codes 
 . 
 NotFound 
  
 { 
  
 // If the subscription does not exist, then create the subscription. 
  
 subscription 
 , 
  
 err 
  
 := 
  
 client 
 . 
 SubscriptionAdminClient 
 . 
 CreateSubscription 
 ( 
 ctx 
 , 
  
& pubsubpb 
 . 
 Subscription 
 { 
  
 Name 
 : 
  
 subscriptionName 
 , 
  
 Topic 
 : 
  
 topic 
 , 
  
 }) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 err 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Created subscription: %q\n" 
 , 
  
 subscriptionName 
 ) 
  
 // client.Subscriber can be passed a subscription ID (e.g. "my-sub") or 
  
 // a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub"). 
  
 // If a subscription ID is provided, the project ID from the client is used. 
  
 sub 
  
 = 
  
 client 
 . 
 Subscriber 
 ( 
 subscription 
 . 
 GetName 
 ()) 
  
 err 
  
 = 
  
 sub 
 . 
 Receive 
 ( 
 ctx 
 , 
  
 func 
 ( 
 ctx 
  
 context 
 . 
 Context 
 , 
  
 msg 
  
 * 
 pubsub 
 . 
 Message 
 ) 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Got from new subscription: %q\n" 
 , 
  
 string 
 ( 
 msg 
 . 
 Data 
 )) 
  
 msg 
 . 
 Ack 
 () 
  
 }) 
  
 if 
  
 err 
  
 != 
  
 nil 
 && 
 ! 
 errors 
 . 
 Is 
 ( 
 err 
 , 
  
 context 
 . 
 Canceled 
 ) 
  
 { 
  
 return 
  
 err 
  
 } 
  
 } 
  
 } 
  
 } 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Java API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  import 
  
 com.google.api.gax.rpc. NotFoundException 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. AckReplyConsumer 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. MessageReceiver 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Subscriber 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. SubscriptionAdminClient 
 
 ; 
 import 
  
 com.google.common.util.concurrent.MoreExecutors 
 ; 
 import 
  
 com.google.pubsub.v1. ProjectSubscriptionName 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 com.google.pubsub.v1. PushConfig 
 
 ; 
 import 
  
 com.google.pubsub.v1. Subscription 
 
 ; 
 import 
  
 com.google.pubsub.v1. TopicName 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 import 
  
 java.util.concurrent.TimeoutException 
 ; 
 public 
  
 class 
 OptimisticSubscribeExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 subscriptionId 
  
 = 
  
 "your-subscription-id" 
 ; 
  
 String 
  
 topicId 
  
 = 
  
 "your-topic-id" 
 ; 
  
 optimisticSubscribeExample 
 ( 
 projectId 
 , 
  
 subscriptionId 
 , 
  
 topicId 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 optimisticSubscribeExample 
 ( 
  
 String 
  
 projectId 
 , 
  
 String 
  
 subscriptionId 
 , 
  
 String 
  
 topicId 
 ) 
  
 throws 
  
 IOException 
  
 { 
  
  ProjectSubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  ProjectSubscriptionName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 // Instantiate an asynchronous message receiver. 
  
  MessageReceiver 
 
  
 receiver 
  
 = 
  
 ( 
 PubsubMessage 
  
 message 
 , 
  
 AckReplyConsumer 
  
 consumer 
 ) 
  
 - 
>  
 { 
  
 // Handle incoming message, then ack the received message. 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Id: " 
  
 + 
  
 message 
 . 
 getMessageId 
 ()); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Data: " 
  
 + 
  
 message 
 . 
 getData 
 (). 
 toStringUtf8 
 ()); 
  
 consumer 
 . 
 ack 
 (); 
  
 }; 
  
  Subscriber 
 
  
 subscriber 
  
 = 
  
 null 
 ; 
  
 try 
  
 { 
  
 subscriber 
  
 = 
  
  Subscriber 
 
 . 
 newBuilder 
 ( 
 subscriptionName 
 , 
  
 receiver 
 ). 
 build 
 (); 
  
 // Listen for resource NOT_FOUND errors and rebuild the  subscriber and restart subscribing 
  
 // when the current subscriber encounters these errors. 
  
 subscriber 
 . 
 addListener 
 ( 
  
 new 
  
  Subscriber 
 
 . 
 Listener 
 () 
  
 { 
  
 public 
  
 void 
  
 failed 
 ( 
  Subscriber 
 
 . 
 State 
  
 from 
 , 
  
 Throwable 
  
 failure 
 ) 
  
 { 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 failure 
 . 
 getStackTrace 
 ()); 
  
 if 
  
 ( 
 failure 
  
 instanceof 
  
  NotFoundException 
 
 ) 
  
 { 
  
 try 
  
 ( 
  SubscriptionAdminClient 
 
  
 subscriptionAdminClient 
  
 = 
  
  SubscriptionAdminClient 
 
 . 
 create 
 ()) 
  
 { 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
 // Create a pull subscription with default acknowledgement deadline of 10 seconds. 
  
 // The client library will automatically extend acknowledgement deadlines. 
  
  Subscription 
 
  
 subscription 
  
 = 
  
 subscriptionAdminClient 
 . 
 createSubscription 
 ( 
  
 subscriptionName 
 , 
  
 topicName 
 , 
  
  PushConfig 
 
 . 
 getDefaultInstance 
 (), 
  
 10 
 ); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Created pull subscription: " 
  
 + 
  
 subscription 
 . 
  getName 
 
 ()); 
  
 optimisticSubscribeExample 
 ( 
 projectId 
 , 
  
 subscriptionId 
 , 
  
 topicId 
 ); 
  
 } 
  
 catch 
  
 ( 
 IOException 
  
 err 
 ) 
  
 { 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Failed to create pull subscription: " 
  
 + 
  
 err 
 . 
 getMessage 
 ()); 
  
 } 
  
 } 
  
 } 
  
 }, 
  
 MoreExecutors 
 . 
 directExecutor 
 ()); 
  
 subscriber 
 . 
  startAsync 
 
 (). 
 awaitRunning 
 (); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
 "Listening for messages on %s:\n" 
 , 
  
 subscriptionName 
 . 
  toString 
 
 ()); 
  
 subscriber 
 . 
 awaitTerminated 
 ( 
 30 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 ); 
  
 } 
  
 catch 
  
 ( 
 IllegalStateException 
  
 e 
 ) 
  
 { 
  
 // Prevent an exception from being thrown if it is the expected NotFoundException 
  
 if 
  
 ( 
 ! 
 ( 
 subscriber 
 . 
 failureCause 
 () 
  
 instanceof 
  
  NotFoundException 
 
 )) 
  
 { 
  
 throw 
  
 e 
 ; 
  
 } 
  
 } 
  
 catch 
  
 ( 
 TimeoutException 
  
 e 
 ) 
  
 { 
  
 subscriber 
 . 
 stopAsync 
 (); 
  
 } 
  
 } 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Node.js API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  /** 
  
 * 
  
 TODO 
 ( 
 developer 
 ): 
  
 Uncomment 
  
 these 
  
 variables 
  
 before 
  
 running 
  
 the 
  
 sample 
 . 
  
 */ 
 // 
  
 const 
  
 subscriptionNameOrId 
  
 = 
  
 'YOUR_SUBSCRIPTION_NAME_OR_ID' 
 ; 
 // 
  
 const 
  
 topicNameOrId 
  
 = 
  
 'YOUR_TOPIC_NAME_OR_ID' 
 ; 
 // 
  
 const 
  
 timeout 
  
 = 
  
 60 
 ; 
 // 
  
 Imports 
  
 the 
  
 Google 
  
 Cloud 
  
 client 
  
 library 
 const 
  
 { 
 PubSub 
 } 
  
 = 
  
 require 
 ( 
 '@google-cloud/pubsub' 
 ); 
 // 
  
 Creates 
  
 a 
  
 client 
 ; 
  
 cache 
  
 this 
  
 for 
  
 further 
  
 use 
 const 
  
 pubSubClient 
  
 = 
  
 new 
  
 PubSub 
 (); 
 function 
  
 optimisticSubscribe 
 ( 
 subscriptionNameOrId 
 , 
  
 topicNameOrId 
 , 
  
 timeout 
 ) 
  
 { 
  
 // 
  
 Try 
  
 using 
  
 an 
  
 existing 
  
 subscription 
  
 let 
  
 subscription 
  
 = 
  
 pubSubClient 
 . 
 subscription 
 ( 
 subscriptionNameOrId 
 ); 
  
 // 
  
 Create 
  
 an 
  
 event 
  
 handler 
  
 to 
  
 handle 
  
 messages 
  
 let 
  
 messageCount 
  
 = 
  
 0 
 ; 
  
 const 
  
 messageHandler 
  
 = 
  
 message 
  
 = 
>  
 { 
  
 console 
 . 
 log 
 ( 
 ` 
 Received 
  
 message 
  
 $ 
 { 
 message 
 . 
 id 
 }: 
 ` 
 ); 
  
 console 
 . 
 log 
 ( 
 ` 
\ tData 
 : 
  
 $ 
 { 
 message 
 . 
 data 
 } 
 ` 
 ); 
  
 console 
 . 
 log 
 ( 
 ` 
\ tAttributes 
 : 
  
 $ 
 { 
 message 
 . 
 attributes 
 } 
 ` 
 ); 
  
 messageCount 
  
 += 
  
 1 
 ; 
  
 // 
  
 "Ack" 
  
 ( 
 acknowledge 
  
 receipt 
  
 of 
 ) 
  
 the 
  
 message 
  
 message 
 . 
 ack 
 (); 
  
 }; 
  
 // 
  
 Set 
  
 an 
  
 error 
  
 handler 
  
 so 
  
 that 
  
 we 
 're notified if the subscription doesn' 
 t 
  
 // 
  
 already 
  
 exist 
 . 
  
 subscription 
 . 
 on 
 ( 
 'error' 
 , 
  
 async 
  
 e 
  
 = 
>  
 { 
  
 // 
  
 Resource 
  
 Not 
  
 Found 
  
 if 
  
 ( 
 e 
 . 
 code 
  
 === 
  
 5 
 ) 
  
 { 
  
 console 
 . 
 log 
 ( 
 'Subscription not found, creating it' 
 ); 
  
 await 
  
 pubSubClient 
 . 
 createSubscription 
 ( 
  
 topicNameOrId 
 , 
  
 subscriptionNameOrId 
 , 
  
 ); 
  
 // 
  
 Refresh 
  
 our 
  
 subscriber 
  
 object 
  
 and 
  
 re 
 - 
 attach 
  
 the 
  
 message 
  
 handler 
 . 
  
 subscription 
  
 = 
  
 pubSubClient 
 . 
 subscription 
 ( 
 subscriptionNameOrId 
 ); 
  
 subscription 
 . 
 on 
 ( 
 'message' 
 , 
  
 messageHandler 
 ); 
  
 } 
  
 }); 
  
 // 
  
 Listen 
  
 for 
  
 new 
  
 messages 
  
 until 
  
 timeout 
  
 is 
  
 hit 
 ; 
  
 this 
  
 will 
  
 attempt 
  
 to 
  
 // 
  
 open 
  
 the 
  
 actual 
  
 subscriber 
  
 streams 
 . 
  
 If 
  
 it 
  
 fails 
 , 
  
 the 
  
 error 
  
 handler 
  
 // 
  
 above 
  
 will 
  
 be 
  
 called 
 . 
  
 subscription 
 . 
 on 
 ( 
 'message' 
 , 
  
 messageHandler 
 ); 
  
 // 
  
 Wait 
  
 a 
  
 while 
  
 for 
  
 the 
  
 subscription 
  
 to 
  
 run 
 . 
  
 ( 
 Part 
  
 of 
  
 the 
  
 sample 
  
 only 
 . 
 ) 
  
 setTimeout 
 (() 
  
 = 
>  
 { 
  
 subscription 
 . 
 removeListener 
 ( 
 'message' 
 , 
  
 messageHandler 
 ); 
  
 console 
 . 
 log 
 ( 
 ` 
 $ 
 { 
 messageCount 
 } 
  
 message 
 ( 
 s 
 ) 
  
 received 
 . 
 ` 
 ); 
  
 }, 
  
 timeout 
  
 * 
  
 1000 
 ); 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Node.js API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  /** 
 * 
 TODO 
 ( 
 developer 
 ): 
 Uncomment 
 these 
 variables 
 before 
 running 
 the 
 sample 
 . 
 */ 
 // 
 const 
 subscriptionNameOrId 
 = 
 'YOUR_SUBSCRIPTION_NAME_OR_ID' 
 ; 
 // 
 const 
 topicNameOrId 
 = 
 'YOUR_TOPIC_NAME_OR_ID' 
 ; 
 // 
 const 
 timeout 
 = 
 60 
 ; 
 // 
 Imports 
 the 
 Google 
 Cloud 
 client 
 library 
 import 
  
 { 
 PubSub 
 , 
 Message 
 , 
 StatusError 
 } 
 from 
  
 '@google-cloud/pubsub' 
 ; 
 // 
 Creates 
 a 
 client 
 ; 
 cache 
 this 
 for 
 further 
 use 
 const 
 pubSubClient 
 = 
 new 
 PubSub 
 (); 
 function 
 optimisticSubscribe 
 ( 
 subscriptionNameOrId 
 : 
 string 
 , 
 topicNameOrId 
 : 
 string 
 , 
 timeout 
 : 
 number 
 , 
 ) 
 { 
 // 
 Try 
 using 
 an 
 existing 
 subscription 
 let 
 subscription 
 = 
 pubSubClient 
 . 
 subscription 
 ( 
 subscriptionNameOrId 
 ); 
 // 
 Create 
 an 
 event 
 handler 
 to 
 handle 
 messages 
 let 
 messageCount 
 = 
 0 
 ; 
 const 
 messageHandler 
 = 
 ( 
 message 
 : 
 Message 
 ) 
 = 
> { 
 console 
 . 
 log 
 ( 
 ` 
 Received 
 message 
 $ 
 { 
 message 
 . 
 id 
 }: 
 ` 
 ); 
 console 
 . 
 log 
 ( 
 ` 
\ tData 
 : 
 $ 
 { 
 message 
 . 
 data 
 } 
 ` 
 ); 
 console 
 . 
 log 
 ( 
 ` 
\ tAttributes 
 : 
 $ 
 { 
 message 
 . 
 attributes 
 } 
 ` 
 ); 
 messageCount 
 += 
 1 
 ; 
 // 
 "Ack" 
 ( 
 acknowledge 
 receipt 
 of 
 ) 
 the 
 message 
 message 
 . 
 ack 
 (); 
 }; 
 // 
 Set 
 an 
 error 
 handler 
 so 
 that 
 we 
 're notified if the subscription doesn' 
 t 
 // 
 already 
 exist 
 . 
 subscription 
 . 
 on 
 ( 
 'error' 
 , 
 async 
 ( 
 e 
 : 
 StatusError 
 ) 
 = 
> { 
 // 
 Resource 
 Not 
 Found 
 if 
 ( 
 e 
 . 
 code 
 === 
 5 
 ) 
 { 
 console 
 . 
 log 
 ( 
 'Subscription not found, creating it' 
 ); 
 await 
 pubSubClient 
 . 
 createSubscription 
 ( 
 topicNameOrId 
 , 
 subscriptionNameOrId 
 , 
 ); 
 // 
 Refresh 
 our 
 subscriber 
 object 
 and 
 re 
 - 
 attach 
 the 
 message 
 handler 
 . 
 subscription 
 = 
 pubSubClient 
 . 
 subscription 
 ( 
 subscriptionNameOrId 
 ); 
 subscription 
 . 
 on 
 ( 
 'message' 
 , 
 messageHandler 
 ); 
 } 
 }); 
 // 
 Listen 
 for 
 new 
 messages 
 until 
 timeout 
 is 
 hit 
 ; 
 this 
 will 
 attempt 
 to 
 // 
 open 
 the 
 actual 
 subscriber 
 streams 
 . 
 If 
 it 
 fails 
 , 
 the 
 error 
 handler 
 // 
 above 
 will 
 be 
 called 
 . 
 subscription 
 . 
 on 
 ( 
 'message' 
 , 
 messageHandler 
 ); 
 // 
 Wait 
 a 
 while 
 for 
 the 
 subscription 
 to 
 run 
 . 
 ( 
 Part 
 of 
 the 
 sample 
 only 
 . 
 ) 
 setTimeout 
 (() 
 = 
> { 
 subscription 
 . 
 removeListener 
 ( 
 'message' 
 , 
 messageHandler 
 ); 
 console 
 . 
 log 
 ( 
 `$ 
 { 
 messageCount 
 } 
 message 
 ( 
 s 
 ) 
 received 
 . 
 ` 
 ); 
 }, 
 timeout 
 * 
 1000 
 ); 
 } 
 

PHP

Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub PHP API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  use Google\Cloud\Core\Exception\NotFoundException; 
 use Google\Cloud\PubSub\PubSubClient; 
 /** 
 * Optimistically subscribes to a topic 
 * 
 * @param string $projectId  The Google project ID. 
 * @param string $topicName  The Pub/Sub topic name. 
 * @param string $subscriptionId  The ID of the subscription. 
 */ 
 function optimistic_subscribe( 
 string $projectId, 
 string $topicName, 
 string $subscriptionId 
 ): void { 
 $pubsub = new PubSubClient([ 
 'projectId' => $projectId, 
 ]); 
 $subscription = $pubsub->subscription($subscriptionId); 
 try { 
 $messages = $subscription->pull(); 
 foreach ($messages as $message) { 
 printf('PubSub Message: %s' . PHP_EOL, $message->data()); 
 $subscription->acknowledge($message); 
 } 
 } catch (NotFoundException $e) { // Subscription is not found 
 printf('Exception Message: %s' . PHP_EOL, $e->getMessage()); 
 printf('StackTrace: %s' . PHP_EOL, $e->getTraceAsString()); 
 // Create subscription and retry the pull. Any messages published before subscription creation would not be received. 
 $pubsub->subscribe($subscriptionId, $topicName); 
 optimistic_subscribe($projectId, $topicName, $subscriptionId); 
 } 
 } 
 

Python

Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Python API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  from 
  
 google.api_core.exceptions 
  
 import 
 NotFound 
 from 
  
 google.cloud 
  
 import 
 pubsub_v1 
 from 
  
 concurrent.futures 
  
 import 
 TimeoutError 
 # TODO(developer) 
 # project_id = "your-project-id" 
 # subscription_id = "your-subscription-id" 
 # Number of seconds the subscriber should listen for messages 
 # timeout = 5.0 
 # topic_id = "your-topic-id" 
 # Create a subscriber client. 
 subscriber 
 = 
 pubsub_v1 
 . 
  SubscriberClient 
 
 () 
 # The `subscription_path` method creates a fully qualified identifier 
 # in the form `projects/{project_id}/subscriptions/{subscription_id}` 
 subscription_path 
 = 
 subscriber 
 . 
 subscription_path 
 ( 
 project_id 
 , 
 subscription_id 
 ) 
 # Define callback to be called when a message is received. 
 def 
  
 callback 
 ( 
 message 
 : 
 pubsub_v1 
 . 
 subscriber 
 . 
 message 
 . 
  Message 
 
 ) 
 - 
> None 
 : 
 # Ack message after processing it. 
 message 
 . 
  ack 
 
 () 
 # Wrap subscriber in a 'with' block to automatically call close() when done. 
 with 
 subscriber 
 : 
 try 
 : 
 # Optimistically subscribe to messages on the subscription. 
 streaming_pull_future 
 = 
  subscribe 
 
r . 
  subscribe 
 
 ( 
 subscription_path 
 , 
 callback 
 = 
 callback 
 ) 
 streaming_pull_future 
 . 
 result 
 ( 
 timeout 
 = 
 timeout 
 ) 
 except 
 TimeoutError 
 : 
 print 
 ( 
 "Successfully subscribed until the timeout passed." 
 ) 
 streaming_pull_future 
 . 
 cancel 
 () 
 # Trigger the shutdown. 
 streaming_pull_future 
 . 
 result 
 () 
 # Block until the shutdown is complete. 
 except 
 NotFound 
 : 
 print 
 ( 
 f 
 "Subscription 
 { 
 subscription_path 
 } 
 not found, creating it." 
 ) 
 try 
 : 
 # If the subscription does not exist, then create it. 
 publisher 
 = 
 pubsub_v1 
 . 
  PublisherClient 
 
 () 
 topic_path 
 = 
 publisher 
 . 
 topic_path 
 ( 
 project_id 
 , 
 topic_id 
 ) 
 subscription 
 = 
 subscriber 
 . 
 create_subscription 
 ( 
 request 
 = 
 { 
 "name" 
 : 
 subscription_path 
 , 
 "topic" 
 : 
 topic_path 
 } 
 ) 
 if 
 subscription 
 : 
 print 
 ( 
 f 
 "Subscription 
 { 
 subscription 
 . 
 name 
 } 
 created" 
 ) 
 else 
 : 
 raise 
 ValueError 
 ( 
 "Subscription creation failed." 
 ) 
 # Subscribe on the created subscription. 
 try 
 : 
 streaming_pull_future 
 = 
  subscribe 
 
r . 
  subscribe 
 
 ( 
 subscription 
 . 
 name 
 , 
 callback 
 = 
 callback 
 ) 
 streaming_pull_future 
 . 
 result 
 ( 
 timeout 
 = 
 timeout 
 ) 
 except 
 TimeoutError 
 : 
 streaming_pull_future 
 . 
 cancel 
 () 
 # Trigger the shutdown. 
 streaming_pull_future 
 . 
 result 
 () 
 # Block until the shutdown is complete. 
 except 
 Exception 
 as 
 e 
 : 
 print 
 ( 
 f 
 "Exception occurred when creating subscription and subscribing to it: 
 { 
 e 
 } 
 " 
 ) 
 except 
 Exception 
 as 
 e 
 : 
 print 
 ( 
 f 
 "Exception occurred when attempting optimistic subscribe: 
 { 
 e 
 } 
 " 
 ) 
 

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Ruby API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  # topic_id = "your-topic-id" 
 # subscription_id = "your-subscription-id" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  Pubsub 
 
 . 
 new 
 # Propagate exception from child threads to the main thread as soon as it is 
 # raised. Exceptions happened in the callback thread are collected in the 
 # callback thread pool and do not propagate to the main thread 
 Thread 
 . 
 abort_on_exception 
  
 = 
  
 true 
 begin 
  
 subscriber 
  
 = 
  
 pubsub 
 . 
 subscriber 
  
 subscription_id 
  
 listener 
  
 = 
  
 subscriber 
 . 
 listen 
  
 do 
  
 | 
 received_message 
 | 
  
 puts 
  
 "Received message: 
 #{ 
 received_message 
 . 
 data 
 } 
 " 
  
 received_message 
 . 
 acknowledge! 
  
 end 
  
 listener 
 . 
 start 
  
 # Let the main thread sleep for 60 seconds so the thread for listening 
  
 # messages does not quit 
  
 sleep 
  
 60 
  
 listener 
 . 
 stop 
 . 
 wait! 
 rescue 
  
 Google 
 :: 
 Cloud 
 :: 
 NotFoundError 
  
 = 
>  
 e 
  
 puts 
  
 "Subscription 
 #{ 
 subscription_id 
 } 
 does not exist." 
  
 subscription 
  
 = 
  
 pubsub 
 . 
 subscription_admin 
 . 
 create_subscription 
  
 \ 
  
 name 
 : 
  
 pubsub 
 . 
 subscription_path 
 ( 
 subscription_id 
 ), 
  
 topic 
 : 
  
 pubsub 
 . 
 topic_path 
 ( 
 topic_id 
 ) 
  
 puts 
  
 "Subscription 
 #{ 
 subscription_id 
 } 
 created." 
 end 
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser .

Create a Mobile Website
View Site in Mobile | Classic
Share by: