Create topic with Confluent Cloud ingestion

Create topic with Confluent Cloud ingestion

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

C++

Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub C++ API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  namespace 
  
 pubsub 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 ; 
 namespace 
  
 pubsub_admin 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub_admin 
 ; 
 []( 
 pubsub_admin 
 :: 
 TopicAdminClient 
  
 client 
 , 
  
 std 
 :: 
 string 
  
 project_id 
 , 
  
 std 
 :: 
 string 
  
 topic_id 
 , 
  
 std 
 :: 
 string 
  
 const 
&  
 bootstrap_server 
 , 
  
 std 
 :: 
 string 
  
 const 
&  
 cluster_id 
 , 
  
 std 
 :: 
 string 
  
 const 
&  
 confluent_topic 
 , 
  
 std 
 :: 
 string 
  
 const 
&  
 identity_pool_id 
 , 
  
 std 
 :: 
 string 
  
 const 
&  
 gcp_service_account 
 ) 
  
 { 
  
 google 
 :: 
 pubsub 
 :: 
 v1 
 :: 
 Topic 
  
 request 
 ; 
  
 request 
 . 
 set_name 
 ( 
  
 pubsub 
 :: 
 Topic 
 ( 
 std 
 :: 
 move 
 ( 
 project_id 
 ), 
  
 std 
 :: 
 move 
 ( 
 topic_id 
 )). 
 FullName 
 ()); 
  
 auto 
 * 
  
 confluent_cloud 
  
 = 
  
 request 
 . 
 mutable_ingestion_data_source_settings 
 () 
  
 - 
> mutable_confluent_cloud 
 (); 
  
 confluent_cloud 
 - 
> set_bootstrap_server 
 ( 
 bootstrap_server 
 ); 
  
 confluent_cloud 
 - 
> set_cluster_id 
 ( 
 cluster_id 
 ); 
  
 confluent_cloud 
 - 
> set_topic 
 ( 
 confluent_topic 
 ); 
  
 confluent_cloud 
 - 
> set_identity_pool_id 
 ( 
 identity_pool_id 
 ); 
  
 confluent_cloud 
 - 
> set_gcp_service_account 
 ( 
 gcp_service_account 
 ); 
  
 auto 
  
 topic 
  
 = 
  
 client 
 . 
 CreateTopic 
 ( 
 request 
 ); 
  
 // Note that kAlreadyExists is a possible error when the library retries. 
  
 if 
  
 ( 
 topic 
 . 
 status 
 (). 
 code 
 () 
  
 == 
  
 google 
 :: 
 cloud 
 :: 
 StatusCode 
 :: 
 kAlreadyExists 
 ) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "The topic already exists 
 \n 
 " 
 ; 
  
 return 
 ; 
  
 } 
  
 if 
  
 ( 
 ! 
 topic 
 ) 
  
 throw 
  
 std 
 :: 
 move 
 ( 
 topic 
 ). 
 status 
 (); 
  
 std 
 :: 
 cout 
 << 
 "The topic was successfully created: " 
 << 
 topic 
 - 
> DebugString 
 () 
 << 
 " 
 \n 
 " 
 ; 
 } 
 

C#

Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub C# API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
 System 
 ; 
 public 
  
 class 
  
 CreateTopicWithConfluentCloudIngestionSample 
 { 
  
 public 
  
 Topic 
  
 CreateTopicWithConfluentCloudIngestion 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 topicId 
 , 
  
 string 
  
 bootstrapServer 
 , 
  
 string 
  
 clusterId 
 , 
  
 string 
  
 confluentTopic 
 , 
  
 string 
  
 identityPoolId 
 , 
  
 string 
  
 gcpServiceAccount 
 ) 
  
 { 
  
 // Define settings for Confluent Cloud ingestion 
  
  IngestionDataSourceSettings 
 
  
 ingestionDataSourceSettings 
  
 = 
  
 new 
  
  IngestionDataSourceSettings 
 
  
 { 
  
 ConfluentCloud 
  
 = 
  
 new 
  
 IngestionDataSourceSettings 
 . 
 Types 
 . 
 ConfluentCloud 
  
 { 
  
 BootstrapServer 
  
 = 
  
 bootstrapServer 
 , 
  
 ClusterId 
  
 = 
  
 clusterId 
 , 
  
 Topic 
  
 = 
  
 confluentTopic 
 , 
  
 IdentityPoolId 
  
 = 
  
 identityPoolId 
 , 
  
 GcpServiceAccount 
  
 = 
  
 gcpServiceAccount 
  
 } 
  
 }; 
  
  PublisherServiceApiClient 
 
  
 publisher 
  
 = 
  
  PublisherServiceApiClient 
 
 . 
  Create 
 
 (); 
  
  Topic 
 
  
 topic 
  
 = 
  
 new 
  
  Topic 
 
 () 
  
 { 
  
 Name 
  
 = 
  
  TopicName 
 
 . 
  FormatProjectTopic 
 
 ( 
 projectId 
 , 
  
 topicId 
 ), 
  
 IngestionDataSourceSettings 
  
 = 
  
 ingestionDataSourceSettings 
  
 }; 
  
  Topic 
 
  
 createdTopic 
  
 = 
  
 publisher 
 . 
  CreateTopic 
 
 ( 
 topic 
 ); 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Topic {topic. Name 
} created." 
 ); 
  
 return 
  
 createdTopic 
 ; 
  
 } 
 } 
 

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Go API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "cloud.google.com/go/pubsub/v2" 
  
 "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" 
 ) 
 func 
  
 createTopicWithConfluentCloudIngestion 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 topicID 
 , 
  
 bootstrapServer 
 , 
  
 clusterID 
 , 
  
 confluentTopic 
 , 
  
 poolID 
 , 
  
 gcpSA 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topicID := "my-topic" 
  
 // // Confluent Cloud ingestion settings. 
  
 // bootstrapServer := "bootstrap-server" 
  
 // clusterID := "cluster-id" 
  
 // confluentTopic := "confluent-topic" 
  
 // poolID := "identity-pool-id" 
  
 // gcpSA := "gcp-service-account" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 topicpb 
  
 := 
  
& pubsubpb 
 . 
 Topic 
 { 
  
 Name 
 : 
  
 fmt 
 . 
 Sprintf 
 ( 
 "projects/%s/topics/%s" 
 , 
  
 projectID 
 , 
  
 topicID 
 ), 
  
 IngestionDataSourceSettings 
 : 
  
& pubsubpb 
 . 
 IngestionDataSourceSettings 
 { 
  
 Source 
 : 
  
& pubsubpb 
 . 
 IngestionDataSourceSettings_ConfluentCloud_ 
 { 
  
 ConfluentCloud 
 : 
  
& pubsubpb 
 . 
 IngestionDataSourceSettings_ConfluentCloud 
 { 
  
 BootstrapServer 
 : 
  
 bootstrapServer 
 , 
  
 ClusterId 
 : 
  
 clusterID 
 , 
  
 Topic 
 : 
  
 confluentTopic 
 , 
  
 IdentityPoolId 
 : 
  
 poolID 
 , 
  
 GcpServiceAccount 
 : 
  
 gcpSA 
 , 
  
 }, 
  
 }, 
  
 }, 
  
 } 
  
 topic 
 , 
  
 err 
  
 := 
  
 client 
 . 
 TopicAdminClient 
 . 
 CreateTopic 
 ( 
 ctx 
 , 
  
 topicpb 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "CreateTopic: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Created topic with Confluent Cloud ingestion: %v\n" 
 , 
  
 topic 
 ) 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Java API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  import 
  
 com.google.cloud.pubsub.v1. TopicAdminClient 
 
 ; 
 import 
  
 com.google.pubsub.v1. IngestionDataSourceSettings 
 
 ; 
 import 
  
 com.google.pubsub.v1. Topic 
 
 ; 
 import 
  
 com.google.pubsub.v1. TopicName 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 public 
  
 class 
 CreateTopicWithConfluentCloudIngestionExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 topicId 
  
 = 
  
 "your-topic-id" 
 ; 
  
 // Confluent Cloud ingestion settings. 
  
 String 
  
 bootstrapServer 
  
 = 
  
 "bootstrap-server" 
 ; 
  
 String 
  
 clusterId 
  
 = 
  
 "cluster-id" 
 ; 
  
 String 
  
 confluentTopic 
  
 = 
  
 "confluent-topic" 
 ; 
  
 String 
  
 identityPoolId 
  
 = 
  
 "identity-pool-id" 
 ; 
  
 String 
  
 gcpServiceAccount 
  
 = 
  
 "gcp-service-account" 
 ; 
  
 createTopicWithConfluentCloudIngestionExample 
 ( 
  
 projectId 
 , 
  
 topicId 
 , 
  
 bootstrapServer 
 , 
  
 clusterId 
 , 
  
 confluentTopic 
 , 
  
 identityPoolId 
 , 
  
 gcpServiceAccount 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 createTopicWithConfluentCloudIngestionExample 
 ( 
  
 String 
  
 projectId 
 , 
  
 String 
  
 topicId 
 , 
  
 String 
  
 bootstrapServer 
 , 
  
 String 
  
 clusterId 
 , 
  
 String 
  
 confluentTopic 
 , 
  
 String 
  
 identityPoolId 
 , 
  
 String 
  
 gcpServiceAccount 
 ) 
  
 throws 
  
 IOException 
  
 { 
  
 try 
  
 ( 
  TopicAdminClient 
 
  
 topicAdminClient 
  
 = 
  
  TopicAdminClient 
 
 . 
 create 
 ()) 
  
 { 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
  IngestionDataSourceSettings 
 
 . 
  ConfluentCloud 
 
  
 confluentCloud 
  
 = 
  
  IngestionDataSourceSettings 
 
 . 
 ConfluentCloud 
 . 
 newBuilder 
 () 
  
 . 
  setBootstrapServer 
 
 ( 
 bootstrapServer 
 ) 
  
 . 
 setClusterId 
 ( 
 clusterId 
 ) 
  
 . 
 setTopic 
 ( 
 confluentTopic 
 ) 
  
 . 
  setIdentityPoolId 
 
 ( 
 identityPoolId 
 ) 
  
 . 
 setGcpServiceAccount 
 ( 
 gcpServiceAccount 
 ) 
  
 . 
 build 
 (); 
  
  IngestionDataSourceSettings 
 
  
 ingestionDataSourceSettings 
  
 = 
  
  IngestionDataSourceSettings 
 
 . 
 newBuilder 
 (). 
  setConfluentCloud 
 
 ( 
 confluentCloud 
 ). 
 build 
 (); 
  
  Topic 
 
  
 topic 
  
 = 
  
 topicAdminClient 
 . 
 createTopic 
 ( 
  
  Topic 
 
 . 
 newBuilder 
 () 
  
 . 
 setName 
 ( 
 topicName 
 . 
  toString 
 
 ()) 
  
 . 
  setIngestionDataSourceSettings 
 
 ( 
 ingestionDataSourceSettings 
 ) 
  
 . 
 build 
 ()); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
  
 "Created topic with Confluent Cloud ingestion settings: " 
  
 + 
  
 topic 
 . 
 getAllFields 
 ()); 
  
 } 
  
 } 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Node.js API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  /** 
  
 * 
  
 TODO 
 ( 
 developer 
 ): 
  
 Uncomment 
  
 these 
  
 variables 
  
 before 
  
 running 
  
 the 
  
 sample 
 . 
  
 */ 
 // 
  
 const 
  
 topicNameOrId 
  
 = 
  
 'YOUR_TOPIC_NAME_OR_ID' 
 ; 
 // 
  
 const 
  
 bootstrapServer 
  
 = 
  
 'url:port' 
 ; 
 // 
  
 const 
  
 clusterId 
  
 = 
  
 'YOUR_CLUSTER_ID' 
 ; 
 // 
  
 const 
  
 confluentTopic 
  
 = 
  
 'YOUR_CONFLUENT_TOPIC' 
 ; 
 // 
  
 const 
  
 identityPoolId 
  
 = 
  
 'pool-ID' 
 ; 
 // 
  
 const 
  
 gcpServiceAccount 
  
 = 
  
 'ingestion-account@...' 
 ; 
 // 
  
 Imports 
  
 the 
  
 Google 
  
 Cloud 
  
 client 
  
 library 
 const 
  
 { 
 PubSub 
 } 
  
 = 
  
 require 
 ( 
 '@google-cloud/pubsub' 
 ); 
 // 
  
 Creates 
  
 a 
  
 client 
 ; 
  
 cache 
  
 this 
  
 for 
  
 further 
  
 use 
 const 
  
 pubSubClient 
  
 = 
  
 new 
  
 PubSub 
 (); 
 async 
  
 function 
  
 createTopicWithConfluentCloudIngestion 
 ( 
  
 topicNameOrId 
 , 
  
 bootstrapServer 
 , 
  
 clusterId 
 , 
  
 confluentTopic 
 , 
  
 identityPoolId 
 , 
  
 gcpServiceAccount 
 , 
 ) 
  
 { 
  
 // 
  
 Creates 
  
 a 
  
 new 
  
 topic 
  
 with 
  
 Confluent 
  
 Cloud 
  
 ingestion 
 . 
  
 await 
  
 pubSubClient 
 . 
 createTopic 
 ({ 
  
 name 
 : 
  
 topicNameOrId 
 , 
  
 ingestionDataSourceSettings 
 : 
  
 { 
  
 confluentCloud 
 : 
  
 { 
  
 bootstrapServer 
 , 
  
 clusterId 
 , 
  
 topic 
 : 
  
 confluentTopic 
 , 
  
 identityPoolId 
 , 
  
 gcpServiceAccount 
 , 
  
 }, 
  
 }, 
  
 }); 
  
 console 
 . 
 log 
 ( 
 ` 
 Topic 
  
 $ 
 { 
 topicNameOrId 
 } 
  
 created 
  
 with 
  
 Confluent 
  
 Cloud 
  
 ingestion 
 . 
 ` 
 ); 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Node.js API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  /** 
 * 
 TODO 
 ( 
 developer 
 ): 
 Uncomment 
 these 
 variables 
 before 
 running 
 the 
 sample 
 . 
 */ 
 // 
 const 
 topicNameOrId 
 = 
 'YOUR_TOPIC_NAME_OR_ID' 
 ; 
 // 
 const 
 bootstrapServer 
 = 
 'url:port' 
 ; 
 // 
 const 
 clusterId 
 = 
 'YOUR_CLUSTER_ID' 
 ; 
 // 
 const 
 confluentTopic 
 = 
 'YOUR_CONFLUENT_TOPIC' 
 ; 
 // 
 const 
 identityPoolId 
 = 
 'pool-ID' 
 ; 
 // 
 const 
 gcpServiceAccount 
 = 
 'ingestion-account@...' 
 ; 
 // 
 Imports 
 the 
 Google 
 Cloud 
 client 
 library 
 import 
  
 { 
 PubSub 
 } 
 from 
  
 '@google-cloud/pubsub' 
 ; 
 // 
 Creates 
 a 
 client 
 ; 
 cache 
 this 
 for 
 further 
 use 
 const 
 pubSubClient 
 = 
 new 
 PubSub 
 (); 
 async 
 function 
 createTopicWithConfluentCloudIngestion 
 ( 
 topicNameOrId 
 : 
 string 
 , 
 bootstrapServer 
 : 
 string 
 , 
 clusterId 
 : 
 string 
 , 
 confluentTopic 
 : 
 string 
 , 
 identityPoolId 
 : 
 string 
 , 
 gcpServiceAccount 
 : 
 string 
 , 
 ) 
 { 
 // 
 Creates 
 a 
 new 
 topic 
 with 
 Confluent 
 Cloud 
 ingestion 
 . 
 await 
 pubSubClient 
 . 
 createTopic 
 ({ 
 name 
 : 
 topicNameOrId 
 , 
 ingestionDataSourceSettings 
 : 
 { 
 confluentCloud 
 : 
 { 
 bootstrapServer 
 , 
 clusterId 
 , 
 topic 
 : 
 confluentTopic 
 , 
 identityPoolId 
 , 
 gcpServiceAccount 
 , 
 }, 
 }, 
 }); 
 console 
 . 
 log 
 ( 
 ` 
 Topic 
 $ 
 { 
 topicNameOrId 
 } 
 created 
 with 
 Confluent 
 Cloud 
 ingestion 
 . 
 ` 
 ); 
 } 
 

PHP

Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub PHP API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  use Google\Cloud\PubSub\PubSubClient; 
 /** 
 * Creates a Pub/Sub topic with Confluent Cloud ingestion. 
 * 
 * @param string $projectId  The Google project ID. 
 * @param string $topicName  The Pub/Sub topic name. 
 * @param string $bootstrapServer  The address of the bootstrap server. The format is url:port. 
 * @param string $clusterId  The id of the cluster. 
 * @param string $confluentTopic  The name of the topic in the Confluent Cloud cluster that Pub/Sub will import from. 
 * @param string $identityPoolId  The id of the identity pool to be used for Federated Identity authentication with Confluent Cloud. 
 * @param string $gcpServiceAccount  The GCP service account to be used for Federated Identity authentication with identity_pool_id. 
 */ 
 function create_topic_with_confluent_cloud_ingestion( 
 string $projectId, 
 string $topicName, 
 string $bootstrapServer, 
 string $clusterId, 
 string $confluentTopic, 
 string $identityPoolId, 
 string $gcpServiceAccount 
 ): void { 
 $pubsub = new PubSubClient([ 
 'projectId' => $projectId, 
 ]); 
 $topic = $pubsub->createTopic($topicName, [ 
 'ingestionDataSourceSettings' => [ 
 'confluent_cloud' => [ 
 'bootstrap_server' => $bootstrapServer, 
 'cluster_id' => $clusterId, 
 'topic' => $confluentTopic, 
 'identity_pool_id' => $identityPoolId, 
 'gcp_service_account' => $gcpServiceAccount 
 ] 
 ] 
 ]); 
 printf('Topic created: %s' . PHP_EOL, $topic->name()); 
 } 
 

Python

Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Python API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  from 
  
 google.cloud 
  
 import 
 pubsub_v1 
 from 
  
 google.pubsub_v1.types 
  
 import 
  Topic 
 
 from 
  
 google.pubsub_v1.types 
  
 import 
  IngestionDataSourceSettings 
 
 # TODO(developer) 
 # project_id = "your-project-id" 
 # topic_id = "your-topic-id" 
 # bootstrap_server = "your-bootstrap-server" 
 # cluster_id = "your-cluster-id" 
 # confluent_topic = "your-confluent-topic" 
 # identity_pool_id = "your-identity-pool-id" 
 # gcp_service_account = "your-gcp-service-account" 
 publisher 
 = 
 pubsub_v1 
 . 
  PublisherClient 
 
 () 
 topic_path 
 = 
 publisher 
 . 
 topic_path 
 ( 
 project_id 
 , 
 topic_id 
 ) 
 request 
 = 
 Topic 
 ( 
 name 
 = 
 topic_path 
 , 
 ingestion_data_source_settings 
 = 
 IngestionDataSourceSettings 
 ( 
 confluent_cloud 
 = 
  IngestionDataSourceSettings 
 
 . 
  ConfluentCloud 
 
 ( 
 bootstrap_server 
 = 
 bootstrap_server 
 , 
 cluster_id 
 = 
 cluster_id 
 , 
 topic 
 = 
 confluent_topic 
 , 
 identity_pool_id 
 = 
 identity_pool_id 
 , 
 gcp_service_account 
 = 
 gcp_service_account 
 , 
 ) 
 ), 
 ) 
 topic 
 = 
 publisher 
 . 
 create_topic 
 ( 
 request 
 = 
 request 
 ) 
 print 
 ( 
 f 
 "Created topic: 
 { 
 topic 
 . 
 name 
 } 
 with Confluent Cloud Ingestion Settings" 
 ) 
 

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Ruby API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  # topic_id = "your-topic-id" 
 # bootstrap_server = "bootstrap-server-id.us-south1.gcp.confluent.cloud:9092" 
 # cluster_id = "cluster-id" 
 # confluent_topic = "confluent-topic-name" 
 # identity_pool_id = "identity-pool-id" 
 # gcp_service_account = "service-account@project.iam.gserviceaccount.com" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  Pubsub 
 
 . 
 new 
 topic_admin 
  
 = 
  
 pubsub 
 . 
 topic_admin 
 topic 
  
 = 
  
 topic_admin 
 . 
 create_topic 
  
 name 
 : 
  
 pubsub 
 . 
 topic_path 
 ( 
 topic_id 
 ), 
  
 ingestion_data_source_settings 
 : 
  
 { 
  
 confluent_cloud 
 : 
  
 { 
  
 bootstrap_server 
 : 
  
 bootstrap_server 
 , 
  
 cluster_id 
 : 
  
 cluster_id 
 , 
  
 topic 
 : 
  
 confluent_topic 
 , 
  
 identity_pool_id 
 : 
  
 identity_pool_id 
 , 
  
 gcp_service_account 
 : 
  
 gcp_service_account 
  
 } 
  
 } 
 puts 
  
 "Topic with Confluent Cloud Ingestion 
 #{ 
 topic 
 . 
 name 
 } 
 created." 
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser .

Create a Mobile Website
View Site in Mobile | Classic
Share by: