Create a Kafka topic

Create a topic in a Managed Service for Apache Kafka cluster.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

Go

Before trying this sample, follow the Go setup instructions in the Managed Service for Apache Kafka quickstart using client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "cloud.google.com/go/managedkafka/apiv1/managedkafkapb" 
  
 "google.golang.org/api/option" 
  
 managedkafka 
  
 "cloud.google.com/go/managedkafka/apiv1" 
 ) 
 func 
  
 createTopic 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 region 
 , 
  
 clusterID 
 , 
  
 topicID 
  
 string 
 , 
  
 partitionCount 
 , 
  
 replicationFactor 
  
 int32 
 , 
  
 configs 
  
 map 
 [ 
 string 
 ] 
 string 
 , 
  
 opts 
  
 ... 
 option 
 . 
 ClientOption 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // region := "us-central1" 
  
 // clusterID := "my-cluster" 
  
 // topicID := "my-topic" 
  
 // partitionCount := 10 
  
 // replicationFactor := 3 
  
 // configs := map[string]string{"min.insync.replicas":"1"} 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 managedkafka 
 . 
  NewClient 
 
 ( 
 ctx 
 , 
  
 opts 
 ... 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "managedkafka.NewClient got err: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 clusterPath 
  
 := 
  
 fmt 
 . 
 Sprintf 
 ( 
 "projects/%s/locations/%s/clusters/%s" 
 , 
  
 projectID 
 , 
  
 region 
 , 
  
 clusterID 
 ) 
  
 topicPath 
  
 := 
  
 fmt 
 . 
 Sprintf 
 ( 
 "%s/topics/%s" 
 , 
  
 clusterPath 
 , 
  
 topicID 
 ) 
  
 topicConfig 
  
 := 
  
& managedkafkapb 
 . 
 Topic 
 { 
  
 Name 
 : 
  
 topicPath 
 , 
  
 PartitionCount 
 : 
  
 partitionCount 
 , 
  
 ReplicationFactor 
 : 
  
 replicationFactor 
 , 
  
 Configs 
 : 
  
 configs 
 , 
  
 } 
  
 req 
  
 := 
  
& managedkafkapb 
 . 
 CreateTopicRequest 
 { 
  
 Parent 
 : 
  
 clusterPath 
 , 
  
 TopicId 
 : 
  
 topicID 
 , 
  
 Topic 
 : 
  
 topicConfig 
 , 
  
 } 
  
 topic 
 , 
  
 err 
  
 := 
  
 client 
 . 
 CreateTopic 
 ( 
 ctx 
 , 
  
 req 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "client.CreateTopic got err: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Created topic: %s\n" 
 , 
  
 topic 
 . 
 Name 
 ) 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in the Managed Service for Apache Kafka quickstart using client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 com.google.api.gax.rpc. ApiException 
 
 ; 
 import 
  
 com.google.cloud.managedkafka.v1. ClusterName 
 
 ; 
 import 
  
 com.google.cloud.managedkafka.v1. CreateTopicRequest 
 
 ; 
 import 
  
 com.google.cloud.managedkafka.v1. ManagedKafkaClient 
 
 ; 
 import 
  
 com.google.cloud.managedkafka.v1. Topic 
 
 ; 
 import 
  
 com.google.cloud.managedkafka.v1. TopicName 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.util.HashMap 
 ; 
 import 
  
 java.util.Map 
 ; 
 /** Sample that creates a topic in a Managed Service for Apache Kafka cluster. */
 public class CreateTopic {

   /**
    * Runs the sample with placeholder values.
    *
    * @param args unused
    * @throws Exception propagated from {@link #createTopic}
    */
   public static void main(String[] args) throws Exception {
     // TODO(developer): Replace these variables before running the example.
     String projectId = "my-project-id";
     String region = "my-region"; // e.g. us-east1
     String clusterId = "my-cluster";
     String topicId = "my-topic";
     int partitionCount = 100;
     int replicationFactor = 3;
     // Use a plain HashMap rather than double-brace initialization, which would
     // declare an anonymous HashMap subclass just to add one entry.
     Map<String, String> configs = new HashMap<>();
     configs.put("min.insync.replicas", "2");
     createTopic(
         projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
   }

   /**
    * Sends a CreateTopic request for {@code topicId} in the given cluster and prints the name of
    * the returned topic to standard output.
    *
    * <p>An {@link IOException} or {@code ApiException} raised while creating the client or calling
    * the service is caught and its message is printed to standard error; it is not rethrown.
    *
    * @param projectId project that owns the cluster
    * @param region location of the cluster
    * @param clusterId ID of the cluster that will contain the topic
    * @param topicId ID of the topic to create
    * @param partitionCount partition count to set on the topic
    * @param replicationFactor replication factor to set on the topic
    * @param configs topic configuration entries, copied into the topic via {@code putAllConfigs}
    * @throws Exception any exception other than the two handled above
    */
   public static void createTopic(
       String projectId,
       String region,
       String clusterId,
       String topicId,
       int partitionCount,
       int replicationFactor,
       Map<String, String> configs)
       throws Exception {
     Topic topic =
         Topic.newBuilder()
             .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
             .setPartitionCount(partitionCount)
             .setReplicationFactor(replicationFactor)
             .putAllConfigs(configs)
             .build();
     // try-with-resources closes the client when the block exits.
     try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
       CreateTopicRequest request =
           CreateTopicRequest.newBuilder()
               .setParent(ClusterName.of(projectId, region, clusterId).toString())
               .setTopicId(topicId)
               .setTopic(topic)
               .build();
       // This operation is being handled synchronously.
       Topic response = managedKafkaClient.createTopic(request);
       System.out.printf("Created topic: %s\n", response.getName());
     } catch (IOException | ApiException e) {
       System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
     }
   }
 }
 

Python

Before trying this sample, follow the Python setup instructions in the Managed Service for Apache Kafka quickstart using client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  from 
  
 google.api_core.exceptions 
  
 import 
 AlreadyExists 
 from 
  
 google.cloud 
  
 import 
  managedkafka_v1 
 
 # TODO(developer): define these variables before running the sample.
 # project_id = "my-project-id"
 # region = "us-central1"
 # cluster_id = "my-cluster"
 # topic_id = "my-topic"
 # partition_count = 10
 # replication_factor = 3
 # configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

# Describe the topic to create. For the list of supported config keys, see
# https://kafka.apache.org/documentation/#topicconfigs
topic = managedkafka_v1.Topic(
    name=client.topic_path(project_id, region, cluster_id, topic_id),
    partition_count=partition_count,
    replication_factor=replication_factor,
    configs=configs,
)

# The request is addressed to the parent cluster and carries the topic ID
# alongside the topic description.
parent_cluster = client.cluster_path(project_id, region, cluster_id)
create_request = managedkafka_v1.CreateTopicRequest(
    parent=parent_cluster,
    topic_id=topic_id,
    topic=topic,
)

try:
    created = client.create_topic(request=create_request)
    print("Created topic:", created.name)
except AlreadyExists as e:
    # Only the "already exists" failure is reported here; other errors propagate.
    print(f"Failed to create topic {topic.name} with error: {e.message}")
 

Terraform

To learn how to apply or remove a Terraform configuration, see Basic Terraform commands. For more information, see the Terraform provider reference documentation.

  # Declares the topic "my-topic-id" in the cluster referenced by
  # google_managed_kafka_cluster.default.
  resource "google_managed_kafka_topic" "default" {
    # Where the topic lives.
    project  = data.google_project.default.project_id # Replace this with your project ID in quotes
    location = "us-central1"
    cluster  = google_managed_kafka_cluster.default.cluster_id

    # Topic identity and sizing.
    topic_id           = "my-topic-id"
    partition_count    = 2
    replication_factor = 3
  }
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.

Create a Mobile Website
View Site in Mobile | Classic
Share by: