Create a Google Cloud Managed Service for Apache Kafka cluster

A Managed Service for Apache Kafka cluster provides an environment for storing and processing streams of messages organized into topics.

To create a cluster, you can use the Google Cloud console, the Google Cloud CLI, the client library, or the Managed Kafka API. You can't use the open source Apache Kafka API to create a cluster.

Before you begin

Verify that you are familiar with the following:

Required roles and permissions to create a cluster

To get the permissions that you need to create a cluster, ask your administrator to grant you the Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to create a cluster. To see the exact permissions that are required, expand the Required permissions section:

Required permissions

The following permissions are required to create a cluster:

  • Create a cluster: managedkafka.clusters.create

You might also be able to get these permissions with custom roles or other predefined roles.

The Managed Kafka Cluster Editor role does not let you create, delete, or modify topics and consumer groups on Managed Service for Apache Kafka clusters. Nor does it allow data plane access to publish or consume messages within clusters. For more information about this role, see Managed Service for Apache Kafka predefined roles.

Properties of a Managed Service for Apache Kafka cluster

When you create or update a Managed Service for Apache Kafka cluster, you must specify the following properties.

Cluster name

The name or ID of the Managed Service for Apache Kafka cluster that you are creating. For guidelines on how to name a cluster, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a cluster is immutable.

Location

The location where you are creating the cluster. The location must be one of the supported Google Cloud regions. The location of a cluster cannot be changed later. For a list of available locations, see Managed Service for Apache Kafka locations.

Capacity configuration

Capacity configuration requires you to configure the number of vCPUs and the amount of memory for your Kafka setup. For more information on how to configure the capacity of a cluster, see Plan your Kafka cluster size.

The following are the properties for capacity configuration:

  • vCPUs: The number of vCPUs in the cluster. At least 3 vCPUs per cluster are required.

  • Memory: The amount of memory that is assigned to the cluster. You must provision between 1 GiB and 8 GiB per vCPU.

    For example, if you create a cluster with 6 vCPUs, the minimum memory you can allocate to the cluster is 6 GiB (1 GiB per vCPU), and the maximum is 48 GiB (8 GiB per vCPU).

For more information about how to change the memory and number of vCPUs after a cluster is created, see Update the cluster size.
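The vCPU and memory rules above can be expressed as a quick check. The following is a minimal sketch (the memory_bounds helper is hypothetical, not part of any Google Cloud library), assuming the documented limits of at least 3 vCPUs and 1 GiB to 8 GiB per vCPU:

```python
GIB = 1024 ** 3  # bytes per GiB, the unit the service uses for memory


def memory_bounds(vcpu_count: int) -> tuple:
    """Return (min_bytes, max_bytes) of memory allowed for a vCPU count,
    per the 1 GiB to 8 GiB per vCPU rule."""
    if vcpu_count < 3:
        raise ValueError("At least 3 vCPUs per cluster are required.")
    return vcpu_count * 1 * GIB, vcpu_count * 8 * GIB


# The 6-vCPU example from the text: 6 GiB minimum, 48 GiB maximum.
low, high = memory_bounds(6)
print(low // GIB, high // GIB)  # 6 48
```

The same arithmetic explains the memoryBytes value used in the REST and client library samples on this page: 3221225472 bytes is 3 GiB, the minimum for a 3-vCPU cluster.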

Network configuration

Network configuration is a list of subnets in the VPCs where the cluster is accessible. The IP addresses of the broker and bootstrap server are automatically allocated in each subnet. In addition, DNS entries for these IP addresses are created in each corresponding VPC.

The following are some guidelines for your network configuration:

  • A minimum of 1 subnet is required for a cluster. The maximum is 10.

  • Exactly one subnet per network is allowed for any given cluster.

  • Each subnet must be in the same region as the cluster. The project and network can be different.

  • If you add a subnet from a different project, you must grant permissions to the Google-managed service account that is associated with the cluster. For more information, see Connect a cluster across projects.

Labels

Labels are key-value pairs that help you with organization and identification. Labels let you categorize resources, for example by environment or by owner. Examples are "env:production" and "owner:data-engineering" .

You can filter and search for resources based on their labels. For example, assume you have multiple Managed Service for Apache Kafka clusters for different departments. You can configure and search for clusters with the label "department:marketing" to find the relevant one quickly.
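Label filtering is an exact key-value match. The following sketch shows the semantics; the cluster records and the by_label helper are illustrative, not an API:

```python
# Hypothetical cluster records with labels, as a list call might return them.
clusters = [
    {"name": "clusters/marketing-events",
     "labels": {"department": "marketing", "env": "production"}},
    {"name": "clusters/finance-events",
     "labels": {"department": "finance"}},
]


def by_label(resources, key, value):
    """Keep only resources whose labels contain the exact key-value pair."""
    return [r for r in resources if r.get("labels", {}).get(key) == value]


print([c["name"] for c in by_label(clusters, "department", "marketing")])
# ['clusters/marketing-events']
```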

Rebalancing configuration

This setting determines if the service automatically rebalances partition replicas across brokers.

The available modes are:

  • Auto rebalance on scale up: When this option is enabled, the service automatically triggers a rebalance of replicas when you scale up the cluster. This mode helps maintain an even load distribution but might temporarily impact performance during the rebalancing operation.

  • No rebalance: When this option is enabled, the service doesn't automatically rebalance replicas.

Encryption

Managed Service for Apache Kafka can encrypt messages with Google-owned and Google-managed encryption keys (default) or Customer-managed encryption keys (CMEK). Every message is encrypted at rest and in transit. The encryption type for a cluster is immutable.

Google-owned and Google-managed encryption keys are used by default. These keys are created, managed, and stored entirely by Google Cloud within its infrastructure.

CMEKs are encryption keys that you manage using Cloud Key Management Service. This feature lets you have greater control over the keys that are used to encrypt data at rest within supported Google Cloud services. Using CMEK incurs additional costs related to Cloud Key Management Service. For CMEK usage, your key ring must be in the same location as the resources you use it with. For more information, see Configure message encryption.

mTLS configuration

You can optionally configure mTLS as an alternative authentication method that uses client certificates. The configuration includes the following:

  • CA pools: A list of one to ten Certificate Authority Service (CAS) pools that the cluster trusts for client authentication.

  • SSL Principal mapping rules: An optional but recommended ssl.principal.mapping.rules broker property to simplify long certificate principal names for use in Kafka ACLs.

For more information about mTLS, see Configure mTLS authentication.
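To see what a principal mapping rule does, consider how Kafka applies a rule of the form RULE:pattern/replacement/ to a certificate's distinguished name. The following sketch implements a simplified version of that transform; it ignores comma-separated rule lists, the DEFAULT rule, and the optional L/U case flags, and the example rule string is illustrative rather than a required value:

```python
import re


def apply_mapping_rule(rule: str, dn: str) -> str:
    """Apply a single simplified ssl.principal.mapping.rules entry
    (RULE:<pattern>/<replacement>/) to a certificate distinguished name."""
    match = re.fullmatch(r"RULE:(.*)/(.*)/[LU]?", rule)
    if not match:
        raise ValueError(f"unsupported rule: {rule}")
    pattern, replacement = match.groups()
    # Kafka rules use $1-style backreferences; Python's re uses \1.
    return re.sub(pattern, replacement.replace("$1", r"\1"), dn)


# Shorten a long DN principal to just its common name for use in Kafka ACLs.
rule = "RULE:^CN=(.*?),.*$/$1/"
print(apply_mapping_rule(rule, "CN=kafka-client,OU=eng,O=example"))  # kafka-client
```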

Create a cluster

Before you create a cluster, review the documentation of cluster properties.

Creating a cluster usually takes 20-30 minutes.

To create a cluster, follow these steps:

Console

  1. In the Google Cloud console, go to the Clusters page.

    Go to Clusters

  2. Select Create.

    The Create Kafka cluster page opens.

  3. For the Cluster name, enter a string.

    For more information about how to name a cluster, see Guidelines to name a Managed Service for Apache Kafka resource .

  4. For Location, enter a supported location.

    For more information about supported locations, see Supported Managed Service for Apache Kafka locations .

  5. For Capacity configuration, enter values for Memory and vCPUs.

    For more information about how to size a Managed Service for Apache Kafka cluster, see Plan your Kafka cluster size .

  6. For Network configuration, enter the following details:

    1. Project: The project where the subnetwork is located. The subnet must be located in the same region as the cluster, but the project might be different.
    2. Network: The network to which the subnet is connected.
    3. Subnetwork: The name of the subnet.
    4. Subnet URI path: This field is automatically populated. Or, you can enter the subnet path here. The name of the subnet must be in the format: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.
    5. Click Done.
  7. (Optional) Add additional subnets by clicking Add a connected subnet.

    You can add additional subnets, up to a maximum of ten.

  8. Retain the other default values.

  9. Click Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud managed-kafka clusters create command:

     gcloud managed-kafka clusters create CLUSTER_ID \
         --location=LOCATION \
         --cpu=CPU \
         --memory=MEMORY \
         --subnets=SUBNETS \
         --auto-rebalance \
         --encryption-key=ENCRYPTION_KEY \
         --async \
         --labels=LABELS

    Replace the following:

    • CLUSTER_ID : The ID or name of the cluster.

      For more information about how to name a cluster, see Guidelines to name a Managed Service for Apache Kafka resource .

    • LOCATION : The location of the cluster.

      For more information about supported locations, see Managed Service for Apache Kafka locations .

    • CPU : The number of vCPUs for the cluster.

      For more information about how to size a Managed Service for Apache Kafka cluster, see Plan your Kafka cluster size .

    • MEMORY : The amount of memory for the cluster. Use "MB", "MiB", "GB", "GiB", "TB", or "TiB" units. For example, "10GiB".

    • SUBNETS : The list of subnets to connect to. Use commas to separate multiple subnet values.

      The format of the subnet is projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

    • auto-rebalance : Enables automatic rebalancing of topic partitions among brokers when the number of CPUs in the cluster changes. This is enabled by default.

    • ENCRYPTION_KEY : ID of the customer-managed encryption key to use for the cluster.

      The format is projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/CRYPTO_KEY.

    • --async : Sends the create request and immediately returns a response, without waiting for the operation to complete. With the --async flag, you can continue with other tasks while the cluster creation happens in the background. If you omit the flag, the command waits for the operation to complete before returning a response.

    • LABELS : Labels to associate with the cluster.

      For more information about the format for labels, see Labels .

    You get a response similar to the following:

     Create request issued for: [CLUSTER_ID]
     Check operation [projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID] for status.

    Store the OPERATION_ID to track progress.

REST

Before using any of the request data, make the following replacements:

  • PROJECT_ID : your Google Cloud project ID
  • LOCATION : the location of the cluster
  • CLUSTER_ID : the ID of the cluster
  • CPU_COUNT : the number of vCPUs for the cluster
  • MEMORY : the amount of memory for the cluster, in bytes. Example: 3221225472 .
  • SUBNET_ID : subnet ID of the subnet to connect to. Example: default .

HTTP method and URL:

POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters?clusterId=CLUSTER_ID

Request JSON body:

{
  "capacityConfig": {
    "vcpuCount": CPU_COUNT,
    "memoryBytes": MEMORY
  },
  "gcpConfig": {
    "accessConfig": {
      "networkConfigs": [
        {
          "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
        }
      ]
    }
  }
}


You should receive a JSON response similar to the following:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
    "createTime": "CREATE_TIME",
    "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
    "verb": "create",
    "requestedCancellation": false,
    "apiVersion": "v1"
  },
  "done": false
}

Terraform

You can use a Terraform resource to create a cluster.

 resource "google_managed_kafka_cluster" "default" {
  project    = data.google_project.default.project_id # Replace this with your project ID in quotes
  cluster_id = "my-cluster-id"
  location   = "us-central1"
  capacity_config {
    vcpu_count   = 3
    memory_bytes = 3221225472
  }
  gcp_config {
    access_config {
      network_configs {
        subnet = google_compute_subnetwork.default.id
      }
    }
  }
} 

To learn how to apply or remove a Terraform configuration, see Basic Terraform commands .

Go

Before trying this sample, follow the Go setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createCluster(w io.Writer, projectID, region, clusterID, subnet string, cpu, memoryBytes int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// subnet := "projects/my-project-id/regions/us-central1/subnetworks/default"
	// cpu := 3
	// memoryBytes := 3221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
	clusterPath := fmt.Sprintf("%s/clusters/%s", locationPath, clusterID)

	// Memory must be between 1 GiB and 8 GiB per CPU.
	capacityConfig := &managedkafkapb.CapacityConfig{
		VcpuCount:   cpu,
		MemoryBytes: memoryBytes,
	}
	var networkConfig []*managedkafkapb.NetworkConfig
	networkConfig = append(networkConfig, &managedkafkapb.NetworkConfig{
		Subnet: subnet,
	})
	platformConfig := &managedkafkapb.Cluster_GcpConfig{
		GcpConfig: &managedkafkapb.GcpConfig{
			AccessConfig: &managedkafkapb.AccessConfig{
				NetworkConfigs: networkConfig,
			},
		},
	}
	rebalanceConfig := &managedkafkapb.RebalanceConfig{
		Mode: managedkafkapb.RebalanceConfig_AUTO_REBALANCE_ON_SCALE_UP,
	}
	cluster := &managedkafkapb.Cluster{
		Name:            clusterPath,
		CapacityConfig:  capacityConfig,
		PlatformConfig:  platformConfig,
		RebalanceConfig: rebalanceConfig,
	}
	req := &managedkafkapb.CreateClusterRequest{
		Parent:    locationPath,
		ClusterId: clusterID,
		Cluster:   cluster,
	}
	op, err := client.CreateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateCluster got err: %w", err)
	}
	// The duration of this operation can vary considerably, typically taking 10-40 minutes.
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Created cluster: %s\n", resp.Name)
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up ADC for a local development environment .

import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.AccessConfig;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.CreateClusterRequest;
import com.google.cloud.managedkafka.v1.GcpConfig;
import com.google.cloud.managedkafka.v1.LocationName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.NetworkConfig;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.RebalanceConfig;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class CreateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
    int cpu = 3;
    long memoryBytes = 3221225472L; // 3 GiB
    createCluster(projectId, region, clusterId, subnet, cpu, memoryBytes);
  }

  public static void createCluster(
      String projectId, String region, String clusterId, String subnet, int cpu, long memoryBytes)
      throws Exception {
    CapacityConfig capacityConfig =
        CapacityConfig.newBuilder().setVcpuCount(cpu).setMemoryBytes(memoryBytes).build();
    NetworkConfig networkConfig = NetworkConfig.newBuilder().setSubnet(subnet).build();
    GcpConfig gcpConfig =
        GcpConfig.newBuilder()
            .setAccessConfig(AccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
            .build();
    RebalanceConfig rebalanceConfig =
        RebalanceConfig.newBuilder()
            .setMode(RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP)
            .build();
    Cluster cluster =
        Cluster.newBuilder()
            .setCapacityConfig(capacityConfig)
            .setGcpConfig(gcpConfig)
            .setRebalanceConfig(rebalanceConfig)
            .build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm =
        OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder().setTotalTimeoutDuration(Duration.ofHours(1L)).build());
    settingsBuilder.createClusterOperationSettings().setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient =
        ManagedKafkaClient.create(settingsBuilder.build())) {
      CreateClusterRequest request =
          CreateClusterRequest.newBuilder()
              .setParent(LocationName.of(projectId, region).toString())
              .setClusterId(clusterId)
              .setCluster(cluster)
              .build();
      // The duration of this operation can vary considerably, typically taking between 10-40
      // minutes.
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.createClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf(
          "Cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(), operation.isDone(), future.getMetadata().get().toString());

      while (!future.isDone()) {
        // The pollingFuture gives us the most recent status of the operation
        RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
        OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
        System.out.printf(
            "Polling Operation:\nName: %s\nDone: %s\n", currentOp.getName(), currentOp.isDone());
      }

      // NOTE: future.get() blocks completion until the operation is complete (isDone = True)
      Cluster response = future.get();
      System.out.printf("Created cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.createCluster got err: %s", e.getMessage());
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up ADC for a local development environment .

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
# cpu = 3
# memory_bytes = 3221225472

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.vcpu_count = cpu
cluster.capacity_config.memory_bytes = memory_bytes
cluster.gcp_config.access_config.network_configs = [
    managedkafka_v1.NetworkConfig(subnet=subnet)
]
cluster.rebalance_config.mode = (
    managedkafka_v1.RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP
)

request = managedkafka_v1.CreateClusterRequest(
    parent=client.common_location_path(project_id, region),
    cluster_id=cluster_id,
    cluster=cluster,
)

try:
    operation = client.create_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    # The duration of this operation can vary considerably, typically taking 10-40 minutes.
    # We can set a timeout of 3000s (50 minutes).
    response = operation.result(timeout=3000)
    print("Created cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

Monitor the cluster creation operation

The following command applies only if you used the gcloud CLI to create the cluster.

  • Creating a cluster usually takes 20-30 minutes. To track progress of the cluster creation, the gcloud managed-kafka clusters create command uses a long-running operation (LRO), which you can monitor using the following command:

     gcloud managed-kafka operations describe OPERATION_ID \
         --location=LOCATION

    Replace the following:

    • OPERATION_ID with the value of the operation ID from the previous section.
    • LOCATION with the value of the location from the previous section.
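If you script this check rather than running it by hand, the logic amounts to calling describe until the operation reports done. The following is a minimal sketch; the wait_for_operation helper and its describe callback are hypothetical stand-ins for running gcloud managed-kafka operations describe and parsing its output, not part of any Google Cloud library:

```python
import time


def wait_for_operation(describe, poll_seconds=60, timeout_seconds=3000):
    """Poll a long-running operation until its "done" field is true.

    describe() stands in for one describe call and should return a dict
    shaped like the Operations API response, e.g. {"done": False}.
    """
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        op = describe()
        if op.get("done"):
            return op
        time.sleep(poll_seconds)
    raise TimeoutError("operation did not complete before the timeout")
```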

Troubleshooting

The following are some errors you may encounter when creating clusters.

Service agent service-${PROJECT_NUMBER}@gcp-sa-managedkafka.iam.gserviceaccount.com has not been granted the required role cloudkms.cryptoKeyEncrypterDecrypter to encrypt data using the KMS key.

The Managed Service for Apache Kafka service agent is missing the required permission to access the Cloud KMS key. See the documentation for required roles for configuring CMEK .

Service does not have permission to retrieve subnet. Please grant service-${PROJECT_NUMBER}@gcp-sa-managedkafka.iam.gserviceaccount.com the managedkafka.serviceAgent role in the IAM policy of the project ${SUBNET_PROJECT} and ensure the Compute Engine API is enabled in project ${SUBNET_PROJECT}

The Managed Service for Apache Kafka service agent is missing the required role to configure networking in the VPC network that the Kafka clients run in. For more information, see Connect a cluster across projects .

What's next?

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.