This document details how to migrate data from a Pub/Sub Lite topic to a Managed Service for Apache Kafka topic.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Verify that you have the permissions required to complete this guide.
- Enable the Pub/Sub Lite, Cloud Pub/Sub, and Managed Service for Apache Kafka APIs:
  Roles required to enable APIs
  To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
  gcloud services enable pubsublite.googleapis.com \
  pubsub.googleapis.com managedkafka.googleapis.com
- Identify the Pub/Sub Lite topic and subscription to migrate.
- Create a Managed Service for Apache Kafka cluster in the same region as your Pub/Sub Lite topic. For more information, see Create a Kafka cluster.
- Create a Kafka topic to replace the Pub/Sub Lite topic and subscription.
- Create a Kafka Connect cluster, with the Kafka cluster as the primary cluster. For more information, see Create a Connect cluster.
- Upgrade your dependencies to the latest version:
  - Java: Use version 1.16.1 or higher of java-pubsublite.
  - Go: Ensure the pscompat.ManagedKafka backend is available.
Required roles and permissions
To get the permissions that you need to set up Pub/Sub Lite migration, ask your administrator to grant you the following IAM roles on the project:
- Managed Kafka Client (roles/managedkafka.client)
- Pub/Sub Lite Subscriber (roles/pubsublite.subscriber)
- Pub/Sub Lite Viewer (roles/pubsublite.viewer)
- Pub/Sub Editor (roles/pubsub.editor)
For more information about granting roles, see Manage access to projects, folders, and organizations.
You might also be able to get the required permissions through custom roles or other predefined roles.
Migration workflow
The migration has several phases that you complete in order:
- Set up replication. Create a pipeline that moves messages published to Pub/Sub Lite into a Kafka topic.
- Migrate consumers. Update your consumers to read from Kafka.
- Migrate publishers. Update your publishers to write to Kafka.
- Decommission. Delete the replication pipeline and the Pub/Sub Lite topic.
Throughout the migration, your application code uses the same Publisher and Subscriber types in Java, or the same pscompat.PublisherClient and pscompat.SubscriberClient types in Go. The switch to Managed Service for Apache Kafka is accomplished by setting a flag in your publisher and consumer code.
Set up replication
Replication uses the following components:
- Pub/Sub Lite export subscription: Streams messages from a Pub/Sub Lite topic to a Pub/Sub topic. For more information, see Export Pub/Sub Lite messages to Pub/Sub.
- Pub/Sub bridge topic and subscription: The staging area that Kafka Connect reads from.
- Kafka Connect: Uses a Pub/Sub Source connector to pull messages from the Pub/Sub subscription and write them to Kafka. For more information, see Replicate Pub/Sub messages to Kafka.
Preserve ordering
To preserve message ordering, set kafka.key.attribute=orderingKey on the Pub/Sub Source connector. With this setting, the connector uses the Pub/Sub ordering key as the Kafka message key.
Because Kafka routes messages to partitions by key hash, all messages sharing an ordering key are written to the same Kafka partition. The export subscription preserves the Pub/Sub Lite orderingKey as the ordering key in the Pub/Sub message, so this setting preserves the per-key ordering established in Pub/Sub Lite.
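The key-hash routing described above can be illustrated with a small, self-contained sketch. It is not the connector's or Kafka's actual partitioner: the FNV-1a hash and the partitionForKey helper are assumptions chosen for illustration only. The point is that any deterministic hash of the key sends every message with the same ordering key to the same partition.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForKey maps a message key to a partition index by hashing it.
// FNV-1a is used here only for illustration; the hash that a real Kafka
// producer or connector applies may differ.
func partitionForKey(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error.
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// Messages that share the key "user-123" always land on the same partition.
	keys := []string{"user-123", "user-456", "user-123", "user-789", "user-123"}
	for i, k := range keys {
		fmt.Printf("message %d key=%s -> partition %d\n", i, k, partitionForKey(k, 3))
	}
}
```

Because the mapping depends on the partition count, changing the number of partitions changes where keys land, which is one reason to fix the partition count before you start replication.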
Partition affinity
To ensure that messages from a given Pub/Sub Lite partition are always written to the same Kafka partition, do the following:
- Create the Kafka topic with the same number of partitions as the Pub/Sub Lite topic.
- Set kafka.partition.scheme=hash_key in the Pub/Sub Source connector.
The timestamp-based offset translation strategy for migrating consumers relies on this behavior.
Connector configuration
The following example shows the recommended configuration settings for the Pub/Sub Source connector:
{
  "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
  "cps.project": "PROJECT_ID",
  "cps.subscription": "BRIDGE_SUBSCRIPTION",
  "kafka.topic": "KAFKA_TOPIC",
  "kafka.key.attribute": "orderingKey",
  "kafka.partition.scheme": "hash_key",
  "kafka.partition.count": "PARTITION_COUNT",
  "kafka.record.headers": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "tasks.max": "MAXIMUM_TASK_COUNT"
}
Replace the following:
- PROJECT_ID: The ID of the Google Cloud project that contains the Pub/Sub subscription.
- BRIDGE_SUBSCRIPTION: The ID of the Pub/Sub subscription.
- KAFKA_TOPIC: The name of the Kafka topic.
- PARTITION_COUNT: The number of Kafka topic partitions. Use the Pub/Sub Lite partition count.
- MAXIMUM_TASK_COUNT: The maximum number of parallel tasks in the connector. The value should be equal to or less than PARTITION_COUNT.
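If you generate the connector configuration programmatically, a small helper can enforce the constraint that the task count doesn't exceed the partition count. The following is a minimal sketch: buildConnectorConfig is a hypothetical helper, not part of any Google Cloud library, and the project, subscription, and topic values in main are placeholders. The keys and fixed values are the ones shown in the configuration above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// buildConnectorConfig returns the Pub/Sub Source connector settings as a map.
// It rejects a task count that is below 1 or above the partition count.
func buildConnectorConfig(project, subscription, topic string, partitions, maxTasks int) (map[string]string, error) {
	if partitions < 1 {
		return nil, fmt.Errorf("partition count must be at least 1, got %d", partitions)
	}
	if maxTasks < 1 || maxTasks > partitions {
		return nil, fmt.Errorf("tasks.max must be between 1 and %d, got %d", partitions, maxTasks)
	}
	return map[string]string{
		"connector.class":        "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"cps.project":            project,
		"cps.subscription":       subscription,
		"kafka.topic":            topic,
		"kafka.key.attribute":    "orderingKey",
		"kafka.partition.scheme": "hash_key",
		"kafka.partition.count":  strconv.Itoa(partitions),
		"kafka.record.headers":   "true",
		"key.converter":          "org.apache.kafka.connect.storage.StringConverter",
		"value.converter":        "org.apache.kafka.connect.converters.ByteArrayConverter",
		"tasks.max":              strconv.Itoa(maxTasks),
	}, nil
}

func main() {
	// Placeholder values; replace them with your own resource names.
	cfg, err := buildConnectorConfig("my-project", "bridge-sub", "orders", 3, 3)
	if err != nil {
		panic(err)
	}
	out, err := json.MarshalIndent(cfg, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```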
Verify replication
Before migrating any clients, perform the following steps to confirm that the replication pipeline is working:
- Run the gcloud managed-kafka connectors describe command.
  gcloud managed-kafka connectors describe CONNECTOR_NAME \
      --connect-cluster=CONNECT_CLUSTER \
      --location=REGION \
      --project=PROJECT \
      --format="value(state)"
  Replace the following:
  - CONNECTOR_NAME: The name of the Pub/Sub Source connector.
  - CONNECT_CLUSTER: The name of the Connect cluster.
  - REGION: The location of the Connect cluster.
  - PROJECT: The name of the Google Cloud project that contains the Connect cluster.
  If the connector is running, the output is RUNNING. Any other output indicates a problem with the connector. For troubleshooting tips, see Troubleshoot a Pub/Sub connector.
- Publish a test message to the Pub/Sub Lite topic and read it back from the Kafka topic by using a tool such as kafka-console-consumer or a basic client application. The Kafka message key should equal the Pub/Sub Lite ordering key.
Migrate consumers
Based on your tolerance for duplicates and interruptions in publishing, use one of the following strategies to migrate your consumers:
| Strategy | Duplication risk | Publishing interruption | Required consumer capability |
|---|---|---|---|
| Clean cut-over | None | Yes | Standard |
| Timestamp-based offset translation | Duplicates might occur | No | Idempotent consumers |
Clean cut-over
Use this strategy when you can briefly stop publishing.
- Stop the publisher.
- Let Pub/Sub Lite consumers drain. Process all remaining Pub/Sub Lite messages.
- Stop the Pub/Sub Lite consumers.
- Wait for replication to finish:
  - In the Pub/Sub bridge subscription, num_undelivered_messages reaches zero.
  - In the destination Kafka topic, offsets stop growing.
- Seek the Kafka consumer group to the end (latest) on every partition.
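The "wait for replication to finish" step can be automated by sampling both signals and requiring them to stay stable for several consecutive observations. The following is a minimal sketch of that check only. The sample type and the replicationDrained function are hypothetical, and how you collect the values (for example, from Cloud Monitoring and a Kafka admin client) is left out.

```go
package main

import "fmt"

// sample is one observation of the replication pipeline.
type sample struct {
	undelivered int64 // num_undelivered_messages on the Pub/Sub bridge subscription
	endOffset   int64 // sum of end offsets across the Kafka topic's partitions
}

// replicationDrained reports whether the most recent `stable` samples all show
// an empty backlog and an unchanged Kafka end offset.
func replicationDrained(samples []sample, stable int) bool {
	if stable < 2 || len(samples) < stable {
		return false
	}
	recent := samples[len(samples)-stable:]
	for _, s := range recent {
		if s.undelivered != 0 || s.endOffset != recent[0].endOffset {
			return false
		}
	}
	return true
}

func main() {
	observed := []sample{
		{undelivered: 120, endOffset: 1000},
		{undelivered: 0, endOffset: 1120},
		{undelivered: 0, endOffset: 1120},
		{undelivered: 0, endOffset: 1120},
	}
	fmt.Println("after 3 samples:", replicationDrained(observed[:3], 3))
	fmt.Println("after 4 samples:", replicationDrained(observed, 3))
}
```

Requiring more than one stable sample guards against declaring the pipeline drained during a momentary pause between connector polls.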
Timestamp-based offset translation
Use this strategy if you can't pause publishing. Consumers must be idempotent, because messages published between the earliest and latest committed Pub/Sub Lite timestamps are re-delivered.
- Stop the Pub/Sub Lite consumers. (Don't stop the publisher yet.)
- For each partition, translate the Pub/Sub Lite committed offset (for example, T=980) to the corresponding Kafka offset, which is the first Kafka message at or after the Pub/Sub Lite publish time on the same partition. Use the Offset migration orchestrator to automate this step.
- Set the Kafka consumer group to those offsets.
After the orchestrator commits offsets and validation passes, switch your consumers to Managed Service for Apache Kafka, as described in the next section. Consumers resume from the migrated position.
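To make the translation rule concrete, the following sketch finds the first record at or after a given publish time in one partition. It isn't the orchestrator's implementation: the record type and firstOffsetAtOrAfter function are hypothetical, and the sketch assumes that timestamps within a partition don't decrease as offsets grow.

```go
package main

import (
	"fmt"
	"sort"
)

// record is a simplified view of a Kafka record in a single partition.
type record struct {
	offset    int64
	timestamp int64 // publish time, in arbitrary units for this example
}

// firstOffsetAtOrAfter returns the offset of the first record whose timestamp
// is at or after target. ok is false when every record is older than target.
// The records must be ordered by offset with non-decreasing timestamps.
func firstOffsetAtOrAfter(records []record, target int64) (offset int64, ok bool) {
	i := sort.Search(len(records), func(i int) bool {
		return records[i].timestamp >= target
	})
	if i == len(records) {
		return 0, false
	}
	return records[i].offset, true
}

func main() {
	partition := []record{{0, 950}, {1, 970}, {2, 985}, {3, 990}}
	// The Pub/Sub Lite consumer last committed at publish time T=980.
	if off, ok := firstOffsetAtOrAfter(partition, 980); ok {
		fmt.Println("resume at Kafka offset", off) // prints "resume at Kafka offset 2"
	}
}
```

Records published shortly before the committed position can fall at or after the translated offset, which is why consumers that use this strategy must tolerate redelivery.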
Switch consumers to Managed Service for Apache Kafka
The latest Pub/Sub Lite client libraries accept a flag to set the backend to Managed Service for Apache Kafka. By setting this flag and supplying the Kafka connection details, the same consumer that you used for Pub/Sub Lite becomes a Managed Service for Apache Kafka consumer.
During migration, run two sets of consumers, one that reads from Pub/Sub Lite, and one that reads from Managed Service for Apache Kafka. Compare what each consumer receives to confirm they have parity. At that point, you can decommission the Pub/Sub Lite consumers.
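One way to compare the two consumer sets is to record an application-level identifier for each message that a consumer receives, and then look for identifiers that one side saw but the other didn't. The following sketch assumes that your messages carry such an identifier in the payload or in an attribute. The missingFrom helper is hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// missingFrom returns the IDs in want that don't appear in got, sorted.
func missingFrom(want, got []string) []string {
	seen := make(map[string]bool, len(got))
	for _, id := range got {
		seen[id] = true
	}
	var missing []string
	for _, id := range want {
		if !seen[id] {
			missing = append(missing, id)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	// IDs recorded by each consumer set over the same time window.
	fromLite := []string{"order-1", "order-2", "order-3", "order-4"}
	fromKafka := []string{"order-1", "order-3", "order-4", "order-4"}

	fmt.Println("missing from Kafka:", missingFrom(fromLite, fromKafka))
	fmt.Println("missing from Pub/Sub Lite:", missingFrom(fromKafka, fromLite))
}
```

The comparison ignores duplicates on purpose, because replicated messages can be delivered more than once.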
The following code examples show how to configure a consumer for Managed Service for Apache Kafka:
Java
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.cloudpubsub.*;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Map;

String projectId = "PROJECT_ID";
String region = "us-central1";
String gmkCluster = "GMK_CLUSTER_ID";
String gmkTopic = "GMK_TOPIC";

Map<String, Object> kafkaProperties =
    GmkUtils.buildGmkKafkaProperties(projectId, region, gmkCluster);
// Optional: start from earliest if no group offset is committed yet.
kafkaProperties.put("auto.offset.reset", "earliest");

// In GMK mode the SubscriptionName is used as both the Kafka topic and the
// consumer group ID. Use a stable name; consumers sharing it share the load.
SubscriptionPath subscriptionPath =
    SubscriptionPath.newBuilder()
        .setProject(ProjectId.of(projectId))
        .setLocation(CloudRegion.of(region))
        .setName(SubscriptionName.of(gmkTopic))
        .build();

MessageReceiver receiver =
    (PubsubMessage message, com.google.cloud.pubsub.v1.AckReplyConsumer ack) -> {
      System.out.println("Received: " + message.getData().toStringUtf8());
      ack.ack();
    };

FlowControlSettings flowControl =
    FlowControlSettings.builder()
        .setBytesOutstanding(10L * 1024 * 1024)
        .setMessagesOutstanding(1000L)
        .build();

SubscriberSettings settings =
    SubscriberSettings.newBuilder()
        .setSubscriptionPath(subscriptionPath)
        .setReceiver(receiver)
        .setMessagingBackend(MessagingBackend.MANAGED_KAFKA)
        .setKafkaProperties(kafkaProperties)
        .setPerPartitionFlowControlSettings(flowControl)
        .build();

Subscriber subscriber = Subscriber.create(settings);
subscriber.startAsync().awaitRunning();
// ... receive messages ...
// subscriber.stopAsync().awaitTerminated();
Go
import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
	"github.com/IBM/sarama"
)

ctx := context.Background()

bootstrap := pscompat.BuildGMKBootstrapServer("PROJECT_ID", "us-central1", "GMK_CLUSTER_ID")

saramaCfg, err := pscompat.NewGMKSaramaConfig(ctx)
if err != nil {
	panic(err)
}
// Optional: read from earliest when no group offset is committed yet.
saramaCfg.Consumer.Offsets.Initial = sarama.OffsetOldest

settings := pscompat.ReceiveSettings{
	Backend: pscompat.ManagedKafka,
	KafkaConfig: &pscompat.KafkaSubscribeConfig{
		BootstrapServers: bootstrap,
		TopicName:        "GMK_TOPIC",
		SubscriptionName: "GMK_CONSUMER_GROUP", // also the Kafka group ID
		SaramaConfig:     saramaCfg,
	},
}

// The first arg is the PSL subscription path; it is unused when Backend is
// ManagedKafka, so any non-empty value is fine.
sub, err := pscompat.NewSubscriberClientWithSettings(ctx, "unused", settings)
if err != nil {
	panic(err)
}

err = sub.Receive(ctx, func(_ context.Context, m *pubsub.Message) {
	fmt.Printf("Received: %s (key=%s)\n", string(m.Data), m.OrderingKey)
	m.Ack()
})
if err != nil {
	panic(err)
}
Migrate publishers
To migrate your publishers, set the backend to Managed Service for Apache Kafka, similar to the updates described in the previous section for consumers.
When you deploy the new publisher code, start with a non-production environment. Optionally, if your downstream consumers tolerate duplicate messages, run the Kafka publishers in parallel with the Pub/Sub Lite publishers and verify parity, before you decommission the Pub/Sub Lite publishers.
The following code examples show how to configure a publisher for Managed Service for Apache Kafka:
Java
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.cloudpubsub.*;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Map;

String projectId = "PROJECT_ID";
String region = "us-central1";
String gmkCluster = "GMK_CLUSTER_ID";
String gmkTopic = "GMK_TOPIC";

Map<String, Object> kafkaProperties =
    GmkUtils.buildGmkKafkaProperties(projectId, region, gmkCluster);

TopicPath topicPath =
    TopicPath.newBuilder()
        .setProject(ProjectId.of(projectId))
        .setLocation(CloudRegion.of(region))
        .setName(TopicName.of(gmkTopic))
        .build();

PublisherSettings settings =
    PublisherSettings.newBuilder()
        .setTopicPath(topicPath)
        .setMessagingBackend(MessagingBackend.MANAGED_KAFKA)
        .setKafkaProperties(kafkaProperties)
        .build();

Publisher publisher = Publisher.create(settings);
publisher.startAsync().awaitRunning();

PubsubMessage message =
    PubsubMessage.newBuilder()
        .setData(ByteString.copyFromUtf8("hello from java-pubsublite/GMK"))
        .setOrderingKey("user-123") // becomes the Kafka message key
        .putAttributes("source", "java-client")
        .build();

String messageId = publisher.publish(message).get();
System.out.println("Published: " + messageId);

publisher.stopAsync().awaitTerminated();
Go
import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

ctx := context.Background()

bootstrap := pscompat.BuildGMKBootstrapServer("PROJECT_ID", "us-central1", "GMK_CLUSTER_ID")

settings := pscompat.PublishSettings{
	Backend: pscompat.ManagedKafka,
	KafkaConfig: &pscompat.KafkaPublishConfig{
		BootstrapServers: bootstrap,
		TopicName:        "GMK_TOPIC",
	},
}

// First arg is the PSL topic path; ignored in ManagedKafka mode.
pub, err := pscompat.NewPublisherClientWithSettings(ctx, "unused", settings)
if err != nil {
	panic(err)
}
defer pub.Stop()

result := pub.Publish(ctx, &pubsub.Message{
	Data:        []byte("hello from pscompat/GMK"),
	OrderingKey: "user-123", // becomes the Kafka message key
	Attributes:  map[string]string{"source": "go-client"},
})
id, err := result.Get(ctx)
if err != nil {
	panic(err)
}
fmt.Println("Published:", id)
Decommission Pub/Sub Lite resources
When every publisher and consumer is using Managed Service for Apache Kafka and you observe steady-state operation, decommission your Pub/Sub Lite resources as follows:
- Stop the Pub/Sub Source connector and delete it.
- Delete the Pub/Sub bridge topic and subscription.
- Delete the Pub/Sub Lite export subscription.
- Delete the Pub/Sub Lite topic and any remaining Pub/Sub Lite subscriptions.
Authentication
Both client libraries authenticate to Managed Service for Apache Kafka with SASL_SSL / OAUTHBEARER using Application Default Credentials:
- Java: GmkUtils.buildGmkKafkaProperties returns a property map preconfigured with:
  - sasl.mechanism=OAUTHBEARER
  - sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
  - The correct sasl.jaas.config
  Make sure the Kafka authorization callback handler from managed-kafka-auth-login-handler is on your classpath.
- Go: pscompat.NewGMKSaramaConfig(ctx) returns a *sarama.Config with TLS enabled and a built-in sarama.AccessTokenProvider backed by golang.org/x/oauth2/google.FindDefaultCredentials. You can pass this config to KafkaConfig.SaramaConfig, or leave it nil and let the library build one.
The underlying principal must have the roles/managedkafka.client role and any topic-level ACLs on the Managed Service for Apache Kafka cluster.
Operational checklist
Use the following checklist to track the migration steps.
Offset migration orchestrator
Use the following code to implement the timestamp-based offset translation strategy for migrating consumers.
Java
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.cloudpubsub.GmkUtils;
import com.google.cloud.pubsublite.internal.*;
import com.google.cloud.pubsublite.internal.OffsetMigrationHelper.MigrationResult;
import java.util.*;

String projectId = "PROJECT_ID";
String region = "us-central1";
String pslTopic = "PSL_TOPIC";
String pslSub = "PSL_SUBSCRIPTION";
String gmkCluster = "GMK_CLUSTER_ID";
String gmkTopic = "GMK_TOPIC";
int partitionCount = 3;

CloudZone pslZone = CloudZone.of(CloudRegion.of(region), 'a');
TopicPath pslTopicPath =
    TopicPath.newBuilder()
        .setProject(ProjectId.of(projectId))
        .setLocation(pslZone)
        .setName(TopicName.of(pslTopic))
        .build();
SubscriptionPath pslSubPath =
    SubscriptionPath.newBuilder()
        .setProject(ProjectId.of(projectId))
        .setLocation(pslZone)
        .setName(SubscriptionName.of(pslSub))
        .build();
SubscriptionPath gmkSubPath =
    SubscriptionPath.newBuilder()
        .setProject(ProjectId.of(projectId))
        .setLocation(CloudRegion.of(region))
        .setName(SubscriptionName.of(gmkTopic))
        .build();

Map<String, Object> gmkProperties =
    GmkUtils.buildGmkKafkaProperties(projectId, region, gmkCluster);

CursorClient pslCursor =
    CursorClient.create(
        CursorClientSettings.newBuilder().setRegion(CloudRegion.of(region)).build());
TopicStatsClient pslStats =
    TopicStatsClient.create(
        TopicStatsClientSettings.newBuilder().setRegion(CloudRegion.of(region)).build());
KafkaTopicStatsClient kafkaStats =
    new KafkaTopicStatsClient(CloudRegion.of(region), gmkProperties);
KafkaCursorClient kafkaCursor =
    new KafkaCursorClient(CloudRegion.of(region), gmkProperties);

MigrationOrchestrator orchestrator =
    MigrationOrchestrator.newBuilder()
        .setPslCursorClient(pslCursor)
        .setPslTopicStatsClient(pslStats)
        .setKafkaTopicStatsClient(kafkaStats)
        .setKafkaCursorClient(kafkaCursor)
        .setPslTopicPath(pslTopicPath)
        .setPslSubscriptionPath(pslSubPath)
        .setKafkaSubscriptionPath(gmkSubPath)
        .setKafkaTopicName(gmkTopic)
        .setPartitionCount(partitionCount)
        .setDryRun(false) // set true to preview without committing
        .setValidate(true) // re-read committed offsets after reset
        .build();

MigrationOrchestrator.Result result = orchestrator.execute();
System.out.println(result.getSummary());
for (Map.Entry<Partition, MigrationResult> e : result.getPartitionResults().entrySet()) {
  MigrationResult r = e.getValue();
  System.out.printf(
      "partition=%d pslOffset=%d kafkaOffset=%d status=%s validated=%b%n",
      e.getKey().value(), r.getPslOffset(), r.getKafkaOffset(), r.getStatus(), r.isValidated());
}
Go
import (
	"context"
	"fmt"
	"log"

	vkit "cloud.google.com/go/pubsublite/apiv1"
	"cloud.google.com/go/pubsublite/pscompat"
	"google.golang.org/api/option"
)

const (
	projectID  = "PROJECT_ID"
	region     = "us-central1"
	pslTopic   = "PSL_TOPIC"
	pslSub     = "PSL_SUBSCRIPTION"
	gmkCluster = "GMK_CLUSTER_ID"
	gmkTopic   = "GMK_TOPIC"
	gmkGroupID = "GMK_CONSUMER_GROUP" // your application's Kafka consumer group
	partitions = 3
)

ctx := context.Background()
pslZone := region + "-a"
pslTopicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, pslZone, pslTopic)
pslSubPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, pslZone, pslSub)
bootstrap := pscompat.BuildGMKBootstrapServer(projectID, region, gmkCluster)

pslEndpoint := fmt.Sprintf("%s-pubsublite.googleapis.com:443", region)
pslCursor, _ := vkit.NewCursorClient(ctx, option.WithEndpoint(pslEndpoint))
pslStats, _ := vkit.NewTopicStatsClient(ctx, option.WithEndpoint(pslEndpoint))
kafkaStats, _ := pscompat.NewKafkaTopicStatsClient(ctx,
	&pscompat.KafkaTopicStatsClientConfig{BootstrapServers: bootstrap})
kafkaCursor, _ := pscompat.NewKafkaCursorClient(ctx,
	&pscompat.KafkaCursorClientConfig{BootstrapServers: bootstrap})
defer pslCursor.Close()
defer pslStats.Close()
defer kafkaStats.Close()
defer kafkaCursor.Close()

parts := make([]int32, partitions)
for i := range parts {
	parts[i] = int32(i)
}

cfg := &pscompat.MigrationConfig{
	PSLCursorClient:     pslCursor,
	PSLTopicStatsClient: pslStats,
	PSLTopicPath:        pslTopicPath,
	PSLSubscriptionPath: pslSubPath,
	KafkaTopicStats:     kafkaStats,
	KafkaCursor:         kafkaCursor,
	KafkaTopicName:      gmkTopic,
	KafkaGroupID:        gmkGroupID,
	Partitions:          parts,
	DryRun:              false, // set true to preview
	Validate:            true,  // re-read committed offsets after reset
}

orch := pscompat.NewMigrationOrchestrator(cfg)
summary, err := orch.Execute(ctx)
if err != nil {
	log.Fatal(err)
}
fmt.Print(summary.Summary)
for _, p := range parts {
	r := summary.PartitionResults[p]
	fmt.Printf("partition=%d pslOffset=%d kafkaOffset=%d status=%s validated=%t\n",
		p, r.PSLOffset, r.KafkaOffset, r.Status, r.Validated)
}

