/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.templates.PubSubToBigQuery.Options;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.PythonExternalTextTransformerOptions;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToPubSubFailsafeElementFn;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
* from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
* which occur in the transformation of the data or execution of the UDF will be output to a
* separate errors table in BigQuery. The errors table will be created if it does not exist prior to
* execution. Both output and error tables are specified by the user as template parameters.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The Pub/Sub topic exists.
* <li>The BigQuery output table exists.
* </ul>
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_PubSub_to_BigQuery_Flex.md">README</a>
* for instructions on how to use or modify this template.
*/
@MultiTemplate({
  @Template(
      name = "PubSub_to_BigQuery_Flex",
      category = TemplateCategory.STREAMING,
      displayName = "Pub/Sub to BigQuery",
      description =
          "The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic or subscription, and writes them to a BigQuery table. "
              + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
              + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
      optionsClass = Options.class,
      skipOptions = {
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      flexContainerName = "pubsub-to-bigquery",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",
      contactInformation = "https://cloud.google.com/support",
      requirements = {
        "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
        "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
      },
      streaming = true,
      supportsAtLeastOnce = true,
      supportsExactlyOnce = true),
  @Template(
      name = "PubSub_to_BigQuery_Xlang",
      category = TemplateCategory.STREAMING,
      displayName = "Pub/Sub to BigQuery with Python UDFs",
      type = Template.TemplateType.XLANG,
      description =
          "The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic or subscription, and writes them to a BigQuery table. "
              + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
              + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
      optionsClass = Options.class,
      skipOptions = {
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      },
      flexContainerName = "pubsub-to-bigquery-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",
      contactInformation = "https://cloud.google.com/support",
      requirements = {
        "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
        "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
      },
      streaming = true,
      supportsAtLeastOnce = true,
      supportsExactlyOnce = true)
})
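/*
 * Illustration (not part of the pipeline) of the JSON-to-row mapping required by the template
 * metadata above: a Pub/Sub message whose `data` field contains {"k1":"v1", "k2":"v2"} is written
 * to a pre-existing table with two STRING columns named k1 and k2, i.e. roughly
 *
 *   TableRow row = new TableRow().set("k1", "v1").set("k2", "v2");
 */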
public class PubSubToBigQuery {

  /** The log to output status messages to. */
  private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);

  /** The tag for the main output for the UDF. */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};

  /** The tag for the main output of the json transformation. */
  public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};

  /** The tag for the dead-letter output of the udf. */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};

  /** The tag for the dead-letter output of the json to table row transform. */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};

  /** The default suffix for error tables if dead letter table is not specified. */
  public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";

  /** Pubsub message/string coder for pipeline. */
  public static final FailsafeElementCoder<PubsubMessage, String> CODER =
      FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * The {@link Options} class provides the custom execution options passed by the executor at the
   * command-line.
   */
  public interface Options
      extends PipelineOptions,
          BigQueryStorageApiStreamingOptions,
          PythonExternalTextTransformerOptions,
          DataflowPipelineWorkerPoolOptions {

    @TemplateParameter.BigQueryTable(
        order = 1,
        groupName = "Target",
        description = "BigQuery output table",
        helpText =
            "The BigQuery table to write to, formatted as `PROJECT_ID:DATASET_NAME.TABLE_NAME`.")
    String getOutputTableSpec();

    void setOutputTableSpec(String value);

    @TemplateParameter.PubsubTopic(
        order = 2,
        groupName = "Source",
        optional = true,
        description = "Input Pub/Sub topic",
        helpText =
            "The Pub/Sub topic to read from, formatted as `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.")
    String getInputTopic();

    void setInputTopic(String value);

    @TemplateParameter.PubsubSubscription(
        order = 3,
        groupName = "Source",
        optional = true,
        description = "Pub/Sub input subscription",
        helpText =
            "The Pub/Sub subscription to read from, "
                + "formatted as `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>`.")
    String getInputSubscription();

    void setInputSubscription(String value);

    @TemplateParameter.BigQueryTable(
        order = 4,
        optional = true,
        description =
            "Table for messages that failed to reach the output table (i.e., the dead-letter table)",
        helpText =
            "The BigQuery table to use for messages that failed to reach the output table, "
                + "formatted as `PROJECT_ID:DATASET_NAME.TABLE_NAME`. If the table "
                + "doesn't exist, it is created when the pipeline runs. "
                + "If this parameter is not specified, "
                + "the value `OUTPUT_TABLE_SPEC_error_records` is used instead.")
    String getOutputDeadletterTable();

    void setOutputDeadletterTable(String value);

    @TemplateParameter.Boolean(
        order = 5,
        optional = true,
        parentName = "useStorageWriteApi",
        parentTriggerValues = {"true"},
        description = "Use at-least-once semantics in BigQuery Storage Write API",
        helpText =
            "When using the Storage Write API, specifies the write semantics. "
                + "To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)"
                + ", set this parameter to true. "
                + "To use exactly-once semantics, set the parameter to `false`. "
                + "This parameter applies only when `useStorageWriteApi` is `true`. "
                + "The default value is `false`.")
    @Default.Boolean(false)
    @Override
    Boolean getUseStorageWriteApiAtLeastOnce();

    void setUseStorageWriteApiAtLeastOnce(Boolean value);
  }
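
  // A minimal sketch (not an exhaustive list) of how these options are typically supplied on the
  // command line; flag names follow Beam's PipelineOptionsFactory getter-to-flag convention:
  //   --outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME \
  //   --inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME \
  //   --outputDeadletterTable=PROJECT_ID:DATASET_NAME.DEADLETTER_TABLE_NAME
  // Exactly one of --inputTopic or --inputSubscription must be set (enforced in run()).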

  /**
   * The main entry-point for pipeline execution. This method will start the pipeline but will not
   * wait for its execution to finish. If blocking execution is required, use the {@link
   * PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
   * result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line args passed by the executor.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
    // options.setWorkerDiskType(
    //     "compute.googleapis.com/projects/cloud-teleport-testing/zones/us-central1-a/diskTypes/t2a-test");

    run(options);
  }
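
  // A minimal sketch of blocking execution from caller code, as described in the Javadoc above:
  //   PipelineResult result = PubSubToBigQuery.run(options);
  //   result.waitUntilFinish();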

  /**
   * Runs the pipeline with the specified options. This method does not wait until the pipeline is
   * finished before returning. Invoke {@code result.waitUntilFinish()} on the result object to
   * block until the pipeline is finished running if blocking programmatic execution is required.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(Options options) {

    boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
    boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
    if (useInputSubscription == useInputTopic) {
      throw new IllegalArgumentException(
          "Either input topic or input subscription must be provided, but not both.");
    }

    Pipeline pipeline = Pipeline.create(options);

    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);

    /*
     * Steps:
     *  1) Read messages in from Pub/Sub
     *  2) Transform the PubsubMessages into TableRows
     *     - Transform message payload via UDF
     *     - Convert UDF result to TableRow objects
     *  3) Write successful records out to BigQuery
     *  4) Write failed records out to BigQuery
     *  5) Write records that failed insertion into BigQuery out to the deadletter table
     */

    /*
     * Step #1: Read messages in from Pub/Sub
     * Either from a Subscription or Topic
     */
    PCollection<PubsubMessage> messages = null;
    if (useInputSubscription) {
      messages =
          pipeline.apply(
              "ReadPubSubSubscription",
              PubsubIO.readMessagesWithAttributes()
                  .fromSubscription(options.getInputSubscription()));
    } else {
      messages =
          pipeline.apply(
              "ReadPubSubTopic",
              PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
    }

    PCollectionTuple convertedTableRows =
        messages
            /*
             * Step #2: Transform the PubsubMessages into TableRows
             */
            .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));

    /*
     * Step #3: Write the successful records out to BigQuery
     */
    WriteResult writeResult =
        convertedTableRows
            .get(TRANSFORM_OUT)
            .apply(
                "WriteSuccessfulRecords",
                BigQueryIO.writeTableRows()
                    .withoutValidation()
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withExtendedErrorInfo()
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                    .to(options.getOutputTableSpec()));

    /*
     * Step 3 Contd.
     * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
     */
    PCollection<FailsafeElement<String, String>> failedInserts =
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "WrapInsertionErrors",
                MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                    .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    /*
     * Step #4: Write records that failed table row transformation
     * or conversion out to BigQuery deadletter table.
     */
    PCollectionList.of(
            ImmutableList.of(
                convertedTableRows.get(UDF_DEADLETTER_OUT),
                convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
        .apply("Flatten", Flatten.pCollections())
        .apply(
            "WriteFailedRecords",
            ErrorConverters.WritePubsubMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    !Strings.isNullOrEmpty(options.getOutputDeadletterTable())
                        ? options.getOutputDeadletterTable()
                        : options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .build());

    // 5) Insert records that failed insert into deadletter table
    failedInserts.apply(
        "WriteFailedRecords",
        ErrorConverters.WriteStringMessageErrors.newBuilder()
            .setErrorRecordsTable(
                !Strings.isNullOrEmpty(options.getOutputDeadletterTable())
                    ? options.getOutputDeadletterTable()
                    : options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
            .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
            .build());

    return pipeline.run();
  }

  /**
   * The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
   * {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
   * applying an optional UDF to the input. The execution of the UDF and the transformation to
   * {@link TableRow} objects is done in a fail-safe way by wrapping the element with its original
   * payload inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform
   * will output a {@link PCollectionTuple} which contains all output and dead-letter {@link
   * PCollection PCollections}.
   *
   * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection
   * PCollections}:
   *
   * <ul>
   *   <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
   *       successfully processed by the optional UDF.
   *   <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
   *       records which failed processing during the UDF execution.
   *   <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted
   *       from JSON to {@link TableRow} objects.
   *   <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
   *       records which couldn't be converted to table rows.
   * </ul>
   */
  static class PubsubMessageToTableRow
      extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {

    private final Options options;

    PubsubMessageToTableRow(Options options) {
      this.options = options;
    }

    @Override
    public PCollectionTuple expand(PCollection<PubsubMessage> input) {
      boolean useJavascriptUdf =
          !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
      boolean usePythonUdf =
          !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
      if (useJavascriptUdf && usePythonUdf) {
        throw new IllegalArgumentException(
            "Either javascript or Python gcs path must be provided, but not both.");
      }

      PCollectionTuple udfOut;

      if (usePythonUdf) {
        PCollection<Row> udfRowsOut =
            input
                // Map the incoming messages into FailsafeElements so we can recover from failures
                // across multiple transforms.
                .apply(
                    "MapToRecord",
                    PythonExternalTextTransformer.FailsafeRowPythonExternalUdf
                        .pubSubMappingFunction())
                .setRowSchema(
                    PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)
                .apply(
                    "InvokeUDF",
                    PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                        .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
                        .setFunctionName(options.getPythonExternalTextTransformFunctionName())
                        .build());
        udfOut =
            udfRowsOut.apply(
                "MapRowsToFailsafeElements",
                ParDo.of(new RowToPubSubFailsafeElementFn(UDF_OUT, UDF_DEADLETTER_OUT))
                    .withOutputTags(UDF_OUT, TupleTagList.of(UDF_DEADLETTER_OUT)));
      } else {
        udfOut =
            input
                // Map the incoming messages into FailsafeElements so we can recover from failures
                // across multiple transforms.
                .apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
                .apply(
                    "InvokeUDF",
                    FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
                        .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                        .setFunctionName(options.getJavascriptTextTransformFunctionName())
                        .setReloadIntervalMinutes(
                            options.getJavascriptTextTransformReloadIntervalMinutes())
                        .setSuccessTag(UDF_OUT)
                        .setFailureTag(UDF_DEADLETTER_OUT)
                        .build());
      }

      // Convert the records which were successfully processed by the UDF into TableRow objects.
      PCollectionTuple jsonToTableRowOut =
          udfOut
              .get(UDF_OUT)
              .apply(
                  "JsonToTableRow",
                  FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
                      .setSuccessTag(TRANSFORM_OUT)
                      .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                      .build());

      // Re-wrap the PCollections so we can return a single PCollectionTuple
      return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
          .and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
          .and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
          .and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
    }
  }
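
  // A minimal sketch of consuming the transform above (this mirrors what run() does); variable
  // names are illustrative only:
  //   PCollectionTuple converted =
  //       messages.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
  //   PCollection<TableRow> rows = converted.get(TRANSFORM_OUT);
  //   PCollection<FailsafeElement<PubsubMessage, String>> udfFailures =
  //       converted.get(UDF_DEADLETTER_OUT);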

  /**
   * The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
   * {@link FailsafeElement} class so errors can be recovered from and the original message can be
   * output to an error records table.
   */
  static class PubsubMessageToFailsafeElementFn
      extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {

    @ProcessElement
    public void processElement(ProcessContext context) {
      PubsubMessage message = context.element();
      context.output(
          FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
    }
  }
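
  // A minimal sketch of how a dead-letter consumer might unwrap a failed element downstream;
  // assumes the FailsafeElement accessors getOriginalPayload() and getErrorMessage() from this
  // repo's value class. Given a failed element from UDF_DEADLETTER_OUT:
  //   PubsubMessage original = failed.getOriginalPayload();
  //   String error = failed.getErrorMessage();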
}