Update a Google Cloud Managed Service for Apache Kafka cluster

You can edit a Google Cloud Managed Service for Apache Kafka cluster to update properties such as the number of vCPUs, the amount of memory, subnets, or labels. You can also configure whether the service rebalances partitions across brokers when a broker is added to the cluster. The service creates new brokers automatically based on the memory and vCPU configuration of the cluster.

To edit a cluster, you can use the Google Cloud console, the Google Cloud CLI, the client libraries, or the Managed Kafka API. You can't use the open source Apache Kafka API to update a cluster.

Before you begin

If you update the vCPU count or memory, the following rules apply:

  • The cluster's overall vCPU-to-memory ratio must always remain between 1:1 and 1:8.

  • If you downscale, there must be at least 1 vCPU and 1 GiB of memory for each existing broker. The number of brokers never decreases.

  • If you upscale, and the change results in adding new brokers, the average vCPU and memory per broker can't decrease by more than 10% compared to the averages before the update.

    For example, suppose you try to upscale a cluster from 45 vCPUs (3 brokers) to 48 vCPUs (4 brokers). The operation fails because the average vCPU per broker decreases from 15 to 12, a 20% reduction that exceeds the 10% limit. A sketch that checks these rules follows this list.

For more information, see Update the cluster size.
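
The following Python sketch illustrates these rules as a local pre-check. It isn't part of the service API, the helper name is hypothetical, and the memory figures in the example call are assumed for illustration:

def validate_resize(new_vcpus, new_mem_gib, old_vcpus, old_mem_gib,
                    brokers_before, brokers_after):
    """Pre-check a proposed cluster resize against the documented rules."""
    # Rule 1: the overall vCPU-to-memory ratio must stay between 1:1 and 1:8.
    if not (new_vcpus <= new_mem_gib <= 8 * new_vcpus):
        return "fails: vCPU-to-memory ratio must stay between 1:1 and 1:8"
    # Rule 2: keep at least 1 vCPU and 1 GiB of memory per existing broker.
    if new_vcpus < brokers_before or new_mem_gib < brokers_before:
        return "fails: need at least 1 vCPU and 1 GiB per existing broker"
    # Rule 3: if the change adds brokers, the per-broker averages can't
    # drop by more than 10% compared to before the update.
    if brokers_after > brokers_before:
        if new_vcpus / brokers_after < 0.9 * (old_vcpus / brokers_before):
            return "fails: average vCPU per broker would drop more than 10%"
        if new_mem_gib / brokers_after < 0.9 * (old_mem_gib / brokers_before):
            return "fails: average memory per broker would drop more than 10%"
    return "passes"

# The documented failure case: 45 vCPUs across 3 brokers, upscaled to
# 48 vCPUs across 4 brokers. Average vCPU per broker falls from 15 to 12,
# a 20% drop. The memory figures are hypothetical.
print(validate_resize(48, 192, 45, 180, brokers_before=3, brokers_after=4))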

Updating certain properties, such as the vCPU count and memory, might require the service to restart the cluster. Brokers are restarted one at a time, so requests to an individual broker can fail transiently while it restarts. Commonly used client libraries handle these errors automatically.
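
For example, the open source Apache Kafka Java producer retries retriable errors by default. The following producer properties, shown with illustrative values rather than recommendations, control that behavior:

# Standard Apache Kafka producer settings for riding out rolling broker restarts.
# retries: how many times to retry retriable send errors (the client default is the maximum).
retries=2147483647
# retry.backoff.ms: pause between retries.
retry.backoff.ms=1000
# delivery.timeout.ms: total time budget per send, including retries.
delivery.timeout.ms=120000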

You cannot edit the cluster name, the cluster location, or the encryption type.

Required roles and permissions to edit a cluster

To get the permissions that you need to update a cluster, ask your administrator to grant you the Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.
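
For example, an administrator can grant this role with the Google Cloud CLI. The project ID and principal shown here are placeholders:

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member=user:USER_EMAIL \
    --role=roles/managedkafka.clusterEditor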

This predefined role contains the permissions required to update a cluster. The exact permissions are listed in the following Required permissions section:

Required permissions

The following permissions are required to update a cluster:

  • Edit a cluster: managedkafka.clusters.update

You might also be able to get these permissions with custom roles or other predefined roles.

The Managed Kafka Cluster Editor role doesn't let you create, delete, or modify topics and consumer groups on Managed Service for Apache Kafka clusters, and it doesn't grant data plane access to publish or consume messages within clusters. For more information about this role, see Managed Service for Apache Kafka predefined roles.

Edit a cluster

To edit a cluster, follow these steps:

Console

  1. In the Google Cloud console, go to the Clusters page.

    Go to Clusters

  2. From the list of clusters, click the cluster whose properties you want to edit.

    The cluster details page is displayed.

  3. On the cluster details page, click Edit.

  4. Edit the properties as required. The following properties of a cluster are editable from the console:

    • Memory
    • vCPUs
    • Subnet
    • Rebalancing configuration
    • mTLS configuration
    • Labels
  5. Click Save.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud managed-kafka clusters update command:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS

    Replace the following:

    • CLUSTER_ID: The ID or name of the cluster. You can't update this value.
    • LOCATION: The location of the cluster. You can't update this value.
    • CPU: The number of virtual CPUs for the cluster.
    • MEMORY: The amount of memory for the cluster. Use "MB", "MiB", "GB", "GiB", "TB", or "TiB" units. For example, "10GiB".
    • SUBNETS: The list of subnets to connect to. Use commas to separate multiple subnet values.
    • --auto-rebalance: Enables automatic rebalancing of topic partitions among brokers when the number of vCPUs in the cluster changes. Rebalancing is enabled by default.
    • LABELS: Labels to associate with the cluster.

If you use the --async flag, the command sends the update request and returns immediately, without waiting for the operation to complete, so you can continue with other tasks while the cluster updates in the background. Without the flag, the command waits until the cluster is fully updated before returning.
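
For example, the following invocation, using hypothetical values, scales a cluster named my-cluster and returns immediately:

gcloud managed-kafka clusters update my-cluster \
    --location=us-central1 \
    --cpu=6 \
    --memory=6GiB \
    --async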

REST

Before using any of the request data, make the following replacements:

  • PROJECT_ID: your Google Cloud project ID
  • LOCATION: the location of the cluster
  • CLUSTER_ID: the ID of the cluster
  • UPDATE_MASK: the fields to update, as a comma-separated list of fully qualified names. Example: capacityConfig.vcpuCount,capacityConfig.memoryBytes
  • CPU_COUNT: the number of vCPUs for the cluster
  • MEMORY: the amount of memory for the cluster, in bytes
  • SUBNET_ID: the ID of the subnet to connect to

HTTP method and URL:

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

Request JSON body:

{
  "capacityConfig": {
    "vcpuCount": CPU_COUNT,
    "memoryBytes": MEMORY
  },
  "gcpConfig": {
    "accessConfig": {
      "networkConfigs": [
        {
          "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
        }
      ]
    }
  }
}

To send your request, use curl or any HTTP client.
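
For example, the following curl sketch sends the request. It assumes that the request body is saved in a file named request.json and that the gcloud CLI is available to mint an access token:

curl -X PATCH \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json \
    "https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK"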

A successful request returns a long-running operation. You should receive a JSON response similar to the following:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
    ...
  },
  "done": false
}
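
The update continues in the background. To check its status, you can poll the returned operation; the following sketch assumes the standard long-running operations endpoint for the service:

curl -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    "https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID"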

In the request body, include only the fields that you are updating, as specified in the UPDATE_MASK query parameter. To add a subnet, append a new entry to networkConfigs .
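
For example, a request body that keeps one existing subnet and adds a second might look like the following. The subnet names are hypothetical, and the updateMask query parameter would name gcpConfig.accessConfig.networkConfigs:

{
  "gcpConfig": {
    "accessConfig": {
      "networkConfigs": [
        {
          "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/existing-subnet"
        },
        {
          "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/new-subnet"
        }
      ]
    }
  }
}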

Go

Before trying this sample, follow the Go setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// memory := 4221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	capacityConfig := &managedkafkapb.CapacityConfig{
		MemoryBytes: memory,
	}
	cluster := &managedkafkapb.Cluster{
		Name:           clusterPath,
		CapacityConfig: capacityConfig,
	}
	// The update mask lists the fields to change; only these fields are applied.
	paths := []string{"capacity_config.memory_bytes"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}
	req := &managedkafkapb.UpdateClusterRequest{
		UpdateMask: updateMask,
		Cluster:    cluster,
	}
	op, err := client.UpdateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateCluster got err: %w", err)
	}
	// UpdateCluster returns a long-running operation; Wait blocks until it completes.
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
import com.google.protobuf.FieldMask;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class UpdateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    long memoryBytes = 25769803776L; // 24 GiB
    updateCluster(projectId, region, clusterId, memoryBytes);
  }

  public static void updateCluster(
      String projectId, String region, String clusterId, long memoryBytes) throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
    Cluster cluster =
        Cluster.newBuilder()
            .setName(ClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm =
        OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder().setTotalTimeoutDuration(Duration.ofHours(1L)).build());
    settingsBuilder.updateClusterOperationSettings().setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient =
        ManagedKafkaClient.create(settingsBuilder.build())) {
      UpdateClusterRequest request =
          UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.updateClusterOperationCallable().futureCall(request);
      // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf(
          "Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(), operation.isDone(), future.getMetadata().get().toString());
      Cluster response = future.get();
      System.out.printf("Updated cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# memory_bytes = 4295000000

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.memory_bytes = memory_bytes

update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("capacity_config.memory_bytes")

# For a list of editable fields, see https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
request = managedkafka_v1.UpdateClusterRequest(
    update_mask=update_mask,
    cluster=cluster,
)

try:
    operation = client.update_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Updated cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

What's next?

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.