/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package
com.google.cloud.teleport.v2.templates
;
import static
com.google.cloud.teleport.v2.templates.PubSubToRedis.RedisSinkType.HASH_SINK
;
import static
com.google.cloud.teleport.v2.templates.PubSubToRedis.RedisSinkType.LOGGING_SINK
;
import static
com.google.cloud.teleport.v2.templates.PubSubToRedis.RedisSinkType.STREAMS_SINK
;
import static
com.google.cloud.teleport.v2.templates.PubSubToRedis.RedisSinkType.STRING_SINK
;
import
com.google.cloud.teleport.metadata.Template
;
import
com.google.cloud.teleport.metadata.TemplateCategory
;
import
com.google.cloud.teleport.metadata.TemplateParameter
;
import
com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption
;
import
com.google.cloud.teleport.v2.coders.FailsafeElementCoder
;
import
com.google.cloud.teleport.v2.common.UncaughtExceptionLogger
;
import
com.google.cloud.teleport.v2.templates.io.RedisHashIO
;
import
com.google.cloud.teleport.v2.templates.transforms.MessageTransformation
;
import
com.google.cloud.teleport.v2.transforms.FailsafeElementTransforms.ConvertFailsafeElementToPubsubMessage
;
import
com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer
;
import
com.google.cloud.teleport.v2.values.FailsafeElement
;
import
com.google.common.base.Strings
;
import
java.nio.charset.StandardCharsets
;
import
java.util.Map
;
import
org.apache.beam.sdk.Pipeline
;
import
org.apache.beam.sdk.PipelineResult
;
import
org.apache.beam.sdk.coders.CoderRegistry
;
import
org.apache.beam.sdk.coders.StringUtf8Coder
;
import
org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
;
import
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
;
import
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder
;
import
org.apache.beam.sdk.io.redis.RedisConnectionConfiguration
;
import
org.apache.beam.sdk.io.redis.RedisIO
;
import
org.apache.beam.sdk.options.Default
;
import
org.apache.beam.sdk.options.PipelineOptionsFactory
;
import
org.apache.beam.sdk.options.Validation
;
import
org.apache.beam.sdk.options.ValueProvider
;
import
org.apache.beam.sdk.transforms.DoFn
;
import
org.apache.beam.sdk.transforms.MapElements
;
import
org.apache.beam.sdk.transforms.ParDo
;
import
org.apache.beam.sdk.values.KV
;
import
org.apache.beam.sdk.values.PCollection
;
import
org.apache.beam.sdk.values.PCollectionTuple
;
import
org.apache.beam.sdk.values.TupleTag
;
import
org.apache.beam.sdk.values.TypeDescriptors
;
import
org.apache.commons.lang3.ArrayUtils
;
import
org.checkerframework.checker.initialization.qual.Initialized
;
import
org.checkerframework.checker.nullness.qual.NonNull
;
import
org.checkerframework.checker.nullness.qual.UnknownKeyFor
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* The {@link PubSubToRedis} pipeline is a streaming pipeline which ingests data in Bytes from
* PubSub, and inserts resulting records as KV in Redis.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The PubSub topic and subscriptions exist
* <li>The Redis instance is up and running
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_NAME=my-project
* BUCKET_NAME=my-bucket
* INPUT_SUBSCRIPTION=my-subscription
* REDIS_HOST=my-host
* REDIS_PORT=my-port
* REDIS_PASSWORD=my-pwd
*
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.v2.templates.PubSubToRedis \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_NAME} \
* --stagingLocation=gs://${BUCKET_NAME}/staging \
* --tempLocation=gs://${BUCKET_NAME}/temp \
* --runner=DataflowRunner \
* --inputSubscription=${INPUT_SUBSCRIPTION} \
* --redisHost=${REDIS_HOST} \
* --redisPort=${REDIS_PORT} \
* --redisPassword=${REDIS_PASSWORD}"
* </pre>
*/
@Template
(
name
=
"Cloud_PubSub_to_Redis"
,
category
=
TemplateCategory
.
STREAMING
,
displayName
=
"Pub/Sub to Redis"
,
description
=
{
"The Pub/Sub to Redis template is a streaming pipeline that reads messages from a Pub/Sub subscription and "
+
"writes the message payload to Redis. The most common use case of this template is to export logs to Redis "
+
"Enterprise for advanced search-based log analysis in real time."
,
"Before writing to Redis, you can apply a JavaScript user-defined function to the message payload. Any "
+
"messages that experience processing failures are forwarded to a Pub/Sub unprocessed topic for further "
+
"troubleshooting and reprocessing."
,
"For added security, enable an SSL connection when setting up your database endpoint connection."
},
optionsClass
=
PubSubToRedis
.
PubSubToRedisOptions
.
class
,
flexContainerName
=
"pubsub-to-redis"
,
contactInformation
=
"https://github.com/GoogleCloudPlatform/DataflowTemplates/issues"
,
documentation
=
"https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-redis"
,
requirements
=
{
"The source Pub/Sub subscription must exist prior to running the pipeline."
,
"The Pub/Sub unprocessed (dead-letter) topic must exist prior to running the pipeline if using a JavaScript UDF."
,
"The Redis database endpoint must be accessible from the Dataflow workers' subnetwork."
,
},
preview
=
true
,
streaming
=
true
,
supportsAtLeastOnce
=
true
)
public
class
PubSubToRedis
{
/*
* Options supported by {@link PubSubToRedis}
*
* <p>Inherits standard configuration options.
*/
/** Class-wide SLF4J logger used to report pipeline status messages. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToRedis.class);
/**
 * Output tag under which the JavaScript UDF step emits successfully transformed elements.
 *
 * <p>The trailing {@code {}} makes this an anonymous {@link TupleTag} subclass, the usual Beam
 * idiom for keeping the tag's generic element type available at runtime.
 */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
    new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/**
 * Output tag under which the JavaScript UDF step emits elements that failed transformation.
 *
 * <p>Elements carried by this tag are the ones forwarded to the dead-letter Pub/Sub topic.
 */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
    new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/**
 * Coder for {@link FailsafeElement}s whose original payload is a {@link PubsubMessage} (encoded
 * with its attributes) and whose current payload is a UTF-8 {@link String}.
 */
public static final FailsafeElementCoder<PubsubMessage, String> FAILSAFE_ELEMENT_CODER =
    FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
/**
 * Custom execution options for the {@link PubSubToRedis} template, supplied by the executor on
 * the command line.
 *
 * <p>Extends the JavaScript UDF options, so the standard UDF parameters (file path, function
 * name, reload interval) are available in addition to the Redis-specific ones declared here.
 */
public interface PubSubToRedisOptions
    extends JavascriptTextTransformer.JavascriptTextTransformerOptions {

  /** Returns the Pub/Sub subscription the pipeline reads its input from. */
  @TemplateParameter.PubsubSubscription(
      order = 1,
      groupName = "Source",
      description = "Pub/Sub input subscription",
      helpText = "The Pub/Sub subscription to read the input from.",
      example = "projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>")
  String getInputSubscription();

  void setInputSubscription(String value);

  /** Returns the Redis host to connect to; defaults to {@code 127.0.0.1}. */
  @TemplateParameter.Text(
      order = 2,
      groupName = "Target",
      description = "Redis DB Host",
      helpText = "The Redis database host.",
      example = "your.cloud.db.redislabs.com")
  @Default.String("127.0.0.1")
  @Validation.Required
  String getRedisHost();

  void setRedisHost(String redisHost);

  /** Returns the Redis port to connect to; defaults to {@code 6379}. */
  @TemplateParameter.Integer(
      order = 3,
      groupName = "Target",
      description = "Redis DB Port",
      helpText = "The Redis database port.",
      example = "12345")
  @Default.Integer(6379)
  @Validation.Required
  int getRedisPort();

  void setRedisPort(int redisPort);

  /** Returns the Redis password; defaults to the empty string. */
  @TemplateParameter.Password(
      order = 4,
      groupName = "Target",
      description = "Redis DB Password",
      helpText = "The Redis database password. Defaults to `empty`.")
  @Default.String("")
  @Validation.Required
  String getRedisPassword();

  void setRedisPassword(String redisPassword);

  /** Returns whether the Redis connection should use SSL; defaults to {@code false}. */
  @TemplateParameter.Boolean(
      order = 5,
      optional = true,
      description = "Redis ssl enabled",
      helpText = "The Redis database SSL parameter.")
  @Default.Boolean(false)
  @UnknownKeyFor
  @NonNull
  @Initialized
  ValueProvider<@UnknownKeyFor @NonNull @Initialized Boolean> getSslEnabled();

  void setSslEnabled(ValueProvider<Boolean> sslEnabled);

  /** Returns the Redis sink to write to; defaults to {@code STRING_SINK}. */
  @TemplateParameter.Enum(
      order = 6,
      optional = true,
      enumOptions = {
        @TemplateEnumOption("STRING_SINK"),
        @TemplateEnumOption("HASH_SINK"),
        @TemplateEnumOption("STREAMS_SINK"),
        @TemplateEnumOption("LOGGING_SINK")
      },
      description = "Redis sink to write",
      helpText =
          "The Redis sink. Supported values are `STRING_SINK, HASH_SINK, STREAMS_SINK, and LOGGING_SINK`.",
      example = "STRING_SINK")
  @Default.Enum("STRING_SINK")
  RedisSinkType getRedisSinkType();

  void setRedisSinkType(RedisSinkType redisSinkType);

  /** Returns the Redis connection timeout in milliseconds; defaults to {@code 2000}. */
  @TemplateParameter.Integer(
      order = 7,
      optional = true,
      description = "Redis connection timeout in milliseconds",
      helpText = "The Redis connection timeout in milliseconds. ",
      example = "2000")
  @Default.Integer(2000)
  int getConnectionTimeout();

  void setConnectionTimeout(int timeout);

  /**
   * Returns the key expiration time in seconds; defaults to {@code -1}.
   *
   * <p>Only surfaced for the {@code HASH_SINK} and {@code LOGGING_SINK} sink types.
   */
  @TemplateParameter.Long(
      order = 8,
      optional = true,
      parentName = "redisSinkType",
      parentTriggerValues = {"HASH_SINK", "LOGGING_SINK"},
      description =
          "Hash key expiration time in sec (ttl), supported only for HASH_SINK and LOGGING_SINK",
      helpText =
          "The key expiration time in seconds. The `ttl` default for `HASH_SINK` is -1, which means it never expires.")
  @Default.Long(-1L)
  Long getTtl();

  void setTtl(Long ttl);

  /**
   * Returns the Pub/Sub topic that receives messages the JavaScript UDF failed to transform.
   *
   * <p>Optional as a template parameter, but must be set whenever a UDF is configured.
   */
  @TemplateParameter.PubsubTopic(
      order = 9,
      optional = true,
      description = "Output deadletter Pub/Sub topic",
      helpText =
          "The Pub/Sub topic to forward unprocessable messages to. Messages that fail UDF transformation are forwarded here, Required if using a JavaScript UDF.",
      example = "projects/<PROJECT_ID>/topics/<TOPIC_NAME>")
  String getOutputDeadletterTopic();

  void setOutputDeadletterTopic(String outputDeadletterTopic);
}
/**
 * Redis sinks the pipeline can write to; exactly one is selected per run through the
 * {@code redisSinkType} option.
 */
public enum RedisSinkType {
  /** Messages are written as Redis hashes, with an optional key TTL. */
  HASH_SINK,
  /** Messages are mapped to the log layout and written as Redis hashes, with an optional TTL. */
  LOGGING_SINK,
  /** Messages are appended to Redis streams. */
  STREAMS_SINK,
  /** Messages are written as plain Redis string values using SET. */
  STRING_SINK
}
/**
 * Command-line entry point: installs the uncaught-exception logger, parses and validates the
 * arguments into {@link PubSubToRedisOptions}, and hands them to {@link #run}.
 *
 * @param args The command-line arguments to the pipeline.
 */
public static void main(String[] args) {
  UncaughtExceptionLogger.register();

  // Parse and validate the user-supplied options, then build and launch the pipeline.
  run(PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToRedisOptions.class));
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public
static
PipelineResult
run
(
PubSubToRedisOptions
options
)
{
// Create the pipeline
Pipeline
pipeline
=
Pipeline
.
create
(
options
);
// Register the coders for pipeline
CoderRegistry
coderRegistry
=
pipeline
.
getCoderRegistry
();
coderRegistry
.
registerCoderForType
(
FAILSAFE_ELEMENT_CODER
.
getEncodedTypeDescriptor
(),
FAILSAFE_ELEMENT_CODER
);
RedisConnectionConfiguration
redisConnectionConfiguration
=
RedisConnectionConfiguration
.
create
()
.
withHost
(
options
.
getRedisHost
())
.
withPort
(
options
.
getRedisPort
())
.
withAuth
(
options
.
getRedisPassword
())
.
withTimeout
(
options
.
getConnectionTimeout
())
.
withSSL
(
options
.
getSslEnabled
());
/*
* Steps: 1) Read PubSubMessage with attributes and messageId from input PubSub subscription.
* 2) Apply JavaScript UDF transformation to message payload (if configured).
* 3) Extract PubSubMessage message to appropriate Redis format.
* 4) Write to Redis using the configured sink type.
*
*/
LOG
.
info
(
"Starting PubSub-To-Redis Pipeline. Reading from subscription: {}"
,
options
.
getInputSubscription
());
PCollection<PubsubMessage>
messages
=
pipeline
.
apply
(
"Read PubSub Events"
,
MessageTransformation
.
readFromPubSub
(
options
.
getInputSubscription
()));
PCollection<PubsubMessage>
maybeTransformed
=
applyUdf
(
messages
,
options
);
if
(
options
.
getRedisSinkType
().
equals
(
STRING_SINK
))
{
PCollection<String>
pCollectionString
=
maybeTransformed
.
apply
(
"Map to Redis String"
,
ParDo
.
of
(
new
MessageTransformation
.
MessageToRedisString
()));
PCollection<KV<String
,
String
>>
kvStringCollection
=
pCollectionString
.
apply
(
"Transform to String KV"
,
MapElements
.
into
(
TypeDescriptors
.
kvs
(
TypeDescriptors
.
strings
(),
TypeDescriptors
.
strings
()))
.
via
(
record
-
>
KV
.
of
(
MessageTransformation
.
key
,
record
)));
kvStringCollection
.
apply
(
"Write to "
+
STRING_SINK
.
name
(),
RedisIO
.
write
()
.
withMethod
(
RedisIO
.
Write
.
Method
.
SET
)
.
withConnectionConfiguration
(
redisConnectionConfiguration
));
}
if
(
options
.
getRedisSinkType
().
equals
(
HASH_SINK
))
{
PCollection<KV<String
,
KV<String
,
String
>>>
pCollectionHash
=
maybeTransformed
.
apply
(
"Map to Redis Hash"
,
ParDo
.
of
(
new
MessageTransformation
.
MessageToRedisHash
()));
pCollectionHash
.
apply
(
"Write to "
+
HASH_SINK
.
name
(),
RedisHashIO
.
write
()
.
withConnectionConfiguration
(
redisConnectionConfiguration
)
.
withTtl
(
options
.
getTtl
()));
}
if
(
options
.
getRedisSinkType
().
equals
(
LOGGING_SINK
))
{
PCollection<KV<String
,
KV<String
,
String
>>>
pCollectionHash
=
maybeTransformed
.
apply
(
"Map to Redis Logs"
,
ParDo
.
of
(
new
MessageTransformation
.
MessageToRedisLogs
()));
pCollectionHash
.
apply
(
"Write to "
+
LOGGING_SINK
.
name
(),
RedisHashIO
.
write
()
.
withConnectionConfiguration
(
redisConnectionConfiguration
)
.
withTtl
(
options
.
getTtl
()));
}
if
(
options
.
getRedisSinkType
().
equals
(
STREAMS_SINK
))
{
PCollection<KV<String
,
Map<String
,
String
>>>
pCollectionStreams
=
maybeTransformed
.
apply
(
"Map to Redis Streams"
,
ParDo
.
of
(
new
MessageTransformation
.
MessageToRedisStreams
()));
pCollectionStreams
.
apply
(
"Write to "
+
STREAMS_SINK
.
name
(),
RedisIO
.
writeStreams
().
withConnectionConfiguration
(
redisConnectionConfiguration
));
}
// Execute the pipeline and return the result.
return
pipeline
.
run
();
}
/**
* Applies the JavaScript UDF to messages if configured, and writes UDF failures to the
* dead-letter topic.
*
* <p>If no UDF is configured, returns the input messages unchanged.
*
* @param messages the input PubSub messages
* @param options the pipeline options
* @return the (possibly transformed) PubSub messages
*/
// This follows the same pattern as PubsubProtoToBigQuery.runUdf
static
PCollection<PubsubMessage>
applyUdf
(
PCollection<PubsubMessage>
messages
,
PubSubToRedisOptions
options
)
{
boolean
useJavascriptUdf
=
!
Strings
.
isNullOrEmpty
(
options
.
getJavascriptTextTransformGcsPath
());
if
(
!
useJavascriptUdf
)
{
return
messages
;
}
if
(
Strings
.
isNullOrEmpty
(
options
.
getJavascriptTextTransformFunctionName
()))
{
throw
new
IllegalArgumentException
(
"JavaScript function name cannot be null or empty if file is set"
);
}
if
(
Strings
.
isNullOrEmpty
(
options
.
getOutputDeadletterTopic
()))
{
throw
new
IllegalArgumentException
(
"A dead-letter Pub/Sub topic (--outputDeadletterTopic) must be provided when using a JavaScript UDF."
);
}
// Map incoming messages to FailsafeElement so we can recover from failures
// across multiple transforms.
PCollection<FailsafeElement<PubsubMessage
,
String
>>
failsafeElements
=
messages
.
apply
(
"MapToRecord"
,
ParDo
.
of
(
new
PubsubMessageToFailsafeElementFn
()));
PCollectionTuple
udfResult
=
failsafeElements
.
apply
(
"InvokeUDF"
,
JavascriptTextTransformer
.
FailsafeJavascriptUdf
.
< PubsubMessage>newBuilder
()
.
setFileSystemPath
(
options
.
getJavascriptTextTransformGcsPath
())
.
setFunctionName
(
options
.
getJavascriptTextTransformFunctionName
())
.
setReloadIntervalMinutes
(
options
.
getJavascriptTextTransformReloadIntervalMinutes
())
.
setSuccessTag
(
UDF_OUT
)
.
setFailureTag
(
UDF_DEADLETTER_OUT
)
.
build
());
// Write UDF failures to the dead-letter topic using the shared
// ConvertFailsafeElementToPubsubMessage transform, following the pattern
// from PubsubProtoToBigQuery.
udfResult
.
get
(
UDF_DEADLETTER_OUT
)
.
setCoder
(
FAILSAFE_ELEMENT_CODER
)
.
apply
(
"Get UDF Failures"
,
ConvertFailsafeElementToPubsubMessage
.
< PubsubMessage
,
String>builder
()
.
setOriginalPayloadSerializeFn
(
msg
-
>
ArrayUtils
.
toObject
(
msg
.
getPayload
()))
.
setErrorMessageAttributeKey
(
"udfErrorMessage"
)
.
build
())
.
apply
(
"Write Failed UDF"
,
writeUdfFailures
(
options
));
// Extract the successfully transformed messages
return
udfResult
.
get
(
UDF_OUT
)
.
apply
(
"Extract Transformed Payload"
,
ParDo
.
of
(
new
DoFn<FailsafeElement<PubsubMessage
,
String
> ,
PubsubMessage
> ()
{
@ProcessElement
public
void
processElement
(
ProcessContext
c
)
{
FailsafeElement<PubsubMessage
,
String
>
element
=
c
.
element
();
c
.
output
(
new
PubsubMessage
(
element
.
getPayload
().
getBytes
(
StandardCharsets
.
UTF_8
),
element
.
getOriginalPayload
().
getAttributeMap
(),
element
.
getOriginalPayload
().
getMessageId
()));
}
}));
}
/**
 * Builds the {@link PubsubIO.Write} that publishes UDF failures to the dead-letter topic taken
 * from the options.
 *
 * <p>Mirrors {@code PubsubProtoToBigQuery.writeUdfFailures}.
 *
 * @param options the pipeline options providing the dead-letter topic
 * @return a Pub/Sub message writer targeting that topic
 */
private static PubsubIO.Write<PubsubMessage> writeUdfFailures(PubSubToRedisOptions options) {
  String deadletterTopic = options.getOutputDeadletterTopic();
  return PubsubIO.writeMessages().to(deadletterTopic);
}
/**
 * {@link DoFn} that pairs every incoming {@link PubsubMessage} with its payload decoded as a
 * UTF-8 string inside a {@link FailsafeElement}, so a later failure can still emit the original
 * message to a dead-letter output.
 */
static class PubsubMessageToFailsafeElementFn
    extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {

  /** Emits one {@link FailsafeElement} per input message. */
  @ProcessElement
  public void processElement(ProcessContext context) {
    PubsubMessage incoming = context.element();
    String decodedPayload = new String(incoming.getPayload(), StandardCharsets.UTF_8);
    context.output(FailsafeElement.of(incoming, decodedPayload));
  }
}
}