Change the type of a topic

Change the type of a topic between an ingestion topic and a non-ingestion topic.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

C++

Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  namespace pubsub = ::google::cloud::pubsub;
  namespace pubsub_admin = ::google::cloud::pubsub_admin;
  // Sends an UpdateTopic request that sets AWS Kinesis ingestion settings on
  // the topic identified by `project_id` and `topic_id`.
  //
  // The update mask names only `ingestion_data_source_settings`. If the call
  // does not succeed, the returned status is thrown; otherwise the updated
  // topic is printed to stdout.
  [](pubsub_admin::TopicAdminClient client, std::string project_id,
     std::string topic_id, std::string stream_arn, std::string consumer_arn,
     std::string aws_role_arn, std::string gcp_service_account) {
    google::pubsub::v1::UpdateTopicRequest request;

    // Identify the topic by its fully qualified name.
    auto& topic_proto = *request.mutable_topic();
    topic_proto.set_name(
        pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());

    // Fill in the Kinesis source the topic should ingest from.
    auto& kinesis = *topic_proto.mutable_ingestion_data_source_settings()
                         ->mutable_aws_kinesis();
    kinesis.set_stream_arn(stream_arn);
    kinesis.set_consumer_arn(consumer_arn);
    kinesis.set_aws_role_arn(aws_role_arn);
    kinesis.set_gcp_service_account(gcp_service_account);

    // List the single field path this update applies to.
    request.mutable_update_mask()->add_paths("ingestion_data_source_settings");

    auto updated = client.UpdateTopic(request);
    if (!updated) throw std::move(updated).status();

    std::cout << "The topic was successfully updated: "
              << updated->DebugString() << "\n";
  }
 

C#

Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C# API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
  Google.Protobuf.WellKnownTypes 
 
 ; 
 using 
  
 System 
 ; 
 public class UpdateTopicTypeSample
 {
     /// <summary>
     /// Sends an UpdateTopic request that sets AWS Kinesis ingestion settings
     /// on the topic identified by <paramref name="projectId"/> and
     /// <paramref name="topicId"/>.
     /// </summary>
     /// <param name="projectId">Project that owns the topic.</param>
     /// <param name="topicId">ID of the topic to update.</param>
     /// <param name="streamArn">Value for the Kinesis StreamArn setting.</param>
     /// <param name="consumerArn">Value for the Kinesis ConsumerArn setting.</param>
     /// <param name="awsRoleArn">Value for the Kinesis AwsRoleArn setting.</param>
     /// <param name="gcpServiceAccount">Value for the Kinesis GcpServiceAccount setting.</param>
     /// <returns>The topic returned by the UpdateTopic call.</returns>
     public Topic UpdateTopicType(string projectId, string topicId, string streamArn, string consumerArn, string awsRoleArn, string gcpServiceAccount)
     {
         // Describe the Kinesis source first, then wrap it in the ingestion settings.
         var kinesisSource = new IngestionDataSourceSettings.Types.AwsKinesis
         {
             StreamArn = streamArn,
             ConsumerArn = consumerArn,
             AwsRoleArn = awsRoleArn,
             GcpServiceAccount = gcpServiceAccount
         };
         var ingestionSettings = new IngestionDataSourceSettings
         {
             AwsKinesis = kinesisSource
         };

         // The topic message carries the target name plus the new settings.
         var topic = new Topic
         {
             Name = TopicName.FormatProjectTopic(projectId, topicId),
             IngestionDataSourceSettings = ingestionSettings
         };

         var client = PublisherServiceApiClient.Create();

         // The field mask lists the single field this update applies to.
         var updateMask = new FieldMask();
         updateMask.Paths.Add("ingestion_data_source_settings");

         var request = new UpdateTopicRequest
         {
             Topic = topic,
             UpdateMask = updateMask
         };

         Topic updatedTopic = client.UpdateTopic(request);
         Console.WriteLine($"Topic {topic.Name} updated.");
         return updatedTopic;
     }
 }
 

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "cloud.google.com/go/pubsub/v2" 
  
 "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" 
  
 "google.golang.org/protobuf/types/known/fieldmaskpb" 
 ) 
 func 
  
 updateTopicType 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 topic 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topic := "projects/my-project-id/topics/my-topic" 
  
 streamARN 
  
 := 
  
 "stream-arn" 
  
 consumerARN 
  
 := 
  
 "consumer-arn" 
  
 awsRoleARN 
  
 := 
  
 "aws-role-arn" 
  
 gcpServiceAccount 
  
 := 
  
 "gcp-service-account" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 pbTopic 
  
 := 
  
& pubsubpb 
 . 
 Topic 
 { 
  
 Name 
 : 
  
 topic 
 , 
  
 IngestionDataSourceSettings 
 : 
  
& pubsubpb 
 . 
 IngestionDataSourceSettings 
 { 
  
 Source 
 : 
  
& pubsubpb 
 . 
 IngestionDataSourceSettings_AwsKinesis_ 
 { 
  
 AwsKinesis 
 : 
  
& pubsubpb 
 . 
 IngestionDataSourceSettings_AwsKinesis 
 { 
  
 StreamArn 
 : 
  
 streamARN 
 , 
  
 ConsumerArn 
 : 
  
 consumerARN 
 , 
  
 AwsRoleArn 
 : 
  
 awsRoleARN 
 , 
  
 GcpServiceAccount 
 : 
  
 gcpServiceAccount 
 , 
  
 }, 
  
 }, 
  
 }, 
  
 } 
  
 updateReq 
  
 := 
  
& pubsubpb 
 . 
 UpdateTopicRequest 
 { 
  
 Topic 
 : 
  
 pbTopic 
 , 
  
 UpdateMask 
 : 
  
& fieldmaskpb 
 . 
 FieldMask 
 { 
  
 Paths 
 : 
  
 [] 
 string 
 { 
 "ingestion_data_source_settings" 
 }, 
  
 }, 
  
 } 
  
 topicCfg 
 , 
  
 err 
  
 := 
  
 client 
 . 
 TopicAdminClient 
 . 
 UpdateTopic 
 ( 
 ctx 
 , 
  
 updateReq 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "topic.Update: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Topic updated with kinesis source: %v\n" 
 , 
  
 topicCfg 
 ) 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 com.google.cloud.pubsub.v1. TopicAdminClient 
 
 ; 
 import 
  
 com.google.protobuf. FieldMask 
 
 ; 
 import 
  
 com.google.pubsub.v1. IngestionDataSourceSettings 
 
 ; 
 import 
  
 com.google.pubsub.v1. Topic 
 
 ; 
 import 
  
 com.google.pubsub.v1. TopicName 
 
 ; 
 import 
  
 com.google.pubsub.v1. UpdateTopicRequest 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 public 
  
 class 
 UpdateTopicTypeExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 topicId 
  
 = 
  
 "your-topic-id" 
 ; 
  
 // Kinesis ingestion settings. 
  
 String 
  
 streamArn 
  
 = 
  
 "stream-arn" 
 ; 
  
 String 
  
 consumerArn 
  
 = 
  
 "consumer-arn" 
 ; 
  
 String 
  
 awsRoleArn 
  
 = 
  
 "aws-role-arn" 
 ; 
  
 String 
  
 gcpServiceAccount 
  
 = 
  
 "gcp-service-account" 
 ; 
  
 UpdateTopicTypeExample 
 . 
 updateTopicTypeExample 
 ( 
  
 projectId 
 , 
  
 topicId 
 , 
  
 streamArn 
 , 
  
 consumerArn 
 , 
  
 awsRoleArn 
 , 
  
 gcpServiceAccount 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 updateTopicTypeExample 
 ( 
  
 String 
  
 projectId 
 , 
  
 String 
  
 topicId 
 , 
  
 String 
  
 streamArn 
 , 
  
 String 
  
 consumerArn 
 , 
  
 String 
  
 awsRoleArn 
 , 
  
 String 
  
 gcpServiceAccount 
 ) 
  
 throws 
  
 IOException 
  
 { 
  
 try 
  
 ( 
  TopicAdminClient 
 
  
 topicAdminClient 
  
 = 
  
  TopicAdminClient 
 
 . 
 create 
 ()) 
  
 { 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
  IngestionDataSourceSettings 
 
 . 
  AwsKinesis 
 
  
 awsKinesis 
  
 = 
  
  IngestionDataSourceSettings 
 
 . 
 AwsKinesis 
 . 
 newBuilder 
 () 
  
 . 
 setStreamArn 
 ( 
 streamArn 
 ) 
  
 . 
  setConsumerArn 
 
 ( 
 consumerArn 
 ) 
  
 . 
 setAwsRoleArn 
 ( 
 awsRoleArn 
 ) 
  
 . 
 setGcpServiceAccount 
 ( 
 gcpServiceAccount 
 ) 
  
 . 
 build 
 (); 
  
  IngestionDataSourceSettings 
 
  
 ingestionDataSourceSettings 
  
 = 
  
  IngestionDataSourceSettings 
 
 . 
 newBuilder 
 (). 
  setAwsKinesis 
 
 ( 
 awsKinesis 
 ). 
 build 
 (); 
  
 // Construct the topic with Kinesis ingestion settings. 
  
  Topic 
 
  
 topic 
  
 = 
  
  Topic 
 
 . 
 newBuilder 
 () 
  
 . 
 setName 
 ( 
 topicName 
 . 
  toString 
 
 ()) 
  
 . 
  setIngestionDataSourceSettings 
 
 ( 
 ingestionDataSourceSettings 
 ) 
  
 . 
 build 
 (); 
  
 // Construct a field mask to indicate which field to update in the topic. 
  
  FieldMask 
 
  
 updateMask 
  
 = 
  
  FieldMask 
 
 . 
 newBuilder 
 (). 
  addPaths 
 
 ( 
 "ingestion_data_source_settings" 
 ). 
 build 
 (); 
  
  UpdateTopicRequest 
 
  
 request 
  
 = 
  
  UpdateTopicRequest 
 
 . 
 newBuilder 
 (). 
 setTopic 
 ( 
 topic 
 ). 
 setUpdateMask 
 ( 
 updateMask 
 ). 
 build 
 (); 
  
  Topic 
 
  
 response 
  
 = 
  
 topicAdminClient 
 . 
 updateTopic 
 ( 
 request 
 ); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
  
 "Updated topic with Kinesis ingestion settings: " 
  
 + 
  
 response 
 . 
 getAllFields 
 ()); 
  
 } 
  
 } 
 } 
 

PHP

Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub PHP API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  use Google\Cloud\PubSub\PubSubClient; 
 /**
  * Changes the type of a Pub/Sub topic by setting AWS Kinesis ingestion
  * settings on it, then prints the topic name.
  *
  * @param string $projectId  The Google project ID.
  * @param string $topicName  The Pub/Sub topic name.
  * @param string $streamArn  The Kinesis stream ARN to ingest data from.
  * @param string $consumerArn  The Kinesis consumer ARN to be used for
  *        ingestion in Enhanced Fan-Out mode. The consumer must already be
  *        created and ready to be used.
  * @param string $awsRoleArn  The AWS role ARN to be used for Federated
  *        Identity authentication with Kinesis. Check the Pub/Sub docs for
  *        how to set up this role and which permissions must be attached to it.
  * @param string $gcpServiceAccount  The GCP service account to be used for
  *        Federated Identity authentication with Kinesis (via an
  *        AssumeRoleWithWebIdentity call for the provided role). The
  *        $awsRoleArn must be set up with accounts.google.com:sub equal to
  *        this service account number.
  */
 function update_topic_type(
     string $projectId,
     string $topicName,
     string $streamArn,
     string $consumerArn,
     string $awsRoleArn,
     string $gcpServiceAccount
 ): void {
     $pubsub = new PubSubClient(['projectId' => $projectId]);

     // Kinesis source description, keyed as the update call expects.
     $kinesisSettings = [
         'stream_arn' => $streamArn,
         'consumer_arn' => $consumerArn,
         'aws_role_arn' => $awsRoleArn,
         'gcp_service_account' => $gcpServiceAccount,
     ];

     // New topic fields, and the mask naming the field being updated.
     $topicFields = [
         'ingestionDataSourceSettings' => [
             'aws_kinesis' => $kinesisSettings,
         ],
     ];
     $updateOptions = [
         'updateMask' => ['ingestionDataSourceSettings'],
     ];

     $topic = $pubsub->topic($topicName);
     $topic->update($topicFields, $updateOptions);

     printf('Topic updated: %s' . PHP_EOL, $topic->name());
 }
 

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  # Inputs expected to be defined before this snippet runs:
  #
  # topic_id = "your-topic-id"
  # stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/stream-name"
  # consumer_arn =
  #   "arn:aws:kinesis:us-west-2:111111111111:stream/x/consumer/y:1111111111"
  # aws_role_arn = "arn:aws:iam::111111111111:role/role-name"
  # gcp_service_account = "service-account@project.iam.gserviceaccount.com"

  pubsub = Google::Cloud::Pubsub.new
  topic_admin = pubsub.topic_admin

  # Describe the Kinesis source and wrap it in the ingestion settings message.
  kinesis_source = {
    stream_arn: stream_arn,
    consumer_arn: consumer_arn,
    aws_role_arn: aws_role_arn,
    gcp_service_account: gcp_service_account
  }
  ingestion_data_source_settings =
    Google::Cloud::PubSub::V1::IngestionDataSourceSettings.new(
      aws_kinesis: kinesis_source
    )

  # Fetch the current topic and attach the new settings to it.
  topic = topic_admin.get_topic(topic: pubsub.topic_path(topic_id))
  topic.ingestion_data_source_settings = ingestion_data_source_settings

  # The mask lists the single field this update applies to.
  field_mask = { paths: ["ingestion_data_source_settings"] }
  topic = topic_admin.update_topic(topic: topic, update_mask: field_mask)

  puts "Topic #{topic.name} updated."
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.

Create a Mobile Website
View Site in Mobile | Classic
Share by: