Create a BigQuery Sink connector

BigQuery Sink connectors let you stream data from Kafka into BigQuery for real-time ingestion and analysis. A BigQuery Sink connector consumes records from one or more Kafka topics and writes the data to one or more tables within a single BigQuery dataset.

Before you begin

Before creating a BigQuery Sink connector, ensure you have the following:

Required roles and permissions

To get the permissions that you need to create a BigQuery Sink connector, ask your administrator to grant you the Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to create a BigQuery Sink connector. To see the exact permissions that are required, expand the Required permissions section:

Required permissions

The following permissions are required to create a BigQuery Sink connector:

  • Grant the create a connector permission on the parent Connect cluster: managedkafka.connectors.create

You might also be able to get these permissions with custom roles or other predefined roles.

For more information about the Managed Kafka Connector Editor role, see Managed Service for Apache Kafka predefined roles.

If your Managed Service for Apache Kafka cluster is in the same project as the Connect cluster, no further permissions are required. If the cluster is in a different project, refer to Create a Connect Cluster in a different project.
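
For example, an administrator can grant this role with the Google Cloud CLI. In the following sketch, PROJECT_ID and USER_EMAIL are placeholders:

  gcloud projects add-iam-policy-binding PROJECT_ID \
      --member=user:USER_EMAIL \
      --role=roles/managedkafka.connectorEditor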

Grant permissions to write to the BigQuery table

The Connect cluster service account, which follows the format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, requires permission to write to the BigQuery table. To do so, grant the BigQuery Data Editor (roles/bigquery.dataEditor) role to the Connect cluster service account on the project containing the BigQuery table.
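
For example, you can grant this role with the Google Cloud CLI. In the following sketch, BIGQUERY_PROJECT_ID is the project that contains the BigQuery table and PROJECT_NUMBER is the project number associated with the Connect cluster service account; both are placeholders:

  gcloud projects add-iam-policy-binding BIGQUERY_PROJECT_ID \
      --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --role=roles/bigquery.dataEditor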

Schemas for a BigQuery Sink connector

The BigQuery Sink connector uses the configured value converter (value.converter) to parse Kafka record values into fields. Then it writes the fields to columns of the same name in the BigQuery table.

The connector requires a schema to operate. The schema can be provided in the following ways:

  • Message-based schema: The schema is included as part of each message.
  • Table-based schema: The connector infers the message schema from the BigQuery table schema.
  • Schema registry: The connector reads the schema from a schema registry, such as Managed Service for Apache Kafka schema registry (Preview).

The following sections describe these options.

Message-based schema

In this mode, each Kafka record includes a JSON schema. The connector uses the schema to write the record data as a BigQuery table row.

To use message-based schemas, set the following properties on the connector:

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=true

Example Kafka record value:

  {
    "schema": {
      "type": "struct",
      "fields": [
        {
          "field": "user",
          "type": "string",
          "optional": false
        },
        {
          "field": "age",
          "type": "int64",
          "optional": false
        }
      ]
    },
    "payload": {
      "user": "userId",
      "age": 30
    }
  }
 

If the destination table already exists, the BigQuery table schema must be compatible with the embedded message schema. If autoCreateTables=true, the connector automatically creates the destination table if needed. For more information, see Table creation.

If you want the connector to update the BigQuery table schema as message schemas change, set allowNewBigQueryFields, allowSchemaUnionization, or allowBigQueryRequiredFieldRelaxation to true.
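
For example, a connector configuration for message-based schemas might combine these properties as follows. This is a sketch rather than a complete configuration; enable only the schema-update properties that your pipeline needs:

  value.converter=org.apache.kafka.connect.json.JsonConverter
  value.converter.schemas.enable=true
  autoCreateTables=true
  allowNewBigQueryFields=true
  allowBigQueryRequiredFieldRelaxation=true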

Table-based schema

In this mode, the Kafka records contain plain JSON data without an explicit schema. The connector infers the schema from the destination table.

Requirements:

  • The BigQuery table must already exist.
  • The Kafka record data must be compatible with the table schema.
  • This mode doesn't support dynamic schema updates based on incoming messages.

To use table-based schemas, set the following properties on the connector:

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=false
  • bigQueryPartitionDecorator=false

If the BigQuery table uses daily time-based partitioning, bigQueryPartitionDecorator can be true. Otherwise, set this property to false.

Example Kafka record value:

  {
    "user": "userId",
    "age": 30
  }
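
Putting these requirements together, a table-based schema configuration might look like the following sketch. BQ_DATASET_ID is a placeholder, and autoCreateTables is set to false because the destination table must already exist:

  value.converter=org.apache.kafka.connect.json.JsonConverter
  value.converter.schemas.enable=false
  bigQueryPartitionDecorator=false
  autoCreateTables=false
  defaultDataset=BQ_DATASET_ID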
 

Schema registry

In this mode, each Kafka record contains Apache Avro data, and the message schema is stored in a schema registry.

To use the BigQuery Sink connector with a schema registry, set the following properties on the connector:

  • value.converter=io.confluent.connect.avro.AvroConverter
  • value.converter.schema.registry.url=SCHEMA_REGISTRY_URL

Replace SCHEMA_REGISTRY_URL with the URL of the schema registry.

To use the connector with Managed Service for Apache Kafka schema registry, set the following property:

  • value.converter.bearer.auth.credentials.source=GCP

For more information, see Use Kafka Connect with schema registry.
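
For example, a configuration that reads Avro records and resolves their schemas through the Managed Service for Apache Kafka schema registry might set the following properties. SCHEMA_REGISTRY_URL is a placeholder:

  value.converter=io.confluent.connect.avro.AvroConverter
  value.converter.schema.registry.url=SCHEMA_REGISTRY_URL
  value.converter.bearer.auth.credentials.source=GCP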

BigLake tables for Apache Iceberg in BigQuery

The BigQuery Sink connector supports BigLake tables for Apache Iceberg in BigQuery (hereafter, BigLake Iceberg tables in BigQuery) as a sink target.

BigLake Iceberg tables in BigQuery provide the foundation for building open-format lakehouses on Google Cloud. They offer the same fully managed experience as BigQuery tables, but store data in customer-owned storage buckets in the Parquet format, which makes them interoperable with Apache Iceberg open table formats.

For information on how to create an Apache Iceberg table, see Create an Apache Iceberg table.

Create a BigQuery Sink connector

Console

  1. In the Google Cloud console, go to the Connect Clusters page.

    Go to Connect Clusters

  2. Click the Connect cluster where you want to create the connector.

  3. Click Create connector.

  4. For the connector name, enter a string.

    For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource.

  5. For Connector plugin, select BigQuery Sink.

  6. In the Topics section, specify the Kafka topics to read from. You can specify a list of topics or a regular expression to match against topic names.

    • Option 1: Choose Select a list of Kafka topics. In the Kafka topics list, select one or more topics. Click OK.

    • Option 2: Choose Use a topic regex. In the Topic regex field, enter a regular expression.

  7. Click Dataset and specify a BigQuery dataset. You can choose an existing dataset or create a new one.

  8. Optional: In the Configurations box, add configuration properties or edit the default properties. For more information, see Configure the connector.

  9. Select the Task restart policy. For more information, see Task restart policy.

  10. Click Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud managed-kafka connectors create command:

     gcloud managed-kafka connectors create CONNECTOR_ID \
         --location=LOCATION \
         --connect-cluster=CONNECT_CLUSTER_ID \
         --config-file=CONFIG_FILE

    Replace the following:

    • CONNECTOR_ID: The ID or name of the connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.

    • LOCATION: The location where you create the connector. This must be the same location where you created the Connect cluster.

    • CONNECT_CLUSTER_ID: The ID of the Connect cluster where the connector is created.

    • CONFIG_FILE: The path to the YAML configuration file for the BigQuery Sink connector.

    Here is an example of a configuration file for the BigQuery Sink connector:

      name: "BQ_SINK_CONNECTOR_ID"
      project: "GCP_PROJECT_ID"
      topics: "GMK_TOPIC_ID"
      tasks.max: 3
      connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
      key.converter: "org.apache.kafka.connect.storage.StringConverter"
      value.converter: "org.apache.kafka.connect.json.JsonConverter"
      value.converter.schemas.enable: "false"
      defaultDataset: "BQ_DATASET_ID"

    Replace the following:

    • BQ_SINK_CONNECTOR_ID: The ID or name of the BigQuery Sink connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.

    • GCP_PROJECT_ID: The ID of the Google Cloud project where your BigQuery dataset resides.

    • GMK_TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which the data flows to the BigQuery Sink connector.

    • BQ_DATASET_ID: The ID of the BigQuery dataset that acts as the sink for the pipeline.

Terraform

You can use a Terraform resource to create a connector.

 resource "google_managed_kafka_connector" "example-bigquery-sink-connector" {
  project         = data.google_project.default.project_id
  connector_id    = "my-bigquery-sink-connector"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "name"                           = "my-bigquery-sink-connector"
    "project"                        = data.google_project.default.project_id
    "topics"                         = "GMK_TOPIC_ID"
    "tasks.max"                      = "3"
    "connector.class"                = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
    "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
    "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
    "value.converter.schemas.enable" = "false"
    "defaultDataset"                 = "BQ_DATASET_ID"
  }

  provider = google-beta
} 

To learn how to apply or remove a Terraform configuration, see Basic Terraform commands.

Go

Before trying this sample, follow the Go setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createBigQuerySinkConnector creates a BigQuery Sink connector.
func createBigQuerySinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, tasksMax, keyConverter, valueConverter, valueConverterSchemasEnable, defaultDataset string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "BQ_SINK_CONNECTOR_ID"
	// topics := "GMK_TOPIC_ID"
	// tasksMax := "3"
	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
	// valueConverterSchemasEnable := "false"
	// defaultDataset := "BQ_DATASET_ID"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	// BigQuery Sink sample connector configuration
	config := map[string]string{
		"name":                           connectorID,
		"project":                        projectID,
		"topics":                         topics,
		"tasks.max":                      tasksMax,
		"connector.class":                "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
		"key.converter":                  keyConverter,
		"value.converter":                valueConverter,
		"value.converter.schemas.enable": valueConverterSchemasEnable,
		"defaultDataset":                 defaultDataset,
	}
	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}
	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}
	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created BigQuery sink connector: %s\n", resp.Name)
	return nil
}
 

Java

Before trying this sample, follow the Java setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up ADC for a local development environment.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateBigQuerySinkConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-bigquery-sink-connector";
    String bigqueryProjectId = "my-bigquery-project-id";
    String datasetName = "my-dataset";
    String kafkaTopicName = "kafka-topic";
    String maxTasks = "3";
    String connectorClass = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
    String valueSchemasEnable = "false";
    createBigQuerySinkConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        bigqueryProjectId,
        datasetName,
        kafkaTopicName,
        maxTasks,
        connectorClass,
        keyConverter,
        valueConverter,
        valueSchemasEnable);
  }

  public static void createBigQuerySinkConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String bigqueryProjectId,
      String datasetName,
      String kafkaTopicName,
      String maxTasks,
      String connectorClass,
      String keyConverter,
      String valueConverter,
      String valueSchemasEnable)
      throws Exception {
    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("name", connectorId);
    configMap.put("project", bigqueryProjectId);
    configMap.put("topics", kafkaTopicName);
    configMap.put("tasks.max", maxTasks);
    configMap.put("connector.class", connectorClass);
    configMap.put("key.converter", keyConverter);
    configMap.put("value.converter", valueConverter);
    configMap.put("value.converter.schemas.enable", valueSchemasEnable);
    configMap.put("defaultDataset", datasetName);

    Connector connector =
        Connector.newBuilder()
            .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request =
          CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created BigQuery Sink connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}
 

Python

Before trying this sample, follow the Python setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up ADC for a local development environment.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "name": connector_id,
    "project": project_id,
    "topics": topics,
    "tasks.max": tasks_max,
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "key.converter": key_converter,
    "value.converter": value_converter,
    "value.converter.schemas.enable": value_converter_schemas_enable,
    "defaultDataset": default_dataset,
}
connector = Connector()
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")
 

After you create a connector, you can edit, delete, pause, stop, or restart the connector.

Configure the connector

This section describes some configuration properties that you can set on the connector. For a complete list of the properties that are specific to this connector, see BigQuery Sink connector configs.

Table name

By default, the connector uses the topic name as the BigQuery table name. To use a different table name, set the topic2TableMap property with the following format:

  topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...
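
For example, the following hypothetical mapping writes records from the orders topic to a table named orders_v2 and records from the users topic to a table named customer_profiles:

  topic2TableMap=orders:orders_v2,users:customer_profiles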
 

Table creation

The BigQuery Sink connector can create the destination tables if they don't exist.

  • If autoCreateTables=true, the connector attempts to create any BigQuery tables that don't exist. This is the default behavior.

  • If autoCreateTables=false, the connector doesn't create any tables. If a destination table doesn't exist, then an error occurs.

When autoCreateTables is true, you can use the following configuration properties for more fine-grained control over how the connector creates and configures new tables:

  • allBQFieldsNullable
  • clusteringPartitionFieldNames
  • convertDoubleSpecialValues
  • partitionExpirationMs
  • sanitizeFieldNames
  • sanitizeTopics
  • timestampPartitionFieldName

For information about these properties, see BigQuery Sink connector configs.
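
As a sketch, the following configuration lets the connector create tables that are time-partitioned on an event_time field and clustered by customer_id. The field names and the 90-day (7776000000 ms) partition expiration are hypothetical values:

  autoCreateTables=true
  timestampPartitionFieldName=event_time
  clusteringPartitionFieldNames=customer_id
  partitionExpirationMs=7776000000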

You can map additional data from Kafka, such as metadata and key information, into the BigQuery table by configuring the kafkaDataFieldName and kafkaKeyFieldName properties, respectively. Examples of metadata information include the Kafka topic, partition, offset, and insert time.
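
For example, the following sketch stores Kafka metadata in a field named kafka_data and the record key in a field named kafka_key. Both field names are hypothetical:

  kafkaDataFieldName=kafka_data
  kafkaKeyFieldName=kafka_key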

What's next?

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.