Update a Google Cloud Managed Service for Apache Kafka topic

After a topic is created, you can edit its configuration to update the following properties: the number of partitions, and any topic-level configurations that override the defaults set at the cluster level. You can only increase the number of partitions; you cannot decrease it.

To update a single topic, you can use the Google Cloud console, the Google Cloud CLI, the client libraries, the Managed Kafka API, or the open source Apache Kafka APIs.
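For example, one way to increase a topic's partition count with the open source Apache Kafka tooling is the kafka-topics.sh command-line tool. The following is a minimal sketch, assuming a topic named my-topic; BOOTSTRAP_SERVER and client.properties are placeholders for your cluster's bootstrap address and your client authentication settings:

    # Increase the partition count of my-topic to 20.
    # --command-config points at a properties file with your auth settings.
    kafka-topics.sh --bootstrap-server BOOTSTRAP_SERVER \
        --command-config client.properties \
        --alter --topic my-topic --partitions 20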

Required roles and permissions to edit a topic

To get the permissions that you need to edit a topic, ask your administrator to grant you the Managed Kafka Topic Editor (roles/managedkafka.topicEditor) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to edit a topic. To see the exact permissions that are required, expand the Required permissions section:

Required permissions

The following permissions are required to edit a topic:

  • Update a topic: managedkafka.topics.update

You might also be able to get these permissions with custom roles or other predefined roles.

For more information about this role, see Managed Service for Apache Kafka predefined roles.
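As an illustration, a project-level grant with the Google Cloud CLI might look like the following, where PROJECT_ID and USER_EMAIL are placeholders for your own values:

    # Grant the Managed Kafka Topic Editor role on a project.
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=user:USER_EMAIL \
        --role=roles/managedkafka.topicEditor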

Edit a topic

To edit a topic, follow these steps:

Console

  1. In the Google Cloud console, go to the Clusters page.

    Go to Clusters

  2. From the list of clusters in your project, click the cluster that contains the topic that you want to edit.

    The Cluster details page opens. The Resources tab lists the topics in the cluster.

  3. Click the topic that you want to edit.

    The Topic details page opens.

  4. To make your edits, click Edit.
  5. After you make your changes, click Save.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud managed-kafka topics update command:

    gcloud managed-kafka topics update TOPIC_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --configs=CONFIGS

    This command modifies the configuration of an existing topic in the specified Managed Service for Apache Kafka cluster. You can use this command to increase the number of partitions in the topic or update topic-level configuration settings.

    Replace the following:

    • TOPIC_ID: The ID of the topic.

    • CLUSTER_ID: The ID of the cluster containing the topic.

    • LOCATION_ID: The location of the cluster.

    • PARTITIONS: The updated number of partitions for the topic. You can only increase the number of partitions; you cannot decrease it.

    • CONFIGS: Optional topic-level configurations, specified as comma-separated key-value pairs. For example, `compression.type=producer`. A complete example invocation follows this list.
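    For example, the following hypothetical invocation updates a topic named my-topic in the cluster my-cluster to 20 partitions and sets two illustrative configuration values; all values are placeholders for your own:

    gcloud managed-kafka topics update my-topic \
        --cluster=my-cluster \
        --location=us-central1 \
        --partitions=20 \
        --configs=retention.ms=86400000,compression.type=producer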

Go

import (
    "context"
    "fmt"
    "io"

    "cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    "google.golang.org/api/option"
    "google.golang.org/protobuf/types/known/fieldmaskpb"

    managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
    // projectID := "my-project-id"
    // region := "us-central1"
    // clusterID := "my-cluster"
    // topicID := "my-topic"
    // partitionCount := 20
    // configs := map[string]string{"min.insync.replicas": "1"}
    ctx := context.Background()
    client, err := managedkafka.NewClient(ctx, opts...)
    if err != nil {
        return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    }
    defer client.Close()

    clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    topicConfig := managedkafkapb.Topic{
        Name:           topicPath,
        PartitionCount: partitionCount,
        Configs:        configs,
    }
    // The update mask specifies which topic fields the request overwrites.
    paths := []string{"partition_count", "configs"}
    updateMask := &fieldmaskpb.FieldMask{
        Paths: paths,
    }
    req := &managedkafkapb.UpdateTopicRequest{
        UpdateMask: updateMask,
        Topic:      &topicConfig,
    }
    topic, err := client.UpdateTopic(ctx, req)
    if err != nil {
        return fmt.Errorf("client.UpdateTopic got err: %w", err)
    }
    fmt.Fprintf(w, "Updated topic: %#v\n", topic)
    return nil
}

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class UpdateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 200;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "1");
          }
        };
    updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
  }

  public static void updateTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .putAllConfigs(configs)
            .build();
    // The update mask specifies which topic fields the request overwrites.
    String[] paths = {"partition_count", "configs"};
    FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
      // This operation is handled synchronously.
      Topic response = managedKafkaClient.updateTopic(request);
      System.out.printf("Updated topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateTopic got err: %s", e.getMessage());
    }
  }
}

Python

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 20
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.configs = configs

update_mask = field_mask_pb2.FieldMask()
update_mask.paths.extend(["partition_count", "configs"])

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
request = managedkafka_v1.UpdateTopicRequest(
    update_mask=update_mask,
    topic=topic,
)

try:
    response = client.update_topic(request=request)
    print("Updated topic:", response)
except NotFound as e:
    print(f"Failed to update topic {topic_id} with error: {e.message}")
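Whichever interface you use, you can confirm the update by describing the topic and checking its partition count and configurations. A minimal sketch with the gcloud CLI, using the same placeholder IDs as in the update command above:

    # Display the current state of the topic, including partition count.
    gcloud managed-kafka topics describe TOPIC_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION_ID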

What's next?
