/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;

import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.templates.TextToBigQueryStreaming.TextToBigQueryStreamingOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.v2.transforms.ErrorConverters.WriteStringMessageErrors;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToStringFailsafeElementFn;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch.Growth;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link TextToBigQueryStreaming} pipeline is a streaming version of the {@link
 * TextIOToBigQuery} pipeline. It reads text files, applies a JavaScript UDF, and writes the output
 * to BigQuery. The pipeline continuously polls for new files, reads them row by row, and processes
 * each record into BigQuery. The polling interval is fixed at 10 seconds.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Stream_GCS_Text_to_BigQuery_Flex.md">README</a>
* for instructions on how to use or modify this template.
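 *
 * <p>An illustrative launch command (a sketch only; the staged template spec path, bucket, and
 * table below are placeholders, while the parameter names come from this pipeline's options):
 *
 * <pre>{@code
 * gcloud dataflow flex-template run "text-to-bigquery-streaming" \
 *   --region "us-central1" \
 *   --template-file-gcs-location "gs://BUCKET/templates/Stream_GCS_Text_to_BigQuery_Flex.json" \
 *   --parameters \
 * inputFilePattern=gs://BUCKET/input/*.csv,\
 * JSONPath=gs://BUCKET/schema/schema.json,\
 * javascriptTextTransformGcsPath=gs://BUCKET/udf/transform.js,\
 * javascriptTextTransformFunctionName=transform,\
 * outputTable=PROJECT:DATASET.TABLE,\
 * bigQueryLoadingTemporaryDirectory=gs://BUCKET/tmp
 * }</pre>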
*/
@MultiTemplate({
  @Template(
      name = "Stream_GCS_Text_to_BigQuery_Flex",
      category = TemplateCategory.STREAMING,
      displayName = "Cloud Storage Text to BigQuery (Stream)",
      description = {
        "The Text Files on Cloud Storage to BigQuery pipeline is a streaming pipeline that allows you to stream text files stored in Cloud Storage, transform them using a JavaScript User Defined Function (UDF) that you provide, and append the result to BigQuery.\n",
        "The pipeline runs indefinitely and needs to be terminated manually via a\n"
            + " <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#cancel\">cancel</a> and not a\n"
            + " <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain\">drain</a>, due to its use of the\n"
            + " <code>Watch</code> transform, which is a splittable <code>DoFn</code> that does not support\n"
            + " draining."
      },
      skipOptions = {
        "pythonExternalTextTransfromGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      optionsClass = TextToBigQueryStreamingOptions.class,
      flexContainerName = "text-to-bigquery-streaming",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/text-to-bigquery-stream",
      contactInformation = "https://cloud.google.com/support",
      requirements = {
        "Create a JSON file that describes the schema of your output table in BigQuery.\n"
            + " <p>\n"
            + " Ensure that there is a top-level JSON array titled <code>fields</code> and that its\n"
            + " contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.\n"
            + " For example:\n"
            + " </p>\n"
            + "<pre class=\"prettyprint lang-json\">\n"
            + "{\n"
            + " \"fields\": [\n"
            + " {\n"
            + " \"name\": \"location\",\n"
            + " \"type\": \"STRING\"\n"
            + " },\n"
            + " {\n"
            + " \"name\": \"name\",\n"
            + " \"type\": \"STRING\"\n"
            + " },\n"
            + " {\n"
            + " \"name\": \"age\",\n"
            + " \"type\": \"STRING\"\n"
            + " },\n"
            + " {\n"
            + " \"name\": \"color\",\n"
            + " \"type\": \"STRING\",\n"
            + " \"mode\": \"REQUIRED\"\n"
            + " },\n"
            + " {\n"
            + " \"name\": \"coffee\",\n"
            + " \"type\": \"STRING\",\n"
            + " \"mode\": \"REQUIRED\"\n"
            + " }\n"
            + " ]\n"
            + "}\n"
            + "</pre>",
        "Create a JavaScript (<code>.js</code>) file with your UDF function that supplies the logic\n"
            + " to transform the lines of text. Note that your function must return a JSON string.\n"
            + " <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"
            + " transforming the values.</p>\n"
            + "<pre class=\"prettyprint\" suppresswarning>\n"
            + "function transform(line) {\n"
            + "var values = line.split(',');\n"
            + "\n"
            + "var obj = new Object();\n"
            + "obj.location = values[0];\n"
            + "obj.name = values[1];\n"
            + "obj.age = values[2];\n"
            + "obj.color = values[3];\n"
            + "obj.coffee = values[4];\n"
            + "var jsonString = JSON.stringify(obj);\n"
            + "\n"
            + "return jsonString;\n"
            + "}\n"
            + "</pre>"
      },
      streaming = true,
      supportsAtLeastOnce = true),
  @Template(
      name = "Stream_GCS_Text_to_BigQuery_Xlang",
      category = TemplateCategory.STREAMING,
      displayName = "Cloud Storage Text to BigQuery (Stream) with Python UDF",
      type = Template.TemplateType.XLANG,
      description = {
        "The Text Files on Cloud Storage to BigQuery pipeline is a streaming pipeline that allows you to stream text files stored in Cloud Storage, transform them using a Python User Defined Function (UDF) that you provide, and append the result to BigQuery.\n",
        "The pipeline runs indefinitely and needs to be terminated manually via a\n"
            + " <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#cancel\">cancel</a> and not a\n"
            + " <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain\">drain</a>, due to its use of the\n"
            + " <code>Watch</code> transform, which is a splittable <code>DoFn</code> that does not support\n"
            + " draining."
      },
      skipOptions = {
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      },
      optionsClass = TextToBigQueryStreamingOptions.class,
      flexContainerName = "text-to-bigquery-streaming-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/text-to-bigquery-stream",
      contactInformation = "https://cloud.google.com/support",
      requirements = {
        "Create a JSON file that describes the schema of your output table in BigQuery.\n"
            + " <p>\n"
            + " Ensure that there is a top-level JSON array titled <code>fields</code> and that its\n"
            + " contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.\n"
            + " For example:\n"
            + " </p>\n"
            + "<pre class=\"prettyprint lang-json\">\n"
            + "{\n"
            + " \"fields\": [\n"
            + " {\n"
            + " \"name\": \"location\",\n"
            + " \"type\": \"STRING\"\n"
            + " },\n"
            + " {\n"
            + " \"name\": \"name\",\n"
            + " \"type\": \"STRING\"\n"
            + " },\n"
            + " {\n"
            + " \"name\": \"age\",\n"
            + " \"type\": \"STRING\"\n"
            + " },\n"
            + " {\n"
            + " \"name\": \"color\",\n"
            + " \"type\": \"STRING\",\n"
            + " \"mode\": \"REQUIRED\"\n"
            + " },\n"
            + " {\n"
            + " \"name\": \"coffee\",\n"
            + " \"type\": \"STRING\",\n"
            + " \"mode\": \"REQUIRED\"\n"
            + " }\n"
            + " ]\n"
            + "}\n"
            + "</pre>",
        "Create a Python (<code>.py</code>) file with your UDF function that supplies the logic\n"
            + " to transform the lines of text. Note that your function must return a JSON string.\n"
            + " <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"
            + " transforming the values.</p>\n"
            + "<pre class=\"prettyprint\" suppresswarning>\n"
            + "import json\n"
            + "def transform(line):\n"
            + " values = line.split(',')\n"
            + "\n"
            + " obj = {\n"
            + " 'location' : values[0],\n"
            + " 'name' : values[1],\n"
            + " 'age' : values[2],\n"
            + " 'color' : values[3],\n"
            + " 'coffee' : values[4]\n"
            + " }\n"
            + " jsonString = json.dumps(obj)\n"
            + "\n"
            + " return jsonString\n"
            + "\n"
            + "</pre>"
      },
      streaming = true,
      supportsAtLeastOnce = true)
})
public class TextToBigQueryStreaming {

  private static final Logger LOG = LoggerFactory.getLogger(TextToBigQueryStreaming.class);

  /** The tag for the main output for the UDF. */
  private static final TupleTag<FailsafeElement<String, String>> UDF_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the dead-letter output of the udf. */
  private static final TupleTag<FailsafeElement<String, String>> UDF_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the main output of the json transformation. */
  private static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};

  /** The tag for the dead-letter output of the json to table row transform. */
  private static final TupleTag<FailsafeElement<String, String>> TRANSFORM_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The default suffix for error tables if dead letter table is not specified. */
  private static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";

  /** Default interval for polling files in GCS. */
  private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(10);

  /** Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();

/**
* Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
* blocking execution is required, use the {@link
* TextToBigQueryStreaming#run(TextToBigQueryStreamingOptions)} method to start the pipeline and
 * invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
*
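 * <p>A minimal sketch of blocking execution, mirroring the option parsing done in this method:
 *
 * <pre>{@code
 * TextToBigQueryStreamingOptions options =
 *     PipelineOptionsFactory.fromArgs(args)
 *         .withValidation()
 *         .as(TextToBigQueryStreamingOptions.class);
 * PipelineResult result = TextToBigQueryStreaming.run(options);
 * result.waitUntilFinish();
 * }</pre>
 *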
* @param args The command-line arguments to the pipeline.
*/
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    TextToBigQueryStreamingOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(TextToBigQueryStreamingOptions.class);

    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);

    run(options);
  }

/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
  public static PipelineResult run(TextToBigQueryStreamingOptions options) {

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Register the coder for pipeline
    FailsafeElementCoder<String, String> coder =
        FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

    // Determine if we are using Python UDFs or JS UDFs based on the provided options.
    boolean useJavascriptUdf =
        !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf =
        !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    if (useJavascriptUdf && usePythonUdf) {
      throw new IllegalArgumentException(
          "Either javascript or Python gcs path must be provided, but not both.");
    }

    /*
     * Steps:
     *  1) Read from the text source continuously.
     *  2) Convert to FailsafeElement.
     *  3) Apply Javascript udf transformation.
     *     - Tag records that were successfully transformed and those
     *       that failed transformation.
     *  4) Convert records to TableRow.
     *     - Tag records that were successfully converted and those
     *       that failed conversion.
     *  5) Insert successfully converted records into BigQuery.
     *     - Errors encountered while streaming will be sent to deadletter table.
     *  6) Insert records that failed into deadletter table.
     */

    PCollection<String> sourceRead =
        pipeline.apply(
            TextIO.read()
                .from(options.getInputFilePattern())
                .watchForNewFiles(DEFAULT_POLL_INTERVAL, Growth.never()));

    PCollectionTuple transformedOutput;
    if (usePythonUdf) {
      transformedOutput =
          sourceRead
              .apply(
                  "MapToRecord",
                  PythonExternalTextTransformer.FailsafeRowPythonExternalUdf
                      .stringMappingFunction())
              .setRowSchema(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)
              .apply(
                  "InvokeUDF",
                  PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                      .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
                      .setFunctionName(options.getPythonExternalTextTransformFunctionName())
                      .build())
              .apply(
                  ParDo.of(new RowToStringFailsafeElementFn(UDF_OUT, UDF_DEADLETTER_OUT))
                      .withOutputTags(UDF_OUT, TupleTagList.of(UDF_DEADLETTER_OUT)));
    } else {
      transformedOutput =
          pipeline
              // 1) Read from the text source continuously.
              .apply(
                  "ReadFromSource",
                  TextIO.read()
                      .from(options.getInputFilePattern())
                      .watchForNewFiles(DEFAULT_POLL_INTERVAL, Growth.never()))
              // 2) Convert to FailsafeElement.
              .apply(
                  "ConvertToFailsafeElement",
                  MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                      .via(input -> FailsafeElement.of(input, input)))
              // 3) Apply Javascript udf transformation.
              .apply(
                  "ApplyUDFTransformation",
                  FailsafeJavascriptUdf.<String>newBuilder()
                      .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                      .setFunctionName(options.getJavascriptTextTransformFunctionName())
                      .setReloadIntervalMinutes(
                          options.getJavascriptTextTransformReloadIntervalMinutes())
                      .setSuccessTag(UDF_OUT)
                      .setFailureTag(UDF_DEADLETTER_OUT)
                      .build());
    }

    PCollectionTuple convertedTableRows =
        transformedOutput
            // 4) Convert records to TableRow.
            .get(UDF_OUT)
            .apply(
                "ConvertJSONToTableRow",
                FailsafeJsonToTableRow.<String>newBuilder()
                    .setSuccessTag(TRANSFORM_OUT)
                    .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                    .build());

    WriteResult writeResult =
        convertedTableRows
            // 5) Insert successfully converted records into BigQuery.
            .get(TRANSFORM_OUT)
            .apply(
                "InsertIntoBigQuery",
                BigQueryIO.writeTableRows()
                    .withJsonSchema(GCSUtils.getGcsFileAsString(options.getJSONPath()))
                    .to(options.getOutputTable())
                    .withExtendedErrorInfo()
                    .withoutValidation()
                    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                    .withCustomGcsTempLocation(
                        StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory())));

    // Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
    PCollection<FailsafeElement<String, String>> failedInserts =
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "WrapInsertionErrors",
                MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                    .via(TextToBigQueryStreaming::wrapBigQueryInsertError));

    // 6) Insert records that failed transformation or conversion into deadletter table
    PCollectionList.of(
            ImmutableList.of(
                transformedOutput.get(UDF_DEADLETTER_OUT),
                convertedTableRows.get(TRANSFORM_DEADLETTER_OUT),
                failedInserts))
        .apply("Flatten", Flatten.pCollections())
        .apply(
            "WriteFailedRecords",
            WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    StringUtils.isNotEmpty(options.getOutputDeadletterTable())
                        ? options.getOutputDeadletterTable()
                        : options.getOutputTable() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .build());

    return pipeline.run();
  }

  /**
   * Method to wrap a {@link BigQueryInsertError} into a {@link FailsafeElement}.
   *
   * @param insertError BigQueryInsert error.
   * @return FailsafeElement object.
   */
  static FailsafeElement<String, String> wrapBigQueryInsertError(BigQueryInsertError insertError) {

    FailsafeElement<String, String> failsafeElement;
    try {

      String rowPayload = JSON_FACTORY.toString(insertError.getRow());
      String errorMessage = JSON_FACTORY.toString(insertError.getError());

      failsafeElement = FailsafeElement.of(rowPayload, rowPayload);
      failsafeElement.setErrorMessage(errorMessage);

    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    return failsafeElement;
  }

/**
* The {@link TextToBigQueryStreamingOptions} class provides the custom execution options passed
* by the executor at the command-line.
*/
  public interface TextToBigQueryStreamingOptions
      extends TextIOToBigQuery.Options, BigQueryStorageApiStreamingOptions {

    @TemplateParameter.BigQueryTable(
        order = 1,
        optional = true,
        description = "The dead-letter table name to output failed messages to BigQuery",
        helpText =
            "Table for messages that failed to reach the output table. If a table doesn't exist, it is created during "
                + "pipeline execution. If not specified, `<outputTableSpec>_error_records` is used.",
        example = "<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>")
    String getOutputDeadletterTable();

    void setOutputDeadletterTable(String value);

    // Hide UseStorageWriteApiAtLeastOnce in the UI, because it is turned on automatically when
    // the pipeline runs in at-least-once (ALO) mode with the Storage Write API.
    @TemplateParameter.Boolean(
        order = 2,
        optional = true,
        parentName = "useStorageWriteApi",
        parentTriggerValues = {"true"},
        description = "Use at-least-once semantics in BigQuery Storage Write API",
        helpText =
            "This parameter takes effect only if `Use BigQuery Storage Write API` is enabled. If"
                + " enabled, at-least-once semantics are used for the Storage Write API; otherwise,"
                + " exactly-once semantics are used.",
        hiddenUi = true)
    @Default.Boolean(false)
    @Override
    Boolean getUseStorageWriteApiAtLeastOnce();

    void setUseStorageWriteApiAtLeastOnce(Boolean value);
  }
}