BigQuery Sink connectors let you stream data from Kafka into BigQuery for real-time ingestion and analysis. A BigQuery Sink connector consumes records from one or more Kafka topics and writes the data to one or more tables within a single BigQuery dataset.
Before you begin
Before creating a BigQuery Sink connector, ensure you have the following:
- Create a Managed Service for Apache Kafka cluster for your Connect cluster. This is the primary Kafka cluster associated with the Connect cluster, and it is also the source cluster that forms one end of the BigQuery Sink connector pipeline.
- Create a Connect cluster to host your BigQuery Sink connector.
- Create a BigQuery dataset to store the data streamed from Kafka.
- Create and configure a Kafka topic within the source cluster. Data moves from this Kafka topic to the destination BigQuery dataset. For an example command, see the sketch after this list.
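For example, you might create the source topic with the Google Cloud CLI. The following command is a sketch; the topic ID, cluster ID, partition count, and replication factor are placeholder values to adapt to your environment:

  gcloud managed-kafka topics create KAFKA_TOPIC_ID \
      --cluster=KAFKA_CLUSTER_ID \
      --location=LOCATION \
      --partitions=3 \
      --replication-factor=3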
Required roles and permissions
To get the permissions that you need to create a BigQuery Sink connector, ask your administrator to grant you the Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.
This predefined role contains the permissions required to create a BigQuery Sink connector. To see the exact permissions that are required, expand the Required permissions section:
Required permissions
The following permissions are required to create a BigQuery Sink connector:
- managedkafka.connectors.create on the parent Connect cluster
You might also be able to get these permissions with custom roles or other predefined roles.
For more information about the Managed Kafka Connector Editor role, see Managed Service for Apache Kafka predefined roles.
If your Managed Service for Apache Kafka cluster is in the same project as the Connect cluster, no further permissions are required. If the cluster is in a different project, refer to Create a Connect cluster in a different project.
Grant permissions to write to the BigQuery table
The Connect cluster service account, which follows the format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, requires permission to write to the BigQuery table. To do so, grant the BigQuery Data Editor (roles/bigquery.dataEditor) role to the Connect cluster service account on the project containing the BigQuery table.
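For example, you can grant the role with the Google Cloud CLI. The following command is a sketch; PROJECT_ID and PROJECT_NUMBER are placeholders for the project that contains the BigQuery table and the project that hosts the Connect cluster, respectively:

  gcloud projects add-iam-policy-binding PROJECT_ID \
      --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com" \
      --role="roles/bigquery.dataEditor"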
Schemas for a BigQuery Sink connector
The BigQuery Sink connector uses the configured value converter (value.converter) to parse Kafka record values into fields. It then writes the fields to columns of the same name in the BigQuery table.
The connector requires a schema to operate. The schema can be provided in the following ways:
- Message-based schema: The schema is included as part of each message.
- Table-based schema: The connector infers the message schema from the BigQuery table schema.
- Schema registry: The connector reads the schema from a schema registry, such as Managed Service for Apache Kafka schema registry (Preview).
The next sections describe these options.
Message-based schema
In this mode, each Kafka record includes a JSON schema. The connector uses the schema to write the record data as a BigQuery table row.
To use message-based schemas, set the following properties on the connector:
- value.converter=org.apache.kafka.connect.json.JsonConverter
- value.converter.schemas.enable=true
Example Kafka record value:
{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "user", "type": "string", "optional": false },
      { "field": "age", "type": "int64", "optional": false }
    ]
  },
  "payload": {
    "user": "userId",
    "age": 30
  }
}
If the destination table already exists, the BigQuery table schema must be compatible with the embedded message schema. If autoCreateTables=true, the connector automatically creates the destination table if needed. For more information, see Table creation.
If you want the connector to update the BigQuery table schema as message schemas change, set allowNewBigQueryFields, allowSchemaUnionization, or allowBigQueryRequiredFieldRelaxation to true.
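For example, a connector configuration that lets the connector add new columns and relax required columns as message schemas evolve might include properties like the following. This is a sketch; enable only the options that match your schema evolution policy:

  allowNewBigQueryFields=true
  allowBigQueryRequiredFieldRelaxation=true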
Table-based schema
In this mode, the Kafka records contain plain JSON data without an explicit schema. The connector infers the schema from the destination table.
Requirements:
- The BigQuery table must already exist.
- The Kafka record data must be compatible with the table schema.
- This mode doesn't support dynamic schema updates based on incoming messages.
To use table-based schemas, set the following properties on the connector:
- value.converter=org.apache.kafka.connect.json.JsonConverter
- value.converter.schemas.enable=false
- bigQueryPartitionDecorator=false
If the BigQuery table uses time-based partitioning with daily partitioning, bigQueryPartitionDecorator can be true. Otherwise, set this property to false.
Example Kafka record value:
{
  "user": "userId",
  "age": 30
}
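Because the destination table must already exist in this mode, you might create it ahead of time with the bq command-line tool. The following command is a sketch that assumes a hypothetical table named users whose schema matches the example record above:

  bq mk --table PROJECT_ID:BQ_DATASET_ID.users user:STRING,age:INTEGER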
Schema registry
In this mode, each Kafka record contains Apache Avro data, and the message schema is stored in a schema registry.
To use the BigQuery Sink connector with a schema registry, set the following properties on the connector:
- value.converter=io.confluent.connect.avro.AvroConverter
- value.converter.schema.registry.url=SCHEMA_REGISTRY_URL
Replace SCHEMA_REGISTRY_URL with the URL of the schema registry.
To use the connector with Managed Service for Apache Kafka schema registry, set the following property:
- value.converter.bearer.auth.credentials.source=GCP
For more information, see Use Kafka Connect with schema registry.
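Putting these properties together, a connector that reads Avro records and resolves schemas from the Managed Service for Apache Kafka schema registry might include the following settings. This is a sketch; the registry URL is a placeholder:

  value.converter=io.confluent.connect.avro.AvroConverter
  value.converter.schema.registry.url=SCHEMA_REGISTRY_URL
  value.converter.bearer.auth.credentials.source=GCP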
BigLake tables for Apache Iceberg in BigQuery
The BigQuery Sink connector supports BigLake tables for Apache Iceberg in BigQuery (hereafter, BigLake Iceberg tables in BigQuery) as a sink target.
BigLake Iceberg tables in BigQuery provide the foundation for building open-format lakehouses on Google Cloud. They offer the same fully managed experience as BigQuery tables, but store data in customer-owned storage buckets in the Parquet format, which makes them interoperable with Apache Iceberg open table formats.
For information on how to create an Apache Iceberg table, see Create an Apache Iceberg table.
Create a BigQuery Sink connector
Console
- In the Google Cloud console, go to the Connect Clusters page.
- Click the Connect cluster where you want to create the connector.
- Click Create connector.
- For the connector name, enter a string. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource.
- For Connector plugin, select BigQuery Sink.
- In the Topics section, specify the Kafka topics to read from. You can specify a list of topics or a regular expression to match against topic names.
  - Option 1: Choose Select a list of Kafka topics. In the Kafka topics list, select one or more topics. Click OK.
  - Option 2: Choose Use a topic regex. In the Topic regex field, enter a regular expression.
- Click Dataset and specify a BigQuery dataset. You can choose an existing dataset or create a new one.
- Optional: In the Configurations box, add configuration properties or edit the default properties. For more information, see Configure the connector.
- Select the Task restart policy. For more information, see Task restart policy.
- Click Create.
gcloud
- In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
- Run the gcloud managed-kafka connectors create command:

  gcloud managed-kafka connectors create CONNECTOR_ID \
      --location=LOCATION \
      --connect-cluster=CONNECT_CLUSTER_ID \
      --config-file=CONFIG_FILE

  Replace the following:
  - CONNECTOR_ID: The ID or name of the connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.
  - LOCATION: The location where you create the connector. This must be the same location where you created the Connect cluster.
  - CONNECT_CLUSTER_ID: The ID of the Connect cluster where the connector is created.
  - CONFIG_FILE: The path to the YAML configuration file for the BigQuery Sink connector.

  Here is an example of a configuration file for the BigQuery Sink connector:

  name: "BQ_SINK_CONNECTOR_ID"
  project: "GCP_PROJECT_ID"
  topics: "GMK_TOPIC_ID"
  tasks.max: 3
  connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
  key.converter: "org.apache.kafka.connect.storage.StringConverter"
  value.converter: "org.apache.kafka.connect.json.JsonConverter"
  value.converter.schemas.enable: "false"
  defaultDataset: "BQ_DATASET_ID"

  Replace the following:
  - BQ_SINK_CONNECTOR_ID: The ID or name of the BigQuery Sink connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.
  - GCP_PROJECT_ID: The ID of the Google Cloud project where your BigQuery dataset resides.
  - GMK_TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which the data flows to the BigQuery Sink connector.
  - BQ_DATASET_ID: The ID of the BigQuery dataset that acts as the sink for the pipeline.
Terraform
You can use a Terraform resource to create a connector.
To learn how to apply or remove a Terraform configuration, see Basic Terraform commands.
Go
Before trying this sample, follow the Go setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.
To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.
Java
Before trying this sample, follow the Java setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.
To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.
Python
Before trying this sample, follow the Python setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.
To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.
After you create a connector, you can edit, delete, pause, stop, or restart the connector.
Configure the connector
This section describes some configuration properties that you can set on the connector. For a complete list of the properties that are specific to this connector, see BigQuery Sink connector configs.
Table name
By default, the connector uses the topic name as the BigQuery table name. To use a different table name, set the topic2TableMap property with the following format:
topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...
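For example, the following setting writes records from a hypothetical orders topic to a table named orders_raw and records from a users topic to a table named users_raw:

  topic2TableMap=orders:orders_raw,users:users_raw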
Table creation
The BigQuery Sink connector can create the destination tables if they don't exist.
- If autoCreateTables=true, the connector attempts to create any BigQuery tables that don't exist. This setting is the default behavior.
- If autoCreateTables=false, the connector doesn't create any tables. If a destination table doesn't exist, then an error occurs.
When autoCreateTables is true, you can use the following configuration properties for more fine-grained control over how the connector creates and configures new tables:
- allBQFieldsNullable
- clusteringPartitionFieldNames
- convertDoubleSpecialValues
- partitionExpirationMs
- sanitizeFieldNames
- sanitizeTopics
- timestampPartitionFieldName
For information about these properties, see BigQuery Sink connector configs.
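For example, a connector that creates time-partitioned, clustered tables with a 90-day (7,776,000,000 ms) partition expiration might combine these properties as follows. This is a sketch with hypothetical field names; check each property against your table design in the BigQuery Sink connector configs:

  autoCreateTables=true
  timestampPartitionFieldName=event_time
  clusteringPartitionFieldNames=user
  partitionExpirationMs=7776000000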
Kafka metadata
You can map additional data from Kafka, such as metadata information and key information, into the BigQuery table by configuring the kafkaDataFieldName and kafkaKeyFieldName fields, respectively. Examples of metadata information include the Kafka topic, partition, offset, and insert time.
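For example, the following settings write the record key into a column named kafka_key and the Kafka metadata (topic, partition, offset, and insert time) into a column named kafka_data. The column names shown here are hypothetical:

  kafkaKeyFieldName=kafka_key
  kafkaDataFieldName=kafka_data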

