/*
* Copyright (C) 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.datastream.io.CdcJdbcIO;
import com.google.cloud.teleport.v2.datastream.sources.DataStreamIO;
import com.google.cloud.teleport.v2.datastream.values.DmlInfo;
import com.google.cloud.teleport.v2.templates.DataStreamToSQL.Options;
import com.google.cloud.teleport.v2.transforms.CreateDml;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.transforms.ProcessDml;
import com.google.cloud.teleport.v2.utils.DatastreamToDML;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Splitter;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This pipeline ingests DataStream data from GCS. The data is then cleaned and converted from JSON
* objects into DML statements. The DML is applied to the desired target database, which can be one
* of MySQL or PostgreSQL. Replication maintains a 1:1 match between source and target by default.
* No DDL is supported in the current version of this pipeline.
*
* <p>Failures during SQL execution are captured and written to a Dead Letter Queue (DLQ) in GCS.
* The pipeline also reconsumes failed records from the DLQ for reprocessing.
*
* <p>NOTE: Future versions will support: Pub/Sub, GCS, or Kafka as per DataStream
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/datastream-to-sql/README_Cloud_Datastream_to_SQL.md">README</a>
* for instructions on how to use or modify this template.
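 *
 * <p>For example, for a table with a primary key, inserts and updates are applied with upsert
 * syntax. A sketch of the PostgreSQL form (table and column names are illustrative; the exact SQL
 * is generated by the pipeline's DML transforms):
 *
 * <pre>{@code
 * INSERT INTO my_schema.my_table (id, name) VALUES (1, 'a')
 *     ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name;
 * }</pre>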
*/
@Template(
    name = "Cloud_Datastream_to_SQL",
    category = TemplateCategory.STREAMING,
    displayName = "Datastream to SQL",
    description = {
      "The Datastream to SQL template is a streaming pipeline that reads <a href=\"https://cloud.google.com/datastream/docs\">Datastream</a> data and replicates it into any MySQL or PostgreSQL database. "
          + "The template reads data from Cloud Storage using Pub/Sub notifications and replicates this data into SQL replica tables.\n",
      "The template does not support data definition language (DDL) and expects that all tables already exist in the database. "
          + "Replication uses Dataflow stateful transforms to filter stale data and ensure consistency in out-of-order data. "
          + "For example, if a more recent version of a row has already passed through, a late-arriving version of that row is ignored. "
          + "The data manipulation language (DML) that executes is a best attempt to perfectly replicate source to target data. The executed DML statements follow these rules:\n",
      "If a primary key exists, insert and update operations use upsert syntax (i.e. <code>INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE</code>).\n"
          + "If primary keys exist, deletes are replicated as a delete DML.\n"
          + "If no primary key exists, both insert and update operations are inserted into the table.\n"
          + "If no primary keys exist, deletes are ignored.\n"
          + "If you are using the Oracle to Postgres utilities, add <code>ROWID</code> in SQL as the primary key when none exists."
    },
    optionsClass = Options.class,
    flexContainerName = "datastream-to-sql",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/datastream-to-sql",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "A Datastream stream that is ready to replicate, or is already replicating, data.",
      "<a href=\"https://cloud.google.com/storage/docs/reporting-changes\">Cloud Storage Pub/Sub notifications</a> are enabled for the Datastream data.",
      "A PostgreSQL database was seeded with the required schema.",
      "Network access between Dataflow workers and PostgreSQL is set up."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class DataStreamToSQL {

  private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSQL.class);
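
  /** Expected suffixes for Datastream output files. */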
  private static final String AVRO_SUFFIX = "avro";
  private static final String JSON_SUFFIX = "json";

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * Options supported by the pipeline.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options extends PipelineOptions, StreamingOptions {
    @TemplateParameter.GcsReadFile(
        order = 1,
        groupName = "Source",
        description = "File location for Datastream file input in Cloud Storage.",
        helpText =
            "The file location for the Datastream files in Cloud Storage to replicate. This file location is typically the root path for the stream.")
    String getInputFilePattern();

    void setInputFilePattern(String value);

    @TemplateParameter.PubsubSubscription(
        order = 2,
        optional = true,
        description = "The Pub/Sub subscription being used in a Cloud Storage notification policy.",
        helpText =
            "The Pub/Sub subscription with Datastream file notifications."
                + " For example, `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>`.")
    String getGcsPubSubSubscription();

    void setGcsPubSubSubscription(String value);

    @TemplateParameter.Enum(
        order = 3,
        enumOptions = {@TemplateEnumOption("avro"), @TemplateEnumOption("json")},
        optional = true,
        description = "Datastream output file format (avro/json).",
        helpText =
            "The format of the output file produced by Datastream. For example, `avro` or `json`. Defaults to `avro`.")
    @Default.String("avro")
    String getInputFileFormat();

    void setInputFileFormat(String value);

    @TemplateParameter.Text(
        order = 4,
        groupName = "Source",
        optional = true,
        description = "Name or template for the stream to poll for schema information.",
        helpText =
            "The name or template for the stream to poll for schema information. The default value is `{_metadata_stream}`.")
    String getStreamName();

    void setStreamName(String value);

    @TemplateParameter.DateTime(
        order = 5,
        optional = true,
        description =
            "The starting DateTime used to fetch from Cloud Storage "
                + "(https://tools.ietf.org/html/rfc3339).",
        helpText =
            "The starting DateTime used to fetch from Cloud Storage "
                + "(https://tools.ietf.org/html/rfc3339).")
    @Default.String("1970-01-01T00:00:00.00Z")
    String getRfcStartDateTime();

    void setRfcStartDateTime(String value);

    // DataStream API Root Url (only used for testing)
    @TemplateParameter.Text(
        order = 6,
        optional = true,
        description = "Datastream API Root URL (only required for testing)",
        helpText = "Datastream API Root URL")
    @Default.String("https://datastream.googleapis.com/")
    String getDataStreamRootUrl();

    void setDataStreamRootUrl(String value);

    // SQL Connection Parameters
    @TemplateParameter.Enum(
        order = 7,
        optional = true,
        enumOptions = {@TemplateEnumOption("postgres"), @TemplateEnumOption("mysql")},
        description = "SQL Database Type (postgres or mysql).",
        helpText = "The database type to write to (for example, Postgres).")
    @Default.String("postgres")
    String getDatabaseType();

    void setDatabaseType(String value);

    @TemplateParameter.Text(
        order = 8,
        groupName = "Target",
        description = "Database Host to connect on.",
        helpText = "The SQL host to connect on.")
    String getDatabaseHost();

    void setDatabaseHost(String value);

    @TemplateParameter.Text(
        order = 9,
        groupName = "Target",
        optional = true,
        description = "Database Port to connect on.",
        helpText = "The SQL database port to connect to. The default value is `5432`.")
    @Default.String("5432")
    String getDatabasePort();

    void setDatabasePort(String value);

    @TemplateParameter.Text(
        order = 10,
        description = "Database User to connect with.",
        helpText =
            "The SQL user with all required permissions to write to all tables in replication.")
    String getDatabaseUser();

    void setDatabaseUser(String value);

    @TemplateParameter.Password(
        order = 11,
        description = "Database Password for given user.",
        helpText = "The password for the SQL user.")
    String getDatabasePassword();

    void setDatabasePassword(String value);

    @TemplateParameter.Text(
        order = 12,
        groupName = "Target",
        optional = true,
        description = "SQL Database Name.",
        helpText = "The name of the SQL database to connect to. The default value is `postgres`.")
    @Default.String("postgres")
    String getDatabaseName();

    void setDatabaseName(String value);

    @TemplateParameter.Enum(
        order = 13,
        optional = true,
        enumOptions = {
          @TemplateEnumOption("LOWERCASE"),
          @TemplateEnumOption("UPPERCASE"),
          @TemplateEnumOption("CAMEL"),
          @TemplateEnumOption("SNAKE")
        },
        description = "Toggle for Table Casing",
        helpText =
            "A toggle for table casing behavior. For example: "
                + "LOWERCASE: mytable -> mytable. UPPERCASE: mytable -> MYTABLE. "
                + "CAMEL: my_table -> myTable. SNAKE: myTable -> my_table.")
    @Default.String("LOWERCASE")
    String getDefaultCasing();

    void setDefaultCasing(String value);

    @TemplateParameter.Enum(
        order = 14,
        optional = true,
        enumOptions = {
          @TemplateEnumOption("LOWERCASE"),
          @TemplateEnumOption("UPPERCASE"),
          @TemplateEnumOption("CAMEL"),
          @TemplateEnumOption("SNAKE")
        },
        description = "Toggle for Column Casing",
        helpText =
            "A toggle for target column name casing. "
                + "LOWERCASE (default): my_column -> my_column. "
                + "UPPERCASE: my_column -> MY_COLUMN. "
                + "CAMEL: my_column -> myColumn. "
                + "SNAKE: myColumn -> my_column.")
    @Default.String("LOWERCASE")
    String getColumnCasing();

    void setColumnCasing(String value);

    @TemplateParameter.Text(
        order = 15,
        optional = true,
        description = "A map of key/values used to dictate schema name changes",
        helpText =
            "A map of key/values used to dictate schema and table name changes. "
                + "Examples: Schema to schema (SCHEMA1:SCHEMA2), "
                + "Table to table (SCHEMA1.table1:SCHEMA2.TABLE1), "
                + "or multiple mappings using the pipe '|' delimiter "
                + "(e.g. schema1.source:schema2.target|schema3.source:schema4.target).")
    @Default.String("")
    String getSchemaMap();

    void setSchemaMap(String value);

    @TemplateParameter.Text(
        order = 16,
        groupName = "Target",
        optional = true,
        description = "Custom connection string.",
        helpText =
            "Optional connection string which will be used instead of the default database string.")
    @Default.String("")
    String getCustomConnectionString();

    void setCustomConnectionString(String value);

    @TemplateParameter.Integer(
        order = 17,
        optional = true,
        description = "Number of threads to use for the Format to DML step.",
        helpText =
            "Determines the key parallelism of the Format to DML step; specifically, the value is passed into Reshuffle.withNumBuckets.")
    @Default.Integer(100)
    int getNumThreads();

    void setNumThreads(int value);

    @TemplateParameter.Integer(
        order = 18,
        groupName = "Target",
        optional = true,
        description = "Database login timeout in seconds.",
        helpText =
            "The timeout in seconds for database login attempts. This helps prevent connection hangs when multiple workers try to connect simultaneously.")
    Integer getDatabaseLoginTimeout();

    void setDatabaseLoginTimeout(Integer value);

    @TemplateParameter.Boolean(
        order = 19,
        optional = true,
        description =
            "Whether the order-by configuration for data should include prioritizing data that is not deleted.",
        helpText =
            "Whether the order-by configuration for data should include prioritizing data that is not deleted.")
    @Default.Boolean(false)
    Boolean getOrderByIncludesIsDeleted();

    void setOrderByIncludesIsDeleted(Boolean value);

    @TemplateParameter.Text(
        order = 20,
        optional = true,
        description = "Datastream source type override",
        helpText =
            "Override the source type detection for Datastream CDC data. When specified, this value will be used instead of deriving the source type from the read_method field. Valid values include 'mysql', 'postgresql', 'oracle', etc. This parameter is useful when the read_method field contains 'cdc' and the actual source type cannot be determined automatically.")
    String getDatastreamSourceType();

    void setDatastreamSourceType(String value);

    @TemplateParameter.Text(
        order = 21,
        optional = true,
        description = "Dead letter queue directory.",
        helpText =
            "The path that Dataflow uses to write the dead-letter queue output. This path must not be in the same path as the Datastream file output. Defaults to `empty`.")
    @Default.String("")
    String getDeadLetterQueueDirectory();

    void setDeadLetterQueueDirectory(String value);

    @TemplateParameter.Integer(
        order = 22,
        optional = true,
        description = "The number of minutes between DLQ Retries.",
        helpText = "The number of minutes between DLQ Retries. Defaults to `10`.")
    @Default.Integer(10)
    Integer getDlqRetryMinutes();

    void setDlqRetryMinutes(Integer value);

    @TemplateParameter.Integer(
        order = 23,
        optional = true,
        description = "The maximum number of times to retry a failed record from the DLQ.",
        helpText =
            "The maximum number of times to retry a failed record from the DLQ before marking it as a permanent failure. Defaults to `5`.")
    @Default.Integer(5)
    Integer getDlqMaxRetries();

    void setDlqMaxRetries(Integer value);

    @TemplateParameter.Integer(
        order = 24,
        optional = true,
        description = "The number of minutes to cache table schemas.",
        helpText = "The number of minutes to cache table schemas. Defaults to `1440` (24 hours).")
    @Default.Integer(1440)
    Integer getSchemaCacheRefreshMinutes();

    void setSchemaCacheRefreshMinutes(Integer value);

    @TemplateParameter.Enum(
        order = 25,
        optional = true,
        description = "Run mode - currently supported: regular or retryDLQ",
        enumOptions = {@TemplateEnumOption("regular"), @TemplateEnumOption("retryDLQ")},
        helpText = "This is the run mode type, whether regular or with retryDLQ.")
    @Default.String("regular")
    String getRunMode();

    void setRunMode(String value);
  }

  /**
   * Main entry point for executing the pipeline.
   *
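   * <p>A minimal local invocation might look like the following sketch. The argument values are
   * illustrative, and production deployments typically launch the packaged Flex Template rather
   * than running this class directly:
   *
   * <pre>{@code
   * mvn compile exec:java \
   *     -Dexec.mainClass=com.google.cloud.teleport.v2.templates.DataStreamToSQL \
   *     -Dexec.args="--inputFilePattern=gs://my-bucket/datastream-output/ \
   *         --databaseHost=10.0.0.2 --databaseUser=postgres --databasePassword=secret"
   * }</pre>
   *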
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Datastream to SQL");

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    options.setStreaming(true);
    run(options);
  }

  /**
   * Build the DataSourceConfiguration for the target SQL database. Using the pipeline options,
   * determine the database type and create the correct JDBC connection for the requested DB.
   *
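   * <p>For example, with {@code databaseType=postgres}, host {@code 10.0.0.2}, port {@code 5432},
   * and database {@code postgres} (illustrative values), the generated connection string is
   * {@code jdbc:postgresql://10.0.0.2:5432/postgres}.
   *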
   * @param options The execution parameters to the pipeline.
   */
  public static CdcJdbcIO.DataSourceConfiguration getDataSourceConfiguration(Options options) {
    String jdbcDriverName;
    String jdbcDriverConnectionString;
    switch (options.getDatabaseType()) {
      case "postgres":
        jdbcDriverName = "org.postgresql.Driver";
        jdbcDriverConnectionString =
            String.format(
                "jdbc:postgresql://%s:%s/%s",
                options.getDatabaseHost(), options.getDatabasePort(), options.getDatabaseName());
        break;
      case "mysql":
        jdbcDriverName = "com.mysql.cj.jdbc.Driver";
        jdbcDriverConnectionString =
            String.format(
                "jdbc:mysql://%s:%s/%s",
                options.getDatabaseHost(), options.getDatabasePort(), options.getDatabaseName());
        break;
      default:
        throw new IllegalArgumentException(
            String.format("Database Type %s is not supported.", options.getDatabaseType()));
    }
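
    // A custom connection string, when supplied, overrides the generated default.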
    if (!options.getCustomConnectionString().isEmpty()) {
      jdbcDriverConnectionString = options.getCustomConnectionString();
    }

    CdcJdbcIO.DataSourceConfiguration dataSourceConfiguration =
        CdcJdbcIO.DataSourceConfiguration.create(jdbcDriverName, jdbcDriverConnectionString)
            .withUsername(options.getDatabaseUser())
            .withPassword(options.getDatabasePassword())
            .withMaxIdleConnections(Integer.valueOf(0));

    if (options.getDatabaseLoginTimeout() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withLoginTimeout(options.getDatabaseLoginTimeout());
    }

    return dataSourceConfiguration;
  }

  /**
   * Validate that the supplied options match expected values. We also validate that connectivity
   * to the target SQL database is working correctly.
   *
   * @param options The execution parameters to the pipeline.
   * @param dataSourceConfiguration The JDBC datasource configuration.
   */
  public static void validateOptions(
      Options options, CdcJdbcIO.DataSourceConfiguration dataSourceConfiguration) {
    try {
      if (options.getDatabaseHost() != null) {
        dataSourceConfiguration.buildDatasource().getConnection().close();
      }
    } catch (SQLException e) {
      throw new IllegalArgumentException(e);
    }
  }

  /**
   * Parses a single mapping string and separates its rules into schema-to-schema and
   * table-to-table mappings, based on whether the rule key contains a dot.
   *
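   * <p>For example (a sketch of the expected input format):
   *
   * <pre>{@code
   * parseMappings("SCHEMA1:SCHEMA2|schema1.table1:schema2.table1");
   * // returns {"schemas": {"SCHEMA1": "SCHEMA2"},
   * //          "tables": {"schema1.table1": "schema2.table1"}}
   * }</pre>
   *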
   * @param mappingString A pipe-separated string of colon-delimited mapping rules.
   */
  public static Map<String, Map<String, String>> parseMappings(String mappingString) {
    Map<String, String> schemaMappings = new HashMap<>();
    Map<String, String> tableMappings = new HashMap<>();

    if (mappingString != null && !mappingString.isEmpty()) {
      Map<String, String> allMappings =
          Splitter.on("|").withKeyValueSeparator(":").split(mappingString);
      // Strictly separate rules based on the presence of a dot.
      for (Map.Entry<String, String> entry : allMappings.entrySet()) {
        if (entry.getKey().contains(".")) {
          tableMappings.put(entry.getKey(), entry.getValue());
        } else {
          schemaMappings.put(entry.getKey(), entry.getValue());
        }
      }
    }

    Map<String, Map<String, String>> mappings = new HashMap<>();
    mappings.put("schemas", schemaMappings);
    mappings.put("tables", tableMappings);
    return mappings;
  }
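
  /**
   * The {@link FailsafeDlqJsonFormatter} class formats a String/String {@link FailsafeElement}
   * into a JSON string for the DLQ, wrapping the original payload with the error message,
   * stacktrace, and a timestamp.
   */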
  public static class FailsafeDlqJsonFormatter
      extends DoFn<FailsafeElement<String, String>, String> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @ProcessElement
    public void processElement(ProcessContext context) {
      try {
        FailsafeElement<String, String> element = context.element();
        ObjectNode jsonWrapper = MAPPER.createObjectNode();
        // Parse the original payload so it is nested as a JSON object rather than an
        // escaped string.
        JsonNode messageNode = MAPPER.readTree(element.getOriginalPayload());
        jsonWrapper.set("message", messageNode);
        jsonWrapper.put("error_message", element.getErrorMessage());
        jsonWrapper.put("stacktrace", element.getStacktrace());
        jsonWrapper.put("timestamp", Instant.now().toString());
        context.output(MAPPER.writeValueAsString(jsonWrapper));
      } catch (Exception e) {
        LOG.error("Failed to format failsafe DLQ record", e);
      }
    }
  }

  /**
   * The {@link FailsafeDmlInfoDlqJsonFormatter} class formats a FailsafeElement containing DML
   * info into a JSON string for the DLQ.
   */
  public static class FailsafeDmlInfoDlqJsonFormatter
      extends DoFn<FailsafeElement<KV<String, DmlInfo>, KV<String, DmlInfo>>, String> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @ProcessElement
    public void processElement(ProcessContext context) {
      try {
        FailsafeElement<KV<String, DmlInfo>, KV<String, DmlInfo>> element = context.element();
        ObjectNode jsonWrapper = MAPPER.createObjectNode();
        JsonNode messageNode =
            MAPPER.readTree(element.getOriginalPayload().getValue().getOriginalPayload());
        jsonWrapper.set("message", messageNode);
        jsonWrapper.put("error_message", element.getErrorMessage());
        jsonWrapper.put("stacktrace", element.getStacktrace());
        jsonWrapper.put("timestamp", Instant.now().toString());
        context.output(MAPPER.writeValueAsString(jsonWrapper));
      } catch (Exception e) {
        LOG.error("Failed to format failsafe DLQ record", e);
      }
    }
  }

  /**
   * The {@link ExecuteDmlFn} class executes DML statements on a SQL database.
   *
   * <p>This DoFn connects to a database and executes DML statements. On failure, it outputs a
   * FailsafeElement containing the original record and error information.
   */
  private static class ExecuteDmlFn extends DoFn<KV<String, DmlInfo>, KV<String, DmlInfo>> {
    private static final Logger LOG = LoggerFactory.getLogger(ExecuteDmlFn.class);

    private final CdcJdbcIO.DataSourceConfiguration dataSourceConfiguration;
    private transient DataSource dataSource;
    private transient Connection connection;

    public static final TupleTag<KV<String, DmlInfo>> SUCCESS_TAG =
        new TupleTag<KV<String, DmlInfo>>() {};
    public static final TupleTag<FailsafeElement<KV<String, DmlInfo>, KV<String, DmlInfo>>>
        FAILURE_TAG =
            new TupleTag<FailsafeElement<KV<String, DmlInfo>, KV<String, DmlInfo>>>() {};

    public ExecuteDmlFn(CdcJdbcIO.DataSourceConfiguration dataSourceConfiguration) {
      this.dataSourceConfiguration = dataSourceConfiguration;
    }

    @Setup
    public void setup() throws SQLException {
      dataSource = dataSourceConfiguration.buildDatasource();
      connection = dataSource.getConnection();
    }

    @Teardown
    public void teardown() throws SQLException {
      if (connection != null) {
        connection.close();
      }
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      KV<String, DmlInfo> dmlInfo = c.element();
      try (Statement statement = connection.createStatement()) {
        LOG.debug("Executing SQL: {}", dmlInfo.getValue().getDmlSql());
        statement.execute(dmlInfo.getValue().getDmlSql());
        c.output(SUCCESS_TAG, dmlInfo);
      } catch (SQLException e) {
        LOG.error("Failed to execute DML: " + dmlInfo.getValue().getDmlSql(), e);
        c.output(
            FAILURE_TAG,
            FailsafeElement.of(dmlInfo, dmlInfo)
                .setErrorMessage(e.getMessage())
                .setStacktrace(getStackTraceAsString(e)));
      }
    }

    private String getStackTraceAsString(Throwable throwable) {
      StringWriter stringWriter = new StringWriter();
      PrintWriter printWriter = new PrintWriter(stringWriter);
      throwable.printStackTrace(printWriter);
      return stringWriter.toString();
    }
  }
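
  /**
   * Builds the {@link DeadLetterQueueManager} used to write and reconsume DLQ records. The DLQ
   * directory defaults to {@code <tempLocation>/dlq/} when {@code deadLetterQueueDirectory} is
   * not set.
   */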
  private static DeadLetterQueueManager buildDlqManager(Options options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDeadLetterQueueDirectory().isEmpty()
            ? tempLocation + "dlq/"
            : options.getDeadLetterQueueDirectory();

    LOG.info("Dead-letter queue directory: {}", dlqDirectory);
    if ("regular".equals(options.getRunMode())) {
      return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetries());
    } else {
      String retryDlqUri = FileSystems.matchNewResource(dlqDirectory, true).toString();
      return DeadLetterQueueManager.create(dlqDirectory, retryDlqUri, options.getDlqMaxRetries());
    }
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    /*
     * Stages:
     *   1) Ingest and Normalize Data to FailsafeElement with JSON Strings
     *   2) Write JSON Strings to SQL DML Objects
     *   3) Filter stale rows using stateful PK transform
     *   4) Write DML statements to SQL Database via jdbc
     *   5) Write Failures to GCS Dead Letter Queue
     */
    Pipeline pipeline = Pipeline.create(options);

    CdcJdbcIO.DataSourceConfiguration dataSourceConfiguration =
        getDataSourceConfiguration(options);
    validateOptions(options, dataSourceConfiguration);

    Map<String, Map<String, String>> mappings = parseMappings(options.getSchemaMap());
    Map<String, String> schemaMap = mappings.get("schemas");
    Map<String, String> tableNameMap = mappings.get("tables");
    LOG.info("Parsed schema map: {}", schemaMap);
    LOG.info("Parsed table name map: {}", tableNameMap);

    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    /*
     * Stage 1: Ingest and Normalize Data to FailsafeElement with JSON Strings
     *   a) Read DataStream data from GCS into JSON String FailsafeElements (datastreamJsonRecords)
     *   b) Reconsume Dead Letter Queue data from GCS into JSON String FailsafeElements
     *      (dlqJsonRecords)
     *   c) Flatten DataStream and DLQ Streams (allJsonRecords)
     */
    PCollection<FailsafeElement<String, String>> datastreamJsonRecords =
        pipeline.apply(
            new DataStreamIO(
                    options.getStreamName(),
                    options.getInputFilePattern(),
                    options.getInputFileFormat(),
                    options.getGcsPubSubSubscription(),
                    options.getRfcStartDateTime())
                .withRenameColumnValue("_metadata_row_id", "rowid")
                .withHashRowId()
                .withDatastreamSourceType(options.getDatastreamSourceType()));

    // Elements sent to the Dead Letter Queue are to be reconsumed.
    // A DLQManager is to be created using PipelineOptions, and it is in charge
    // of building pieces of the DLQ.
    PCollection<String> dlqStrings =
        pipeline.apply(
            "DLQ Consumer/reader", dlqManager.dlqReconsumer(options.getDlqRetryMinutes()));
    PCollectionTuple reconsumedElements = dlqManager.getReconsumerDataTransform(dlqStrings);

    PCollection<FailsafeElement<String, String>> dlqJsonRecords =
        reconsumedElements
            .get(DeadLetterQueueManager.RETRYABLE_ERRORS)
            .setCoder(FAILSAFE_ELEMENT_CODER);
    reconsumedElements
        .get(DeadLetterQueueManager.PERMANENT_ERRORS)
        .setCoder(FAILSAFE_ELEMENT_CODER)
        .apply("Format Permanent Errors", ParDo.of(new FailsafeDlqJsonFormatter()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Write Permanent Errors to DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    PCollection<FailsafeElement<String, String>> allJsonRecords;
    if ("regular".equals(options.getRunMode())) {
      allJsonRecords =
          PCollectionList.of(datastreamJsonRecords)
              .and(dlqJsonRecords)
              .apply("Merge Datastream & DLQ", Flatten.pCollections());
    } else {
      allJsonRecords = dlqJsonRecords;
    }

    /*
     * Stage 2: Write JSON Strings to SQL DML Objects
     *   a) Convert JSON String FailsafeElements into DML statements (dmlResults)
     */
    PCollectionTuple dmlResults =
        allJsonRecords.apply(
            "Format to DML",
            CreateDml.of(dataSourceConfiguration)
                .withDefaultCasing(options.getDefaultCasing())
                .withSchemaMap(schemaMap)
                .withTableNameMap(tableNameMap)
                .withColumnCasing(options.getColumnCasing())
                .withOrderByIncludesIsDeleted(options.getOrderByIncludesIsDeleted())
                .withNumThreads(options.getNumThreads())
                .withSchemaCacheRefreshMinutes(options.getSchemaCacheRefreshMinutes()));

    PCollection<KV<String, DmlInfo>> dmlStatements =
        dmlResults
            .get(CreateDml.DML_MAIN_TAG)
            /*
             * Stage 3: Filter stale rows using stateful PK transform
             */
            .apply("DML Stateful Processing", ProcessDml.statefulOrderByPK());

    // Errors from DML conversion are severe and should not be retried.
    dmlResults
        .get(DatastreamToDML.ERROR_TAG)
        .setCoder(FAILSAFE_ELEMENT_CODER)
        .apply("Format Conversion Errors", ParDo.of(new FailsafeDlqJsonFormatter()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Write Conversion Errors to DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    /*
     * Stage 4: Write DML statements to the SQL database
     */
    PCollectionTuple sqlWriteResults =
        dmlStatements.apply(
            "Write to SQL",
            ParDo.of(new ExecuteDmlFn(dataSourceConfiguration))
                .withOutputTags(
                    ExecuteDmlFn.SUCCESS_TAG, TupleTagList.of(ExecuteDmlFn.FAILURE_TAG)));

    // Errors from the SQL sink are retryable.
    PCollection<FailsafeElement<KV<String, DmlInfo>, KV<String, DmlInfo>>> sqlWriteFailures =
        sqlWriteResults
            .get(ExecuteDmlFn.FAILURE_TAG)
            .setCoder(
                FailsafeElementCoder.of(
                    KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(DmlInfo.class)),
                    KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(DmlInfo.class))));

    sqlWriteFailures
        .apply("Format Retryable Errors", ParDo.of(new FailsafeDmlInfoDlqJsonFormatter()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Write Retryable Errors to DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getRetryDlqDirectoryWithDateTime())
                .withTmpDirectory(dlqManager.getRetryDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}