Create a Kafka cluster

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

Go

Before trying this sample, follow the Go setup instructions in the Managed Service for Apache Kafka quickstart using client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createCluster(w io.Writer, projectID, region, clusterID, subnet string, cpu, memoryBytes int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// subnet := "projects/my-project-id/regions/us-central1/subnetworks/default"
	// cpu := 3
	// memoryBytes := 3221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
	clusterPath := fmt.Sprintf("%s/clusters/%s", locationPath, clusterID)

	// Memory must be between 1 GiB and 8 GiB per CPU.
	capacityConfig := &managedkafkapb.CapacityConfig{
		VcpuCount:   cpu,
		MemoryBytes: memoryBytes,
	}
	var networkConfig []*managedkafkapb.NetworkConfig
	networkConfig = append(networkConfig, &managedkafkapb.NetworkConfig{
		Subnet: subnet,
	})
	platformConfig := &managedkafkapb.Cluster_GcpConfig{
		GcpConfig: &managedkafkapb.GcpConfig{
			AccessConfig: &managedkafkapb.AccessConfig{
				NetworkConfigs: networkConfig,
			},
		},
	}
	rebalanceConfig := &managedkafkapb.RebalanceConfig{
		Mode: managedkafkapb.RebalanceConfig_AUTO_REBALANCE_ON_SCALE_UP,
	}
	cluster := &managedkafkapb.Cluster{
		Name:            clusterPath,
		CapacityConfig:  capacityConfig,
		PlatformConfig:  platformConfig,
		RebalanceConfig: rebalanceConfig,
	}

	req := &managedkafkapb.CreateClusterRequest{
		Parent:    locationPath,
		ClusterId: clusterID,
		Cluster:   cluster,
	}
	op, err := client.CreateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateCluster got err: %w", err)
	}
	// The duration of this operation can vary considerably, typically taking 10-40 minutes.
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Created cluster: %s\n", resp.Name)
	return nil
}

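The sample defines createCluster as a reusable function that writes its result to any io.Writer and accepts extra client options. The caller below is a minimal sketch, not part of the official sample: it assumes it lives in the same package as createCluster, and the project, region, cluster ID, and subnet values are placeholders to replace with your own.

package main

import (
	"log"
	"os"
)

func main() {
	// Placeholder values; replace them with your own project, region, cluster ID, and subnet.
	// The client authenticates with Application Default Credentials by default; to use a
	// service account key file instead, append option.WithCredentialsFile("/path/to/key.json")
	// as a final argument (the path here is hypothetical).
	err := createCluster(
		os.Stdout,
		"my-project-id",
		"us-central1",
		"my-cluster",
		"projects/my-project-id/regions/us-central1/subnetworks/default",
		3,          // vCPUs
		3221225472, // memory in bytes (3 GiB)
	)
	if err != nil {
		log.Fatal(err)
	}
}
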
Java

Before trying this sample, follow the Java setup instructions in the Managed Service for Apache Kafka quickstart using client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.AccessConfig;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.CreateClusterRequest;
import com.google.cloud.managedkafka.v1.GcpConfig;
import com.google.cloud.managedkafka.v1.LocationName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.NetworkConfig;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.RebalanceConfig;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class CreateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
    int cpu = 3;
    long memoryBytes = 3221225472L; // 3 GiB
    createCluster(projectId, region, clusterId, subnet, cpu, memoryBytes);
  }

  public static void createCluster(
      String projectId, String region, String clusterId, String subnet, int cpu, long memoryBytes)
      throws Exception {
    CapacityConfig capacityConfig =
        CapacityConfig.newBuilder().setVcpuCount(cpu).setMemoryBytes(memoryBytes).build();
    NetworkConfig networkConfig = NetworkConfig.newBuilder().setSubnet(subnet).build();
    GcpConfig gcpConfig =
        GcpConfig.newBuilder()
            .setAccessConfig(AccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
            .build();
    RebalanceConfig rebalanceConfig =
        RebalanceConfig.newBuilder()
            .setMode(RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP)
            .build();
    Cluster cluster =
        Cluster.newBuilder()
            .setCapacityConfig(capacityConfig)
            .setGcpConfig(gcpConfig)
            .setRebalanceConfig(rebalanceConfig)
            .build();

    // Create the settings to configure the timeout for polling operations.
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm =
        OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder().setTotalTimeoutDuration(Duration.ofHours(1L)).build());
    settingsBuilder.createClusterOperationSettings().setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient =
        ManagedKafkaClient.create(settingsBuilder.build())) {
      CreateClusterRequest request =
          CreateClusterRequest.newBuilder()
              .setParent(LocationName.of(projectId, region).toString())
              .setClusterId(clusterId)
              .setCluster(cluster)
              .build();
      // The duration of this operation can vary considerably, typically taking between 10-40
      // minutes.
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.createClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf(
          "Cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(), operation.isDone(), future.getMetadata().get().toString());

      while (!future.isDone()) {
        // The pollingFuture gives us the most recent status of the operation.
        RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
        OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
        System.out.printf(
            "Polling Operation:\nName: %s\n Done: %s\n", currentOp.getName(), currentOp.isDone());
      }

      // NOTE: future.get() blocks until the operation is complete (isDone = true).
      Cluster response = future.get();
      System.out.printf("Created cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.createCluster got err: %s", e.getMessage());
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in the Managed Service for Apache Kafka quickstart using client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
# cpu = 3
# memory_bytes = 3221225472

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.vcpu_count = cpu
cluster.capacity_config.memory_bytes = memory_bytes
cluster.gcp_config.access_config.network_configs = [
    managedkafka_v1.NetworkConfig(subnet=subnet)
]
cluster.rebalance_config.mode = (
    managedkafka_v1.RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP
)

request = managedkafka_v1.CreateClusterRequest(
    parent=client.common_location_path(project_id, region),
    cluster_id=cluster_id,
    cluster=cluster,
)

try:
    operation = client.create_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    # The duration of this operation can vary considerably, typically taking 10-40 minutes.
    # We can set a timeout of 3000s (50 minutes).
    response = operation.result(timeout=3000)
    print("Created cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

Terraform

To learn how to apply or remove a Terraform configuration, see Basic Terraform commands. For more information, see the Terraform provider reference documentation.

resource "google_managed_kafka_cluster" "default" {
  project    = data.google_project.default.project_id # Replace this with your project ID in quotes
  cluster_id = "my-cluster-id"
  location   = "us-central1"

  capacity_config {
    vcpu_count   = 3
    memory_bytes = 3221225472
  }

  gcp_config {
    access_config {
      network_configs {
        subnet = google_compute_subnetwork.default.id
      }
    }
  }
}

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.
