Update a connector

You can edit a connector to update its configuration, such as changing the topics it reads from or writes to, modifying data transformations, or adjusting error handling settings.

To update a connector in a Connect cluster, you can use the Google Cloud console, the gcloud CLI, the Managed Service for Apache Kafka client libraries, or the Managed Kafka API. You can't use the open source Apache Kafka API to update connectors.
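For example, the Managed Kafka API exposes the connector as a REST resource that you update with a PATCH request. The following curl sketch is illustrative rather than authoritative: it assumes the v1 endpoint, configs as the update-mask path (matching the client library samples later on this page), and placeholder resource IDs.

```sh
# Hedged sketch: update a connector's configs through the Managed Kafka API.
# PROJECT_ID, LOCATION, CLUSTER_ID, and CONNECTOR_ID are placeholders.
curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d '{"configs": {"tasks.max": "2"}}' \
  "https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/connectClusters/CLUSTER_ID/connectors/CONNECTOR_ID?updateMask=configs"
```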

Before you begin

Before updating a connector, review its existing configuration and understand the potential impact of any changes you make.

Required roles and permissions to update a connector

To get the permissions that you need to edit a connector, ask your administrator to grant you the Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM role on the project containing the Connect cluster. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to edit a connector. To see the exact permissions that are required, expand the Required permissions section:

Required permissions

The following permissions are required to edit a connector:

  • Update connectors on the parent Connect cluster: managedkafka.connectors.update
  • List connectors on the parent Connect cluster: managedkafka.connectors.list. This permission is only required when updating a connector using the Google Cloud console.

You might also be able to get these permissions with custom roles or other predefined roles.

For more information about the Managed Kafka Connector Editor role, see Google Cloud Managed Service for Apache Kafka predefined roles.
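As a minimal sketch, an administrator can grant this role with the gcloud CLI. PROJECT_ID and USER_EMAIL are placeholders:

```sh
# Grant the Managed Kafka Connector Editor role on a project.
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="user:USER_EMAIL" \
    --role="roles/managedkafka.connectorEditor"
```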

Editable properties of a connector

The editable properties of a connector depend on its type. Here's a summary of the editable properties for the supported connector types; a combined update sketch follows these lists.

MirrorMaker 2.0 Source connector

  • Comma-separated topic names or topic regex: The topics to be replicated.

    For more information about the property, see Topic names.

  • Configuration: Additional configuration settings for the connector.

    For more information about the property, see Configuration.

  • Task restart policy: The policy for restarting failed connector tasks.

    For more information about the property, see Task restart policy.

BigQuery Sink connector

  • Topics: The Kafka topics from which to stream data.

    For more information about the property, see Topics.

  • Dataset: The BigQuery dataset to store the data.

    For more information about the property, see Dataset.

  • Configuration: Additional configuration settings for the connector.

    For more information about the property, see Configuration.

  • Task restart policy: The policy for restarting failed connector tasks.

    For more information about the property, see Task restart policy.

Cloud Storage Sink connector

  • Topics: The Kafka topics from which to stream data.

    For more information about the property, see Topics.

  • Cloud Storage bucket: The Cloud Storage bucket to store the data.

    For more information about the property, see Bucket.

  • Configuration: Additional configuration settings for the connector.

    For more information about the property, see Configuration.

  • Task restart policy: The policy for restarting failed connector tasks.

    For more information about the property, see Task restart policy.

Pub/Sub Source connector

  • Pub/Sub subscription: The Pub/Sub subscription from which to receive messages.
  • Kafka topic: The Kafka topic to which to stream messages.
  • Configuration: Additional configuration settings for the connector. For more information, see Configure the connector.
  • Task restart policy: The policy for restarting failed connector tasks. For more information, see Task restart policy.

Pub/Sub Sink connector

  • Topics: The Kafka topics from which to stream messages.

    For more information about the property, see Topics.

  • Pub/Sub topic: The Pub/Sub topic to which to send messages.

    For more information about the property, see Pub/Sub topic.

  • Configuration: Additional configuration settings for the connector.

    For more information about the property, see Configuration.

  • Task restart policy: The policy for restarting failed connector tasks.

    For more information about the property, see Task restart policy.
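Most of these properties are passed to the connector as configuration key-value pairs when you update it. As a minimal sketch, assuming a sink connector whose Topics property maps to the standard Kafka Connect topics key, an update might look like this (connector and cluster names are placeholders):

```sh
# Hedged sketch: change the topics a sink connector reads from and its
# task count. Assumes the standard Kafka Connect "topics" and "tasks.max"
# keys; my-sink-connector and my-connect-cluster are placeholders.
gcloud managed-kafka connectors update my-sink-connector \
    --location=us-central1 \
    --connect-cluster=my-connect-cluster \
    --configs=topics=orders,tasks.max=4
```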

Update a connector

Updating a connector might cause a temporary interruption in data flow while the changes are applied.

Console

  1. In the Google Cloud console, go to the Connect Clusters page.

    Go to Connect Clusters

  2. Click the Connect cluster that hosts the connector you want to update.

    The Connect cluster details page is displayed.

  3. On the Resources tab, find the connector in the list and click its name.

    You are redirected to the Connector details page.

  4. Click Edit.

  5. Update the required properties for the connector. The available properties vary depending on the connector type.

  6. Click Save.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Use the gcloud managed-kafka connectors update command to update a connector:

    You can update a connector's configuration using either the --configs flag with comma-separated key-value pairs or the --config-file flag with a path to a JSON or YAML file.

    Here is the syntax that uses the --configs flag with comma-separated key-value pairs:

    ```sh
    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --configs=KEY1=VALUE1,KEY2=VALUE2...
    ```

    Here is the syntax that uses the --config-file flag with a path to a JSON or YAML file:

    ```sh
    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=PATH_TO_CONFIG_FILE
    ```

    Replace the following:

    • CONNECTOR_ID: Required. The ID of the connector you want to update.
    • LOCATION: Required. The location of the Connect cluster containing the connector.
    • CONNECT_CLUSTER_ID: Required. The ID of the Connect cluster containing the connector.
    • KEY1=VALUE1,KEY2=VALUE2...: Comma-separated configuration properties to update. For example, tasks.max=2,value.converter.schemas.enable=true.
    • PATH_TO_CONFIG_FILE: The path to a JSON or YAML file containing the configuration properties to update. For example, config.json.

    Example command using --configs:

    ```sh
    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --configs=tasks.max=2,value.converter.schemas.enable=true
    ```

    Example command using --config-file. The following is a sample file named update_config.yaml:

    ```yaml
    tasks.max: 3
    topic: updated-test-topic
    ```

    The following is a sample command that uses the file:

    ```sh
    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --config-file=update_config.yaml
    ```

Go

Before trying this sample, follow the Go setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

```go
import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, config map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "my-connector"
	// config := map[string]string{"tasks.max": "6"}
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
	connector := &managedkafkapb.Connector{
		Name:    connectorPath,
		Configs: config,
	}
	paths := []string{"configs"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}
	req := &managedkafkapb.UpdateConnectorRequest{
		UpdateMask: updateMask,
		Connector:  connector,
	}
	resp, err := client.UpdateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Updated connector: %#v\n", resp)
	return nil
}
```

Java

Before trying this sample, follow the Java setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

```java
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class UpdateConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-connect-cluster";
    String connectorId = "my-connector";
    // The new value for the 'tasks.max' configuration.
    String maxTasks = "5";
    updateConnector(projectId, region, clusterId, connectorId, maxTasks);
  }

  public static void updateConnector(
      String projectId, String region, String clusterId, String connectorId, String maxTasks)
      throws IOException {
    try (ManagedKafkaConnectClient managedKafkaConnectClient =
        ManagedKafkaConnectClient.create()) {
      Map<String, String> configMap = new HashMap<>();
      configMap.put("tasks.max", maxTasks);
      Connector connector =
          Connector.newBuilder()
              .setName(ConnectorName.of(projectId, region, clusterId, connectorId).toString())
              .putAllConfigs(configMap)
              .build();
      // The field mask specifies which fields to update. Here, we update the
      // 'configs' field.
      FieldMask updateMask = FieldMask.newBuilder().addPaths("configs").build();
      // This operation is handled synchronously.
      Connector updatedConnector =
          managedKafkaConnectClient.updateConnector(connector, updateMask);
      System.out.printf("Updated connector: %s\n", updatedConnector.getName());
      System.out.println(updatedConnector.getAllFields());
    } catch (IOException | ApiException e) {
      System.err.printf(
          "managedKafkaConnectClient.updateConnector got err: %s\n", e.getMessage());
    }
  }
}
```

Python

Before trying this sample, follow the Python setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

```python
from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# connect_cluster_id = "my-connect-cluster"
# connector_id = "my-connector"
# configs = {
#     "tasks.max": "6",
#     "value.converter.schemas.enable": "true"
# }

connect_client = ManagedKafkaConnectClient()

connector = Connector()
connector.name = connect_client.connector_path(
    project_id, region, connect_cluster_id, connector_id
)
connector.configs = configs

update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("configs")

# For a list of editable fields, see
# https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/update-connector#editable-properties.
request = managedkafka_v1.UpdateConnectorRequest(
    update_mask=update_mask,
    connector=connector,
)

try:
    # update_connector is a synchronous call that returns the updated connector.
    response = connect_client.update_connector(request=request)
    print("Updated connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")
```
With the gcloud CLI, you can also update the connector's task restart policy without including the configuration, by using the `--task-restart-min-backoff` and `--task-restart-max-backoff` flags. For example:

```sh
gcloud managed-kafka connectors update test-connector \
    --location=us-central1 \
    --connect-cluster=test-connect-cluster \
    --task-restart-min-backoff="60s" \
    --task-restart-max-backoff="90s"
```
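To confirm that an update was applied, you can inspect the connector's current configuration. A minimal sketch using the gcloud CLI's describe command, assuming the same resource flags as the update command:

```sh
# Hedged sketch: show the connector, including its current configs.
gcloud managed-kafka connectors describe test-connector \
    --location=us-central1 \
    --connect-cluster=test-connect-cluster
```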
Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.