Update a Google Cloud Managed Service for Apache Kafka cluster

You can edit a Google Cloud Managed Service for Apache Kafka cluster to update properties such as the number of vCPUs, memory, subnets, encryption type, or labels. You can also configure whether the service rebalances partitions across brokers when a broker is added to the cluster. The service creates new brokers automatically based on the memory and vCPU configuration of the cluster.

To edit a cluster, you can use the Google Cloud console, the Google Cloud CLI, the client libraries, or the Managed Kafka API. You can't use the open source Apache Kafka API to update a cluster.

Before you begin

Review the properties of the cluster before making any changes.

Required roles and permissions to edit a cluster

To get the permissions that you need to update a cluster, ask your administrator to grant you the Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to update a cluster. To see the exact permissions that are required, expand the Required permissions section:

Required permissions

The following permissions are required to update a cluster:

  • Edit a cluster: managedkafka.clusters.update

You might also be able to get these permissions with custom roles or other predefined roles.

The Managed Kafka Cluster Editor role does not let you create, delete, or modify topics and consumer groups on Managed Service for Apache Kafka clusters. Nor does it allow data plane access to publish or consume messages within clusters. For more information about this role, see Managed Service for Apache Kafka predefined roles.
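For example, an administrator can grant this role with the gcloud CLI. The following command is illustrative only; PROJECT_ID and USER_EMAIL are placeholders for your own values, and your organization might manage IAM bindings differently:

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member=user:USER_EMAIL \
    --role=roles/managedkafka.clusterEditor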

Editable properties of a cluster

Not all properties can be edited in the Google Cloud console. Use the gcloud CLI or the client libraries to edit properties that aren't available in the Google Cloud console.

Update the memory and vCPUs

Understanding how your cluster's vCPUs and memory relate to the number of brokers is key to managing performance during scaling operations. Specific rules apply when you change the number of vCPUs.

A new cluster's broker count is set by its initial vCPU count. The system provisions 1 broker for every 15 vCPUs, using the following formula:

number of brokers = ceiling(number of vCPUs / 15)

For example, a cluster with 75 vCPUs would start with 5 brokers.

The cluster's overall vCPU-to-memory ratio must always remain between 1:1 and 1:8.
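To make the arithmetic concrete, here is a minimal Python sketch of these two sizing rules. It is not part of the service API; the helper names are hypothetical and only restate the formula and ratio constraint described above:

import math

VCPUS_PER_BROKER = 15  # the service provisions 1 broker per 15 vCPUs

def broker_count(vcpus: int) -> int:
    """Broker count for a new cluster: ceiling(number of vCPUs / 15)."""
    return math.ceil(vcpus / VCPUS_PER_BROKER)

def ratio_is_valid(vcpus: int, memory_gib: float) -> bool:
    """The overall vCPU-to-memory ratio must stay between 1:1 and 1:8."""
    return vcpus * 1 <= memory_gib <= vcpus * 8

print(broker_count(75))         # 5, matching the example above
print(ratio_is_valid(75, 130))  # True: 130 GiB lies between 75 and 600 GiB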

When you update an existing cluster, the following rules apply:

  • The number of brokers in a cluster never decreases below its peak count, even when you downscale (reduce vCPUs or memory).

  • When you downscale, the resulting resources must be sufficient to provide at least 1 vCPU and 1 GiB of memory to each existing broker.

  • When you upscale, if the change results in adding new brokers, the average vCPU and memory per broker can't decrease by more than 10% compared to the pre-update averages.

    For example, if you attempt to upscale a cluster from 45 vCPUs with 3 brokers to 48 vCPUs with 4 brokers, the operation fails. This is because the average vCPU per broker decreases from 15 to 12 (a 20% reduction), and this reduction exceeds the 10% limit.

    If you need to decrease the average vCPU per broker by more than 10%, we recommend doing it in several stages. During the process, monitor resource utilization and rebalance partitions if needed. You can also bypass this check if you are confident that your brokers will have enough capacity after the change. To turn off the check, set allow_broker_downscale_on_cluster_upscale to true. This flag signals that you accept the potential performance risk.

Example for an upscale operation

The following examples start with a cluster that has 75 vCPUs, 130 GiB RAM, and 5 brokers.

Example for a failed upscale operation

You attempt to upscale the cluster to 80 vCPUs and 140 GiB of RAM.

  • The system determines if a new broker is needed.

    ceiling(80 vCPUs / 15) = 6 brokers

    The cluster would grow from 5 to 6 brokers, so the 10% safety check is triggered.

  • The current average is:

    75 vCPUs / 5 brokers = 15 vCPUs/broker

    130 GiB / 5 brokers = 26 GiB/broker

  • The new average with 6 brokers is:

    80 vCPUs / 6 brokers = 13.33 vCPUs/broker (an 11.1% reduction)

    140 GiB / 6 brokers = 23.33 GiB/broker (a 10.2% reduction)

    Both reductions exceed the 10% limit, so the operation fails.

Example for a successful upscale operation

You upscale the cluster to 85 vCPUs and 150 GiB of RAM.

  • The system determines if a new broker is needed.

    ceiling(85 vCPUs / 15) = 6 brokers

    The cluster would grow from 5 to 6 brokers, so the 10% safety check is triggered.

  • The current average is:

    75 vCPUs / 5 brokers = 15 vCPUs/broker

    130 GiB / 5 brokers = 26 GiB/broker

  • The new average with 6 brokers is:

    85 vCPUs / 6 brokers = 14.17 vCPUs/broker (a 5.5% reduction)

    150 GiB / 6 brokers = 25 GiB/broker (a 3.8% reduction)

This operation is successful because the reduction in average vCPU and memory per broker is within the 10% limit.
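The following Python sketch encodes the same 10% safety check and reproduces both worked examples. The helper function is hypothetical and illustrative only, not part of the service:

import math

def upscale_is_allowed(old_vcpus, old_memory_gib, old_brokers,
                       new_vcpus, new_memory_gib, max_reduction=0.10):
    # The broker count never decreases below its peak, so take the max.
    new_brokers = max(old_brokers, math.ceil(new_vcpus / 15))
    if new_brokers == old_brokers:
        return True  # no new brokers, so the safety check isn't triggered
    # Compare per-broker averages before and after the update.
    vcpu_drop = 1 - (new_vcpus / new_brokers) / (old_vcpus / old_brokers)
    mem_drop = 1 - (new_memory_gib / new_brokers) / (old_memory_gib / old_brokers)
    return vcpu_drop <= max_reduction and mem_drop <= max_reduction

print(upscale_is_allowed(75, 130, 5, 80, 140))  # False: 11.1% vCPU reduction
print(upscale_is_allowed(75, 130, 5, 85, 150))  # True: 5.5% and 3.8% reductions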

Edit a cluster

Before you edit a cluster, review the editable properties of a cluster. Updating certain properties, such as CPU and memory, might require a cluster restart. When a restart is required, brokers are restarted one at a time, which causes transient failures of requests to individual brokers. Commonly used client libraries handle such errors automatically.

To edit a cluster, follow these steps:

Console

  1. In the Google Cloud console, go to the Clusters page.

    Go to Clusters

  2. From the list of clusters, click the cluster whose properties you want to edit.

    The cluster details page is displayed.

  3. In the cluster details page, click Edit.
  4. Edit the properties as required. The following properties of a cluster are editable from the console:
    • Memory
    • vCPUs
    • Subnet
    • Labels

    You cannot edit the cluster name, the cluster location, or the encryption type.

  5. Click Save.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud managed-kafka clusters update command:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS

    Replace the following:

    • CLUSTER_ID: The ID or name of the cluster. You can't update this value.

    • LOCATION: The location of the cluster. You can't update this value.

    • CPU: The number of virtual CPUs for the cluster.

    • MEMORY: The amount of memory for the cluster. Use "MB", "MiB", "GB", "GiB", "TB", or "TiB" units. For example, "10GiB".

    • SUBNETS: The list of subnets to connect to. Use commas to separate multiple subnet values.

    • --auto-rebalance: Enables automatic rebalancing of topic partitions among brokers when the number of CPUs in the cluster changes. This is enabled by default.

    • LABELS: Labels to associate with the cluster.

If you use the --async flag with your command, the system sends the update request and returns a response immediately, without waiting for the operation to complete, so you can continue with other tasks while the cluster update happens in the background. If you don't use the --async flag, the system waits for the operation to complete before returning a response, and you have to wait until the cluster is fully updated before you can continue with other tasks.
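For example, the following command (with illustrative placeholder values) updates only the cluster memory and returns immediately:

gcloud managed-kafka clusters update my-cluster \
    --location=us-central1 \
    --memory=40GiB \
    --async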

Go

Before trying this sample, follow the Go setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// memoryBytes := 4221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	capacityConfig := &managedkafkapb.CapacityConfig{
		MemoryBytes: memory,
	}
	cluster := &managedkafkapb.Cluster{
		Name:           clusterPath,
		CapacityConfig: capacityConfig,
	}
	paths := []string{"capacity_config.memory_bytes"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}
	req := &managedkafkapb.UpdateClusterRequest{
		UpdateMask: updateMask,
		Cluster:    cluster,
	}
	op, err := client.UpdateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateCluster got err: %w", err)
	}
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
import com.google.protobuf.FieldMask;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class UpdateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    long memoryBytes = 4221225472L; // 4 GiB
    updateCluster(projectId, region, clusterId, memoryBytes);
  }

  public static void updateCluster(
      String projectId, String region, String clusterId, long memoryBytes) throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
    Cluster cluster =
        Cluster.newBuilder()
            .setName(ClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm =
        OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder().setTotalTimeoutDuration(Duration.ofHours(1L)).build());
    settingsBuilder.updateClusterOperationSettings().setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient =
        ManagedKafkaClient.create(settingsBuilder.build())) {
      UpdateClusterRequest request =
          UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.updateClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf(
          "Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(), operation.isDone(), future.getMetadata().get().toString());

      Cluster response = future.get();
      System.out.printf("Updated cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# memory_bytes = 4295000000

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.memory_bytes = memory_bytes

update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("capacity_config.memory_bytes")

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
request = managedkafka_v1.UpdateClusterRequest(
    update_mask=update_mask,
    cluster=cluster,
)

try:
    operation = client.update_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Updated cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

What's next

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.