/*
* Copyright (C) 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.github.vincentrussell.json.datagenerator.JsonDataGenerator;
import com.github.vincentrussell.json.datagenerator.JsonDataGeneratorException;
import com.github.vincentrussell.json.datagenerator.impl.JsonDataGeneratorImpl;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.StreamingDataGenerator.StreamingDataGeneratorOptions;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToBigQuery;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToGcs;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToJdbc;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToKafka;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToPubSub;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToSpanner;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import com.google.cloud.teleport.v2.utils.MetadataValidator;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * The {@link StreamingDataGenerator} is a streaming pipeline which generates messages at a
 * specified rate to Pub/Sub, BigQuery, GCS, JDBC, Spanner, or Kafka. The messages are generated
 * according to a schema template which instructs the pipeline how to populate the messages with
 * fake data that complies with the schema's constraints.
 *
 * <p>The number of workers executing the pipeline must be large enough to support the supplied
 * QPS. Use a general rule of 2,500 QPS per core in the worker pool.
 *
 * <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
 * for instructions on how to construct the schema file.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/streaming-data-generator/README_Streaming_Data_Generator.md">README</a>
 * for instructions on how to use or modify this template.
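 *
 * <p>As a minimal programmatic sketch (the topic value below is a placeholder; in practice the
 * template is launched as a Flex Template with these options supplied as template parameters, as
 * described in the README above):
 *
 * <pre>{@code
 * StreamingDataGeneratorOptions options =
 *     PipelineOptionsFactory.as(StreamingDataGeneratorOptions.class);
 * options.setQps(100L);
 * options.setSchemaTemplate(SchemaTemplate.GAME_EVENT);
 * options.setSinkType(SinkType.PUBSUB);
 * options.setTopic("projects/<project-id>/topics/<topic-name>");
 * StreamingDataGenerator.run(options);
 * }</pre>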
*/
@Template(
    name = "Streaming_Data_Generator",
    category = TemplateCategory.UTILITIES,
    displayName = "Streaming Data Generator",
    description =
        "A pipeline to publish messages at a specified QPS. This template can be used to benchmark"
            + " the performance of streaming pipelines.",
    optionsClass = StreamingDataGeneratorOptions.class,
    flexContainerName = "streaming-data-generator",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/streaming-data-generator",
    contactInformation = "https://cloud.google.com/support",
    streaming = true,
    supportsAtLeastOnce = true)
public class StreamingDataGenerator {

  /**
   * The {@link StreamingDataGeneratorOptions} class provides the custom execution options passed
   * by the executor at the command-line.
   */
  public interface StreamingDataGeneratorOptions extends PipelineOptions {

    @TemplateParameter.Long(
        order = 1,
        description = "Required output rate",
        helpText = "Indicates rate of messages per second to be published to Pub/Sub")
    @Required
    Long getQps();

    void setQps(Long value);

    @TemplateParameter.Enum(
        order = 2,
        enumOptions = {@TemplateEnumOption("GAME_EVENT")},
        optional = true,
        description = "Schema template to generate fake data",
        helpText = "Pre-existing schema template to use. The value must be one of: [GAME_EVENT]")
    SchemaTemplate getSchemaTemplate();

    void setSchemaTemplate(SchemaTemplate value);

    @TemplateParameter.GcsReadFile(
        order = 3,
        optional = true,
        description = "Location of Schema file to generate fake data",
        helpText = "Cloud Storage path of schema location.",
        example = "gs://<bucket-name>/prefix")
    String getSchemaLocation();

    void setSchemaLocation(String value);

    @TemplateParameter.PubsubTopic(
        order = 4,
        optional = true,
        description = "Output Pub/Sub topic",
        helpText = "The name of the topic to which the pipeline should publish data.",
        example = "projects/<project-id>/topics/<topic-name>")
    String getTopic();

    void setTopic(String value);

    @TemplateParameter.Long(
        order = 5,
        optional = true,
        description = "Maximum number of output Messages",
        helpText =
            "Indicates maximum number of output messages to be generated. 0 means unlimited.")
    @Default.Long(0L)
    Long getMessagesLimit();

    void setMessagesLimit(Long value);

    @TemplateParameter.Enum(
        order = 6,
        enumOptions = {
          @TemplateEnumOption("AVRO"),
          @TemplateEnumOption("JSON"),
          @TemplateEnumOption("PARQUET")
        },
        optional = true,
        description = "Output Encoding Type",
        helpText = "The message Output type. Default is JSON.")
    @Default.Enum("JSON")
    OutputType getOutputType();

    void setOutputType(OutputType value);

    @TemplateParameter.GcsReadFile(
        order = 7,
        optional = true,
        parentName = "outputType",
        parentTriggerValues = {"AVRO", "PARQUET"},
        description = "Location of Avro Schema file",
        helpText =
            "Cloud Storage path of Avro schema location. Mandatory when output type is AVRO or"
                + " PARQUET.",
        example = "gs://your-bucket/your-path/schema.avsc")
    String getAvroSchemaLocation();

    void setAvroSchemaLocation(String value);

    @TemplateParameter.Enum(
        order = 8,
        enumOptions = {
          @TemplateEnumOption("BIGQUERY"),
          @TemplateEnumOption("GCS"),
          @TemplateEnumOption("PUBSUB"),
          @TemplateEnumOption("JDBC"),
          @TemplateEnumOption("SPANNER"),
          @TemplateEnumOption("KAFKA")
        },
        optional = true,
        description = "Output Sink Type",
        helpText = "The message Sink type. Default is PUBSUB")
    @Default.Enum("PUBSUB")
    SinkType getSinkType();

    void setSinkType(SinkType value);

    @TemplateParameter.BigQueryTable(
        order = 9,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"BIGQUERY"},
        description = "Output BigQuery table",
        helpText = "Output BigQuery table. Mandatory when sinkType is BIGQUERY",
        example = "<project>:<dataset>.<table_name>")
    String getOutputTableSpec();

    void setOutputTableSpec(String value);

    @TemplateParameter.Enum(
        order = 10,
        enumOptions = {
          @TemplateEnumOption("WRITE_APPEND"),
          @TemplateEnumOption("WRITE_EMPTY"),
          @TemplateEnumOption("WRITE_TRUNCATE")
        },
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"BIGQUERY"},
        description = "Write Disposition to use for BigQuery",
        helpText =
            "BigQuery WriteDisposition. For example, WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE.")
    @Default.String("WRITE_APPEND")
    String getWriteDisposition();

    void setWriteDisposition(String writeDisposition);

    @TemplateParameter.BigQueryTable(
        order = 11,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"BIGQUERY"},
        description = "The dead-letter table name to output failed messages to BigQuery",
        helpText =
            "Messages that fail to reach the output table for any reason (e.g., mismatched"
                + " schema, malformed JSON) are written to this table. If it doesn't exist, it"
                + " will be created during pipeline execution.",
        example = "your-project-id:your-dataset.your-table-name")
    String getOutputDeadletterTable();

    void setOutputDeadletterTable(String outputDeadletterTable);

    @TemplateParameter.Duration(
        order = 12,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"GCS"},
        description = "Window duration",
        helpText =
            "The window duration/size in which data will be written to Cloud Storage. Allowed"
                + " formats are: Ns (for seconds, example: 5s), Nm (for minutes, example: 12m), Nh"
                + " (for hours, example: 2h).",
        example = "1m")
    @Default.String("1m")
    String getWindowDuration();

    void setWindowDuration(String windowDuration);

    @TemplateParameter.GcsWriteFolder(
        order = 13,
        optional = true,
        description = "Output file directory in Cloud Storage",
        helpText =
            "The path and filename prefix for writing output files. Must end with a slash. DateTime"
                + " formatting is used to parse directory path for date & time formatters.",
        example = "gs://your-bucket/your-path/")
    String getOutputDirectory();

    void setOutputDirectory(String outputDirectory);

    @TemplateParameter.Text(
        order = 14,
        optional = true,
        description = "Output filename prefix of the files to write",
        helpText = "The prefix to place on each windowed file.",
        example = "output-")
    @Default.String("output-")
    String getOutputFilenamePrefix();

    void setOutputFilenamePrefix(String outputFilenamePrefix);

    @TemplateParameter.Integer(
        order = 15,
        optional = true,
        description = "Maximum output shards",
        helpText =
            "The maximum number of output shards produced when writing. A higher number of shards"
                + " means higher throughput for writing to Cloud Storage, but potentially higher"
                + " data aggregation cost across shards when processing output Cloud Storage files."
                + " Default value is decided by Dataflow.")
    @Default.Integer(0)
    Integer getNumShards();

    void setNumShards(Integer numShards);

    @TemplateParameter.Text(
        order = 16,
        optional = true,
        regexes = {"^.+$"},
        description = "JDBC driver class name.",
        helpText = "JDBC driver class name to use.",
        example = "com.mysql.jdbc.Driver")
    String getDriverClassName();

    void setDriverClassName(String driverClassName);

    @TemplateParameter.Text(
        order = 17,
        optional = true,
        regexes = {
          "(^jdbc:[a-zA-Z0-9/:@.?_+!*=&-;]+$)|(^([A-Za-z0-9+/]{4}){1,}([A-Za-z0-9+/]{0,3})={0,3})"
        },
        description = "JDBC connection URL string.",
        helpText = "Url connection string to connect to the JDBC source.",
        example = "jdbc:mysql://some-host:3306/sampledb")
    String getConnectionUrl();

    void setConnectionUrl(String connectionUrl);

    @TemplateParameter.Text(
        order = 18,
        optional = true,
        regexes = {"^.+$"},
        description = "JDBC connection username.",
        helpText = "User name to be used for the JDBC connection.")
    String getUsername();

    void setUsername(String username);

    @TemplateParameter.Password(
        order = 19,
        optional = true,
        description = "JDBC connection password.",
        helpText = "Password to be used for the JDBC connection.")
    String getPassword();

    void setPassword(String password);

    @TemplateParameter.Text(
        order = 20,
        optional = true,
        regexes = {"^[a-zA-Z0-9_;!*&=@#-:\\/]+$"},
        description = "JDBC connection property string.",
        helpText =
            "Properties string to use for the JDBC connection. Format of the string must be"
                + " [propertyName=property;]*.",
        example = "unicode=true;characterEncoding=UTF-8")
    String getConnectionProperties();

    void setConnectionProperties(String connectionProperties);

    @TemplateParameter.Text(
        order = 21,
        optional = true,
        regexes = {"^.+$"},
        description = "Statement which will be executed against the database.",
        helpText =
            "SQL statement which will be executed to write to the database. The statement must"
                + " specify the column names of the table in any order. Only the values of the"
                + " specified column names will be read from the json and added to the statement.",
        example = "INSERT INTO tableName (column1, column2) VALUES (?,?)")
    String getStatement();

    void setStatement(String statement);

    @TemplateParameter.ProjectId(
        order = 22,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "GCP Project Id of where the Spanner table lives.",
        helpText = "GCP Project Id of where the Spanner table lives.")
    String getProjectId();

    void setProjectId(String projectId);

    @TemplateParameter.Text(
        order = 23,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        regexes = {"^.+$"},
        description = "Cloud Spanner instance name.",
        helpText = "Cloud Spanner instance name.")
    String getSpannerInstanceName();

    void setSpannerInstanceName(String spannerInstanceName);

    @TemplateParameter.Text(
        order = 24,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        regexes = {"^.+$"},
        description = "Cloud Spanner database name.",
        helpText = "Cloud Spanner database name.")
    String getSpannerDatabaseName();

    void setSpannerDatabaseName(String spannerDBName);

    @TemplateParameter.Text(
        order = 25,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        regexes = {"^.+$"},
        description = "Cloud Spanner table name.",
        helpText = "Cloud Spanner table name.")
    String getSpannerTableName();

    void setSpannerTableName(String spannerTableName);

    @TemplateParameter.Long(
        order = 26,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Max mutated cells per batch.",
        helpText =
            "Specifies the cell mutation limit (maximum number of mutated cells per batch)."
                + " Default value is 5000")
    Long getMaxNumMutations();

    void setMaxNumMutations(Long value);

    @TemplateParameter.Long(
        order = 27,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Max rows per batch.",
        helpText =
            "Specifies the row mutation limit (maximum number of mutated rows per batch)."
                + " Default value is 1000")
    Long getMaxNumRows();

    void setMaxNumRows(Long value);

    @TemplateParameter.Long(
        order = 28,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Max batch size in bytes.",
        helpText =
            "Specifies the batch size limit (max number of bytes mutated per batch). Default"
                + " value is 1MB")
    Long getBatchSizeBytes();

    void setBatchSizeBytes(Long value);

    @TemplateParameter.Long(
        order = 29,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Commit deadline in seconds for write requests.",
        helpText = "Specifies the deadline in seconds for the Commit API call.")
    Long getCommitDeadlineSeconds();

    void setCommitDeadlineSeconds(Long value);

    @TemplateParameter.Text(
        order = 30,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"KAFKA"},
        regexes = {"[,:a-zA-Z0-9._-]+"},
        description = "Output Kafka Bootstrap Server",
        helpText = "Kafka Bootstrap Server ",
        example = "localhost:9092")
    String getBootstrapServer();

    void setBootstrapServer(String bootstrapServer);

    @TemplateParameter.Text(
        order = 31,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"KAFKA"},
        regexes = {"[a-zA-Z0-9._-]+"},
        description = "Kafka topic to write to",
        helpText = "Kafka topic to write to.",
        example = "topic")
    String getKafkaTopic();

    void setKafkaTopic(String outputTopic);
  }

  /** Allowed list of existing schema templates. */
  public enum SchemaTemplate {
    GAME_EVENT(
        "{\n"
            + " \"eventId\": \"{{uuid()}}\",\n"
            + " \"eventTimestamp\": {{timestamp()}},\n"
            + " \"ipv4\": \"{{ipv4()}}\",\n"
            + " \"ipv6\": \"{{ipv6()}}\",\n"
            + " \"country\": \"{{country()}}\",\n"
            + " \"username\": \"{{username()}}\",\n"
            + " \"quest\": \"{{random(\"A Break In the Ice\", \"Ghosts of Perdition\", \"Survive"
            + " the Low Road\")}}\",\n"
            + " \"score\": {{integer(100, 10000)}},\n"
            + " \"completed\": {{bool()}}\n"
            + "}"),
    LOG_ENTRY(
        "{\n"
            + " \"logName\": \"{{alpha(10,20)}}\",\n"
            + " \"resource\": {\n"
            + " \"type\": \"{{alpha(5,10)}}\"\n"
            + " },\n"
            + " \"timestamp\": {{timestamp()}},\n"
            + " \"receiveTimestamp\": {{timestamp()}},\n"
            + " \"severity\": \"{{random(\"DEFAULT\", \"DEBUG\", \"INFO\", \"NOTICE\","
            + " \"WARNING\", \"ERROR\", \"CRITICAL\", \"ERROR\")}}\",\n"
            + " \"insertId\": \"{{uuid()}}\",\n"
            + " \"trace\": \"{{uuid()}}\",\n"
            + " \"spanId\": \"{{uuid()}}\",\n"
            + " \"jsonPayload\": {\n"
            + " \"bytes_sent\": {{integer(1000,20000)}},\n"
            + " \"connection\": {\n"
            + " \"dest_ip\": \"{{ipv4()}}\",\n"
            + " \"dest_port\": {{integer(0,65000)}},\n"
            + " \"protocol\": {{integer(0,6)}},\n"
            + " \"src_ip\": \"{{ipv4()}}\",\n"
            + " \"src_port\": {{integer(0,65000)}}\n"
            + " },\n"
            + " \"dest_instance\": {\n"
            + " \"project_id\": \"{{concat(\"PROJECT\", integer(0,3))}}\",\n"
            + " \"region\": \"{{country()}}\",\n"
            + " \"vm_name\": \"{{username()}}\",\n"
            + " \"zone\": \"{{state()}}\"\n"
            + " },\n"
            + " \"end_time\": {{timestamp()}},\n"
            + " \"packets_sent\": {{integer(100,400)}},\n"
            + " \"reporter\": \"{{random(\"SRC\", \"DEST\")}}\",\n"
            + " \"rtt_msec\": {{integer(0,20)}},\n"
            + " \"start_time\": {{timestamp()}}\n"
            + " }\n"
            + "}");

    private final String schema;

    SchemaTemplate(String schema) {
      this.schema = schema;
    }

    public String getSchema() {
      return schema;
    }
  }

  /** Allowed list of message encoding types. */
  public enum OutputType {
    JSON(".json"),
    AVRO(".avro"),
    PARQUET(".parquet");

    private final String fileExtension;

    /** Sets file extension associated with output type. */
    OutputType(String fileExtension) {
      this.fileExtension = fileExtension;
    }

    /** Returns file extension associated with output type. */
    public String getFileExtension() {
      return fileExtension;
    }
  }

  /** Allowed list of sink types. */
  public enum SinkType {
    PUBSUB,
    BIGQUERY,
    GCS,
    JDBC,
    SPANNER,
    KAFKA
  }

  /**
   * The main entry-point for pipeline execution. This method will start the pipeline but will not
   * wait for its execution to finish. If blocking execution is required, use the {@link
   * StreamingDataGenerator#run(StreamingDataGeneratorOptions)} method to start the pipeline and
   * invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args command-line args passed by the executor.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    StreamingDataGeneratorOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(StreamingDataGeneratorOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline with the specified options. This method does not wait until the pipeline is
   * finished before returning. Invoke {@code result.waitUntilFinish()} on the result object to
   * block until the pipeline is finished running if blocking programmatic execution is required.
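   *
   * <p>For example, a minimal blocking invocation (assuming {@code options} has already been
   * constructed, e.g. via {@link PipelineOptionsFactory}) looks like:
   *
   * <pre>{@code
   * PipelineResult result = StreamingDataGenerator.run(options);
   * result.waitUntilFinish();
   * }</pre>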
   *
   * @param options the execution options.
   * @return the pipeline result.
   */
  public static PipelineResult run(@Nonnull StreamingDataGeneratorOptions options) {
    checkNotNull(options, "options argument to run method cannot be null.");
    MetadataValidator.validate(options);

    // FileSystems does not set the default configuration in workers till Pipeline.run
    // Explicitly registering standard file systems.
    FileSystems.setDefaultPipelineOptions(options);
    String schema = getSchema(options.getSchemaTemplate(), options.getSchemaLocation());

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps:
     *  1) Trigger at the supplied QPS
     *  2) Generate messages containing fake data
     *  3) Write messages to appropriate Sink
     */
    PCollection<byte[]> generatedMessages =
        pipeline
            .apply("Trigger", createTrigger(options))
            .apply("Generate Fake Messages", ParDo.of(new MessageGeneratorFn(schema)));

    if (options.getSinkType().equals(SinkType.GCS)) {
      generatedMessages =
          generatedMessages.apply(
              options.getWindowDuration() + " Window",
              Window.into(
                  FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))));
    }

    generatedMessages.apply(
        "Write To " + options.getSinkType().name(), createSink(options, schema));

    return pipeline.run();
  }

  /**
   * Creates either a bounded or an unbounded source based on the messagesLimit pipeline option.
   *
   * @param options the pipeline options.
   */
  private static GenerateSequence createTrigger(@Nonnull StreamingDataGeneratorOptions options) {
    checkNotNull(options, "options argument to createTrigger method cannot be null.");
    GenerateSequence generateSequence =
        GenerateSequence.from(0L)
            .withRate(options.getQps(), /* periodLength= */ Duration.standardSeconds(1L));

    return options.getMessagesLimit() > 0
        ? generateSequence.to(options.getMessagesLimit())
        : generateSequence;
  }

  /**
   * The {@link MessageGeneratorFn} class generates fake messages based on the supplied schema.
   *
   * <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
   * for instructions on how to construct the schema file.
   */
  @VisibleForTesting
  static class MessageGeneratorFn extends DoFn<Long, byte[]> {

    // Not initialized inline or in the constructor because {@link JsonDataGenerator} is not
    // serializable.
    private transient JsonDataGenerator dataGenerator;
    private final String schema;

    MessageGeneratorFn(String schema) {
      this.schema = schema;
    }

    @Setup
    public void setup() {
      dataGenerator = new JsonDataGeneratorImpl();
    }

    @ProcessElement
    public void processElement(
        @Element Long element,
        @Timestamp Instant timestamp,
        OutputReceiver<byte[]> receiver,
        ProcessContext context)
        throws IOException, JsonDataGeneratorException {
      byte[] payload;

      // Generate the fake JSON according to the schema.
      try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
        dataGenerator.generateTestDataJson(schema, byteArrayOutputStream);
        payload = byteArrayOutputStream.toByteArray();
      }

      receiver.output(payload);
    }
  }

  /**
   * Creates appropriate sink based on sinkType pipeline option.
   *
   * @param options the pipeline options.
   */
  @VisibleForTesting
  static PTransform<PCollection<byte[]>, PDone> createSink(
      @Nonnull StreamingDataGeneratorOptions options, @Nonnull String schema) {
    checkNotNull(options, "options argument to createSink method cannot be null.");
    checkNotNull(schema, "schema argument to createSink method cannot be null.");

    switch (options.getSinkType()) {
      case PUBSUB:
        checkArgument(
            options.getTopic() != null,
            String.format(
                "Missing required value --topic for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToPubSub.Writer.builder(options, schema).build();
      case BIGQUERY:
        checkArgument(
            options.getOutputTableSpec() != null,
            String.format(
                "Missing required value --outputTableSpec in format"
                    + " <project>:<dataset>.<table_name> for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToBigQuery.builder(options).build();
      case GCS:
        checkArgument(
            options.getOutputDirectory() != null,
            String.format(
                "Missing required value --outputDirectory in format gs:// for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToGcs.builder(options).build();
      case JDBC:
        checkArgument(
            options.getDriverClassName() != null,
            String.format(
                "Missing required value --driverClassName for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getConnectionUrl() != null,
            String.format(
                "Missing required value --connectionUrl for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getStatement() != null,
            String.format(
                "Missing required value --statement for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToJdbc.builder(options).build();
      case SPANNER:
        checkArgument(
            options.getProjectId() != null,
            String.format(
                "Missing required value --projectId for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getSpannerInstanceName() != null,
            String.format(
                "Missing required value --spannerInstanceName for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getSpannerDatabaseName() != null,
            String.format(
                "Missing required value --spannerDatabaseName for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getSpannerTableName() != null,
            String.format(
                "Missing required value --spannerTableName for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToSpanner.builder(options).build();
      case KAFKA:
        checkArgument(
            options.getBootstrapServer() != null,
            String.format(
                "Missing required value --bootstrapServer for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getKafkaTopic() != null,
            String.format(
                "Missing required value --kafkaTopic for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToKafka.Writer.builder(options).build();
      default:
        throw new IllegalArgumentException("Unsupported Sink.");
    }
  }

  private static String getSchema(SchemaTemplate schemaTemplate, String schemaLocation) {
    checkArgument(
        schemaTemplate != null || schemaLocation != null,
        "Either schemaTemplate or schemaLocation argument of MessageGeneratorFn class must be"
            + " provided.");

    if (schemaLocation != null) {
      return GCSUtils.getGcsFileAsString(schemaLocation);
    } else {
      return schemaTemplate.getSchema();
    }
  }
}