The BigQuery to Elasticsearch template is a batch pipeline that ingests data from a BigQuery table into Elasticsearch as documents. The template can either read the entire table or read specific records using a supplied query.
Pipeline requirements
- The source BigQuery table must exist.
- An Elasticsearch host on a Google Cloud instance or on Elastic Cloud, with Elasticsearch version 7.0 or later. The host must be accessible from the Dataflow worker machines.
Template parameters
Required parameters
- connectionUrl: The Elasticsearch URL, in the format `https://hostname:[port]`. If using Elastic Cloud, specify the CloudID. For example, `https://elasticsearch-host:9200`.
- apiKey: The Base64-encoded API key to use for authentication.
- index: The Elasticsearch index that the requests are issued to. For example, `my-index`.
Optional parameters
- inputTableSpec: The BigQuery table to read from. If you specify `inputTableSpec`, the template reads the data directly from BigQuery storage by using the BigQuery Storage Read API (https://cloud.google.com/bigquery/docs/reference/storage). For information about limitations in the Storage Read API, see https://cloud.google.com/bigquery/docs/reference/storage#limitations. You must specify either `inputTableSpec` or `query`. If you set both parameters, the template uses the `query` parameter. For example, `<BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>`.
- outputDeadletterTable: The BigQuery table for messages that failed to reach the output table. If the table doesn't exist, it is created during pipeline execution. If not specified, `<outputTableSpec>_error_records` is used. For example, `<PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>`.
- query: The SQL query to use to read data from BigQuery. If the BigQuery dataset is in a different project than the Dataflow job, specify the full dataset name in the SQL query, for example: `<PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>`. By default, the `query` parameter uses GoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql), unless `useLegacySql` is `true`. You must specify either `inputTableSpec` or `query`. If you set both parameters, the template uses the `query` parameter. For example, `select * from sampledb.sample_table`.
- useLegacySql: Set to `true` to use legacy SQL. This parameter only applies when using the `query` parameter. Defaults to `false`.
- queryLocation: Needed when reading from an authorized view without the underlying table's permission. For example, `US`.
- queryTempDataset: An existing dataset in which to create the temporary table that stores the results of the query. For example, `temp_dataset`.
- KMSEncryptionKey: If reading from BigQuery using a query source, use this Cloud KMS key to encrypt any temporary tables created. For example, `projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key`.
- elasticsearchUsername: The Elasticsearch username to authenticate with. If specified, the value of `apiKey` is ignored.
- elasticsearchPassword: The Elasticsearch password to authenticate with. If specified, the value of `apiKey` is ignored.
- batchSize: The batch size in number of documents. Defaults to `1000`.
- batchSizeBytes: The batch size in number of bytes. Defaults to `5242880` (5 MB).
- maxRetryAttempts: The maximum number of retry attempts. Must be greater than zero. Defaults to no retries.
- maxRetryDuration: The maximum retry duration in milliseconds. Must be greater than zero. Defaults to no retries.
- propertyAsIndex: The property in the document being indexed whose value specifies `_index` metadata to include with the document in bulk requests. Takes precedence over an `_index` UDF. Defaults to none.
- javaScriptIndexFnGcsPath: The Cloud Storage path to the JavaScript UDF source for a function that specifies `_index` metadata to include with the document in bulk requests. Defaults to none.
- javaScriptIndexFnName: The name of the UDF JavaScript function that specifies `_index` metadata to include with the document in bulk requests. Defaults to none.
- propertyAsId: A property in the document being indexed whose value specifies `_id` metadata to include with the document in bulk requests. Takes precedence over an `_id` UDF. Defaults to none.
- javaScriptIdFnGcsPath: The Cloud Storage path to the JavaScript UDF source for the function that specifies `_id` metadata to include with the document in bulk requests. Defaults to none.
- javaScriptIdFnName: The name of the UDF JavaScript function that specifies the `_id` metadata to include with the document in bulk requests. Defaults to none.
- javaScriptTypeFnGcsPath: The Cloud Storage path to the JavaScript UDF source for a function that specifies `_type` metadata to include with documents in bulk requests. Defaults to none.
- javaScriptTypeFnName: The name of the UDF JavaScript function that specifies the `_type` metadata to include with the document in bulk requests. Defaults to none.
- javaScriptIsDeleteFnGcsPath: The Cloud Storage path to the JavaScript UDF source for the function that determines whether to delete the document instead of inserting or updating it. The function returns a string value of `true` or `false`. Defaults to none.
- javaScriptIsDeleteFnName: The name of the UDF JavaScript function that determines whether to delete the document instead of inserting or updating it. The function returns a string value of `true` or `false`. Defaults to none.
- usePartialUpdate: Whether to use partial updates (update rather than create or index, allowing partial documents) with Elasticsearch requests. Defaults to `false`.
- bulkInsertMethod: Whether to use `INDEX` (index, allows upserts) or `CREATE` (create, errors on duplicate `_id`) with Elasticsearch bulk requests. Defaults to `CREATE`.
- trustSelfSignedCerts: Whether to trust self-signed certificates. An Elasticsearch instance might have a self-signed certificate. Set this to `true` to bypass validation of the SSL certificate. Defaults to `false`.
- disableCertificateValidation: If `true`, trust the self-signed SSL certificate. An Elasticsearch instance might have a self-signed certificate. To bypass validation for the certificate, set this parameter to `true`. Defaults to `false`.
- apiKeyKMSEncryptionKey: The Cloud KMS key to decrypt the API key. This parameter is required if `apiKeySource` is set to `KMS`. If this parameter is provided, pass in an encrypted `apiKey` string. Encrypt parameters using the KMS API encrypt endpoint (https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt). For the key, use the format `projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>`. For example, `projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name`.
- apiKeySecretId: The Secret Manager secret ID for the apiKey. If `apiKeySource` is set to `SECRET_MANAGER`, provide this parameter. Use the format `projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>`. For example, `projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
- apiKeySource: The source of the API key. Allowed values are `PLAINTEXT`, `KMS`, and `SECRET_MANAGER`. This parameter is required when you use Secret Manager or KMS. If `apiKeySource` is set to `KMS`, `apiKeyKMSEncryptionKey` and an encrypted `apiKey` must be provided. If `apiKeySource` is set to `SECRET_MANAGER`, `apiKeySecretId` must be provided. If `apiKeySource` is set to `PLAINTEXT`, `apiKey` must be provided. Defaults to `PLAINTEXT`.
- socketTimeout: If set, overwrites the default max retry timeout and default socket timeout (30000 ms) in the Elastic RestClient.
- javascriptTextTransformGcsPath: The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. For example, `gs://my-bucket/my-udfs/my_file.js`.
- javascriptTextTransformFunctionName: The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is `myTransform(inJson) { /*...do stuff...*/ }`, then the function name is `myTransform`. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). A minimal sketch of such a transform follows this list.
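To make the UDF contract concrete, here is a minimal transform sketch. The function name `myTransform` and the `ingested_at` field are hypothetical; the only fixed contract is that the template passes each BigQuery row to the UDF as a JSON string and indexes the JSON string the UDF returns.

```javascript
/**
 * Minimal transform UDF sketch (hypothetical names).
 * Input: one BigQuery row, serialized as a JSON string.
 * Output: the JSON string to index into Elasticsearch.
 */
function myTransform(inJson) {
  var row = JSON.parse(inJson);
  // Hypothetical enrichment: stamp each document before indexing.
  row.ingested_at = new Date().toISOString();
  return JSON.stringify(row);
}
```

Upload the file to Cloud Storage, then set `javascriptTextTransformGcsPath` to its URI and `javascriptTextTransformFunctionName` to `myTransform`.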
User-defined functions
This template supports user-defined functions (UDFs) at several points in the pipeline, described below. For more information, see Create user-defined functions for Dataflow templates.
Index function
Returns the index to which the document belongs.
Template parameters:
- `javaScriptIndexFnGcsPath`: the Cloud Storage URI of the JavaScript file.
- `javaScriptIndexFnName`: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's `_index` metadata field.
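For example, a minimal sketch of an index function, assuming each document carries a hypothetical `customer_region` field (any field name is a placeholder; only the JSON-string input and string output above are part of the contract):

```javascript
// Sketch of an _index UDF: route each document to a region-specific
// index. The "customer_region" field is a hypothetical example.
function getIndexFn(inJson) {
  var doc = JSON.parse(inJson);
  return 'my-index-' + String(doc.customer_region).toLowerCase();
}
```

Point `javaScriptIndexFnGcsPath` at the uploaded file and set `javaScriptIndexFnName` to `getIndexFn`.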
Document ID function
Returns the document ID.
Template parameters:
- `javaScriptIdFnGcsPath`: the Cloud Storage URI of the JavaScript file.
- `javaScriptIdFnName`: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's `_id` metadata field.
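For example, a minimal sketch, assuming the source rows carry a hypothetical unique `order_id` column:

```javascript
// Sketch of an _id UDF: reuse a unique source column as the
// Elasticsearch document ID. "order_id" is a hypothetical field.
function getIdFn(inJson) {
  var doc = JSON.parse(inJson);
  return String(doc.order_id);
}
```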
Document deletion function
Specifies whether to delete a document. To use this function, set the bulk insert mode to `INDEX` and provide a document ID function.
Template parameters:
- `javaScriptIsDeleteFnGcsPath`: the Cloud Storage URI of the JavaScript file.
- `javaScriptIsDeleteFnName`: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: return the string `"true"` to delete the document, or `"false"` to upsert the document.
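A minimal sketch, assuming a hypothetical boolean `is_deleted` column in the source table; note that the function must return the strings `"true"` or `"false"`, not boolean values:

```javascript
// Sketch of an is-delete UDF. "is_deleted" is a hypothetical column.
// Returns the string "true" to delete the document, "false" to upsert it.
function isDeleteFn(inJson) {
  var doc = JSON.parse(inJson);
  return doc.is_deleted === true ? 'true' : 'false';
}
```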
Mapping type function
Returns the document's mapping type.
Template parameters:
- `javaScriptTypeFnGcsPath`: the Cloud Storage URI of the JavaScript file.
- `javaScriptTypeFnName`: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's `_type` metadata field.
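A minimal sketch, assuming a hypothetical `doc_kind` field; because mapping types are largely deprecated in Elasticsearch 7.0 and later, this example falls back to `_doc`:

```javascript
// Sketch of a _type UDF. "doc_kind" is a hypothetical field;
// fall back to "_doc", the default type in Elasticsearch 7+.
function getTypeFn(inJson) {
  var doc = JSON.parse(inJson);
  return doc.doc_kind || '_doc';
}
```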
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is `us-central1`. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the BigQuery to Elasticsearch template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
```
gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \
    --parameters \
   inputTableSpec=INPUT_TABLE_SPEC,\
   connectionUrl=CONNECTION_URL,\
   apiKey=APIKEY,\
   index=INDEX
```
Replace the following:
- `PROJECT_ID`: the Google Cloud project ID where you want to run the Dataflow job
- `JOB_NAME`: a unique job name of your choice
- `REGION_NAME`: the region where you want to deploy your Dataflow job, for example `us-central1`
- `VERSION`: the version of the template that you want to use. You can use the following values:
  - `latest` to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like `2023-09-12-00_RC00`, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- `INPUT_TABLE_SPEC`: your BigQuery table name.
- `CONNECTION_URL`: your Elasticsearch URL.
- `APIKEY`: your base64-encoded API key for authentication.
- `INDEX`: your Elasticsearch index.
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
```
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "inputTableSpec": "INPUT_TABLE_SPEC",
      "connectionUrl": "CONNECTION_URL",
      "apiKey": "APIKEY",
      "index": "INDEX"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch"
  }
}
```
Replace the following:
- `PROJECT_ID`: the Google Cloud project ID where you want to run the Dataflow job
- `JOB_NAME`: a unique job name of your choice
- `LOCATION`: the region where you want to deploy your Dataflow job, for example `us-central1`
- `VERSION`: the version of the template that you want to use. You can use the following values:
  - `latest` to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
  - the version name, like `2023-09-12-00_RC00`, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/
- `INPUT_TABLE_SPEC`: your BigQuery table name.
- `CONNECTION_URL`: your Elasticsearch URL.
- `APIKEY`: your base64-encoded API key for authentication.
- `INDEX`: your Elasticsearch index.
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.