Migrate from Pub/Sub Lite to Google Cloud Managed Service for Apache Kafka

This document details how to migrate data from a Pub/Sub Lite topic to a Managed Service for Apache Kafka topic.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init

  5. Verify that you have the permissions required to complete this guide.

  6. Enable the Pub/Sub Lite, Cloud Pub/Sub, and Managed Service for Apache Kafka APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable pubsublite.googleapis.com pubsub.googleapis.com managedkafka.googleapis.com

  7. Identify the Pub/Sub Lite topic and subscription to migrate.
  8. Create a Managed Service for Apache Kafka cluster in the same region as your Pub/Sub Lite topic. For more information, see Create a Kafka cluster.
  9. Create a Kafka topic to replace the Pub/Sub Lite topic and subscription.
  10. Create a Kafka Connect cluster, with the Kafka cluster as the primary cluster. For more information, see Create a Connect cluster.
  11. Upgrade your dependencies to the latest version:

    • Java: Use version 1.16.1 or higher of java-pubsublite.
    • Go: Ensure the pscompat.ManagedKafka backend is available.

Required roles and permissions

To get the permissions that you need to set up Pub/Sub Lite migration, ask your administrator to grant you the following IAM roles on the project:

For more information about granting roles, see Manage access to projects, folders, and organizations.

You might also be able to get the required permissions through custom roles or other predefined roles.

Migration workflow

The migration has several phases that you complete in order:

  1. Set up replication. Create a pipeline that moves messages published to Pub/Sub Lite into a Kafka topic.

  2. Migrate consumers. Update your consumers to read from Kafka.

  3. Migrate publishers. Update your publishers to write to Kafka.

  4. Decommission. Delete the replication pipeline and the Pub/Sub Lite topic.

Throughout the migration, your application code uses the same Publisher and Subscriber types in Java, or the same pscompat.PublisherClient and pscompat.SubscriberClient types in Go. The switch to Managed Service for Apache Kafka is accomplished by setting a flag in your publisher and consumer code.

Set up replication

Replication uses the following components:

Preserve ordering

To preserve message ordering, set kafka.key.attribute=orderingKey on the Pub/Sub Source connector. With this setting, the connector uses the Pub/Sub ordering key as the Kafka message key.

Because Kafka routes messages to partitions by key hash, all messages sharing an ordering key are written to the same Kafka partition. The export subscription preserves the Pub/Sub Lite orderingKey as the ordering key in the Pub/Sub message, so this setting preserves the per-key ordering established in Pub/Sub Lite.
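
To illustrate why key-based routing keeps per-key order, the following Go sketch maps keys to partitions by hashing. It is a simplified model, not the partitioner that Kafka clients or the connector actually use: the FNV-1a hash and the partitionFor helper are assumptions chosen for illustration. The property that carries over is that equal keys always map to the same partition.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to a partition index by hashing the key.
// FNV-1a is a stand-in for illustration; real Kafka clients use their own
// hash function, so the concrete partition numbers will differ.
func partitionFor(key string, partitionCount int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error.
	return int(h.Sum32() % uint32(partitionCount))
}

func main() {
	// The repeated key "user-123" is assigned the same partition both times.
	for _, key := range []string{"user-123", "user-456", "user-123"} {
		fmt.Printf("key=%s partition=%d\n", key, partitionFor(key, 3))
	}
}
```

Because the mapping depends only on the key and the partition count, changing the partition count later can move a key to a different partition.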

Partition affinity

To ensure that messages from a given Pub/Sub Lite partition are always written to the same Kafka partition, do the following:

  • Create the Kafka topic with the same number of partitions as the Pub/Sub Lite topic.

  • Set kafka.partition.scheme=hash_key in the Pub/Sub Source connector.

The timestamp-based offset translation strategy for migrating consumers relies on this behavior.

Connector configuration

The following example shows the recommended configuration settings for the Pub/Sub Source connector:

    {
      "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
      "cps.project": "PROJECT_ID",
      "cps.subscription": "BRIDGE_SUBSCRIPTION",
      "kafka.topic": "KAFKA_TOPIC",
      "kafka.key.attribute": "orderingKey",
      "kafka.partition.scheme": "hash_key",
      "kafka.partition.count": "PARTITION_COUNT",
      "kafka.record.headers": "true",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "tasks.max": "MAXIMUM_TASK_COUNT"
    }

Replace the following:

  • PROJECT_ID: The ID of the Google Cloud project that contains the Pub/Sub subscription.

  • BRIDGE_SUBSCRIPTION: The ID of the Pub/Sub subscription.

  • KAFKA_TOPIC: The name of the Kafka topic.

  • PARTITION_COUNT: The number of Kafka topic partitions. Use the Pub/Sub Lite partition count.

  • MAXIMUM_TASK_COUNT: The maximum number of parallel tasks in the connector. The value should be equal to or less than PARTITION_COUNT.
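
If you generate the connector configuration programmatically, a small helper can enforce the constraint that the task count doesn't exceed the partition count. The following Go sketch is one way to do that. The buildSourceConnectorConfig function and the sample values in main are hypothetical; only the configuration keys and fixed values come from the example above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// buildSourceConnectorConfig returns the connector settings shown above and
// rejects a task count that is larger than the partition count.
// This helper is hypothetical; it is not part of any Google Cloud library.
func buildSourceConnectorConfig(project, subscription, topic string, partitions, maxTasks int) (map[string]string, error) {
	if partitions < 1 {
		return nil, fmt.Errorf("partition count must be at least 1, got %d", partitions)
	}
	if maxTasks < 1 || maxTasks > partitions {
		return nil, fmt.Errorf("tasks.max must be between 1 and %d, got %d", partitions, maxTasks)
	}
	return map[string]string{
		"connector.class":        "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"cps.project":            project,
		"cps.subscription":       subscription,
		"kafka.topic":            topic,
		"kafka.key.attribute":    "orderingKey",
		"kafka.partition.scheme": "hash_key",
		"kafka.partition.count":  strconv.Itoa(partitions),
		"kafka.record.headers":   "true",
		"key.converter":          "org.apache.kafka.connect.storage.StringConverter",
		"value.converter":        "org.apache.kafka.connect.converters.ByteArrayConverter",
		"tasks.max":              strconv.Itoa(maxTasks),
	}, nil
}

func main() {
	// Placeholder values; substitute your own project, subscription, and topic.
	cfg, err := buildSourceConnectorConfig("my-project", "bridge-sub", "orders", 3, 3)
	if err != nil {
		panic(err)
	}
	out, err := json.MarshalIndent(cfg, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```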

Verify replication

Before migrating any clients, perform the following steps to confirm that the replication pipeline is working:

  1. Run the gcloud managed-kafka connectors describe command.

    gcloud managed-kafka connectors describe CONNECTOR_NAME \
        --connect_cluster=CONNECT_CLUSTER \
        --location=REGION \
        --project=PROJECT \
        --format="value(state)"

    Replace the following:

    • CONNECTOR_NAME: The name of the Pub/Sub Source connector.

    • CONNECT_CLUSTER: The name of the Connect cluster.

    • REGION: The location of the Connect cluster.

    • PROJECT: The ID of the Google Cloud project that contains the Connect cluster.

    If the connector is running, the output is RUNNING. Any other output indicates a problem with the connector. For troubleshooting tips, see Troubleshoot a Pub/Sub connector.

  2. Publish a test message to the Pub/Sub Lite topic and read it back from the Kafka topic by using a tool such as kafka-console-consumer or a basic client application. The Kafka message key should equal the Pub/Sub Lite ordering key.

Migrate consumers

Based on your tolerance for duplicates and interruptions in publishing, use one of the following strategies to migrate your consumers:

Strategy | Duplication risk | Publishing interruption | Required consumer capability
Clean cut-over | None | Yes | Standard
Timestamp-based offset translation | Duplicates might occur | No | Idempotent consumers

Clean cut-over

Use this strategy when you can briefly stop publishing.

  1. Stop the publisher.
  2. Let Pub/Sub Lite consumers drain. Process all remaining Pub/Sub Lite messages.
  3. Stop the Pub/Sub Lite consumers.
  4. Wait for replication to finish:
    • In the Pub/Sub bridge subscription, num_undelivered_messages reaches zero.
    • In the destination Kafka topic, offsets stop growing.
  5. Seek the Kafka consumer group to the end (latest) on every partition.
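
The "wait for replication to finish" condition in step 4 can be expressed as a check over two snapshots of the Kafka end offsets plus the backlog metric. The following Go sketch assumes that you already collect those values yourself (for example, from monitoring and a Kafka admin client). The offsetsStable and replicationDrained helpers are hypothetical names, not library functions.

```go
package main

import "fmt"

// offsetsStable reports whether every partition's end offset is unchanged
// between two snapshots taken some time apart.
func offsetsStable(prev, curr map[int32]int64) bool {
	if len(prev) != len(curr) {
		return false
	}
	for partition, offset := range prev {
		if next, ok := curr[partition]; !ok || next != offset {
			return false
		}
	}
	return true
}

// replicationDrained combines both conditions from step 4: the bridge
// subscription has no undelivered messages and the Kafka offsets stopped growing.
func replicationDrained(undelivered int64, prev, curr map[int32]int64) bool {
	return undelivered == 0 && offsetsStable(prev, curr)
}

func main() {
	// Sample snapshots of per-partition end offsets (illustrative values).
	prev := map[int32]int64{0: 120, 1: 98, 2: 143}
	curr := map[int32]int64{0: 120, 1: 98, 2: 143}
	fmt.Println("drained:", replicationDrained(0, prev, curr))

	// A partition that is still growing means replication isn't finished.
	curr[1] = 99
	fmt.Println("drained:", replicationDrained(0, prev, curr))
}
```

Take the two snapshots far enough apart that a slow but still active connector would have advanced at least one offset.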

Timestamp-based offset translation

Use this strategy if you can't pause publishing. Consumers must be idempotent, because messages published between the earliest and latest committed Pub/Sub Lite timestamps are re-delivered.

  1. Stop the Pub/Sub Lite consumers. (Don't stop the publisher yet.)

  2. For each partition, translate the Pub/Sub Lite committed offset to the corresponding Kafka offset: take the publish time at the committed offset (for example, T=980), and find the first Kafka message at or after that publish time on the same partition. Use the Offset migration orchestrator to automate this step.

  3. Set the Kafka consumer group to those offsets.
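
The translation in step 2 is a "first message at or after a timestamp" lookup. The following Go sketch shows that lookup over an in-memory list of record timestamps for one partition. It assumes that timestamps are non-decreasing within the partition and that offsets start at zero. The firstOffsetAtOrAfter helper and the sample data are hypothetical, and the orchestrator might implement the lookup differently.

```go
package main

import (
	"fmt"
	"sort"
)

// firstOffsetAtOrAfter returns the offset of the first record whose timestamp
// is at or after publishTime. The index into timestamps is treated as the
// Kafka offset. The second result is false when no such record exists yet.
func firstOffsetAtOrAfter(timestamps []int64, publishTime int64) (int64, bool) {
	// sort.Search performs a binary search for the smallest index that
	// satisfies the predicate; it requires timestamps to be non-decreasing.
	i := sort.Search(len(timestamps), func(i int) bool {
		return timestamps[i] >= publishTime
	})
	if i == len(timestamps) {
		return 0, false
	}
	return int64(i), true
}

func main() {
	// Timestamps of the records at Kafka offsets 0..4 on one partition.
	timestamps := []int64{950, 970, 980, 980, 1000}

	offset, ok := firstOffsetAtOrAfter(timestamps, 980)
	fmt.Println(offset, ok) // 2 true
}
```

Resuming at the first matching offset can re-deliver messages that the Pub/Sub Lite consumer already processed, which is why this strategy requires idempotent consumers.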

After the orchestrator commits offsets and validation passes, switch your consumers to Managed Service for Apache Kafka, as described in the next section. Consumers resume from the migrated position.

Switch consumers to Managed Service for Apache Kafka

The latest Pub/Sub Lite client libraries accept a flag to set the backend to Managed Service for Apache Kafka. By setting this flag and supplying the Kafka connection details, the same consumer that you used for Pub/Sub Lite becomes a Managed Service for Apache Kafka consumer.

During migration, run two sets of consumers, one that reads from Pub/Sub Lite, and one that reads from Managed Service for Apache Kafka. Compare what each consumer receives to confirm they have parity. At that point, you can decommission the Pub/Sub Lite consumers.
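
One way to compare the two consumer sets is to record an application-level identifier for every message that each set receives, and then look for identifiers that are present on one side only. The following Go sketch assumes that your messages carry such an identifier (for example, in the payload or an attribute), because backend-assigned message IDs aren't expected to match across systems. The missingFrom helper is a hypothetical name.

```go
package main

import (
	"fmt"
	"sort"
)

// missingFrom returns the identifiers that appear in reference but not in
// candidate, sorted for stable output. Duplicates in candidate are ignored.
func missingFrom(reference, candidate []string) []string {
	seen := make(map[string]struct{}, len(candidate))
	for _, id := range candidate {
		seen[id] = struct{}{}
	}
	var missing []string
	for _, id := range reference {
		if _, ok := seen[id]; !ok {
			missing = append(missing, id)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	// Identifiers observed by each consumer set (illustrative values).
	fromLite := []string{"order-1", "order-2", "order-3"}
	fromKafka := []string{"order-1", "order-3", "order-3"}

	fmt.Println("missing from Kafka:", missingFrom(fromLite, fromKafka))
	fmt.Println("missing from Pub/Sub Lite:", missingFrom(fromKafka, fromLite))
}
```

Parity holds for the compared window when both calls return an empty result.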

The following code examples show how to configure a consumer for Managed Service for Apache Kafka:

Java

    import com.google.cloud.pubsublite.*;
    import com.google.cloud.pubsublite.cloudpubsub.*;
    import com.google.cloud.pubsub.v1.MessageReceiver;
    import com.google.pubsub.v1.PubsubMessage;
    import java.util.Map;

    String projectId = "PROJECT_ID";
    String region = "us-central1";
    String gmkCluster = "GMK_CLUSTER_ID";
    String gmkTopic = "GMK_TOPIC";

    Map<String, Object> kafkaProperties =
        GmkUtils.buildGmkKafkaProperties(projectId, region, gmkCluster);
    // Optional: start from earliest if no group offset is committed yet.
    kafkaProperties.put("auto.offset.reset", "earliest");

    // In GMK mode the SubscriptionName is used as both the Kafka topic and the
    // consumer group ID. Use a stable name; consumers sharing it share the load.
    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setProject(ProjectId.of(projectId))
            .setLocation(CloudRegion.of(region))
            .setName(SubscriptionName.of(gmkTopic))
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, com.google.cloud.pubsub.v1.AckReplyConsumer ack) -> {
          System.out.println("Received: " + message.getData().toStringUtf8());
          ack.ack();
        };

    FlowControlSettings flowControl =
        FlowControlSettings.builder()
            .setBytesOutstanding(10L * 1024 * 1024)
            .setMessagesOutstanding(1000L)
            .build();

    SubscriberSettings settings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            .setMessagingBackend(MessagingBackend.MANAGED_KAFKA)
            .setKafkaProperties(kafkaProperties)
            .setPerPartitionFlowControlSettings(flowControl)
            .build();

    Subscriber subscriber = Subscriber.create(settings);
    subscriber.startAsync().awaitRunning();
    // ... receive messages ...
    // subscriber.stopAsync().awaitTerminated();

Go

    import (
        "context"
        "fmt"

        "cloud.google.com/go/pubsub"
        "cloud.google.com/go/pubsublite/pscompat"
        "github.com/IBM/sarama"
    )

    ctx := context.Background()
    bootstrap := pscompat.BuildGMKBootstrapServer("PROJECT_ID", "us-central1", "GMK_CLUSTER_ID")

    saramaCfg, err := pscompat.NewGMKSaramaConfig(ctx)
    if err != nil {
        panic(err)
    }
    // Optional: read from earliest when no group offset is committed yet.
    saramaCfg.Consumer.Offsets.Initial = sarama.OffsetOldest

    settings := pscompat.ReceiveSettings{
        Backend: pscompat.ManagedKafka,
        KafkaConfig: &pscompat.KafkaSubscribeConfig{
            BootstrapServers: bootstrap,
            TopicName:        "GMK_TOPIC",
            SubscriptionName: "GMK_CONSUMER_GROUP", // also the Kafka group ID
            SaramaConfig:     saramaCfg,
        },
    }

    // The first arg is the PSL subscription path; it is unused when Backend is
    // ManagedKafka, so any non-empty value is fine.
    sub, err := pscompat.NewSubscriberClientWithSettings(ctx, "unused", settings)
    if err != nil {
        panic(err)
    }

    err = sub.Receive(ctx, func(_ context.Context, m *pubsub.Message) {
        fmt.Printf("Received: %s (key=%s)\n", string(m.Data), m.OrderingKey)
        m.Ack()
    })
    if err != nil {
        panic(err)
    }

Migrate publishers

To migrate your publishers, set the backend to Managed Service for Apache Kafka, similar to the updates described in the previous section for consumers.

When you deploy the new publisher code, start with a non-production environment. Optionally, if your downstream consumers tolerate duplicate messages, run the Kafka publishers in parallel with the Pub/Sub Lite publishers and verify parity, before you decommission the Pub/Sub Lite publishers.

The following code examples show how to configure a publisher for Managed Service for Apache Kafka:

Java

    import com.google.cloud.pubsublite.*;
    import com.google.cloud.pubsublite.cloudpubsub.*;
    import com.google.protobuf.ByteString;
    import com.google.pubsub.v1.PubsubMessage;
    import java.util.Map;

    String projectId = "PROJECT_ID";
    String region = "us-central1";
    String gmkCluster = "GMK_CLUSTER_ID";
    String gmkTopic = "GMK_TOPIC";

    Map<String, Object> kafkaProperties =
        GmkUtils.buildGmkKafkaProperties(projectId, region, gmkCluster);

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectId.of(projectId))
            .setLocation(CloudRegion.of(region))
            .setName(TopicName.of(gmkTopic))
            .build();

    PublisherSettings settings =
        PublisherSettings.newBuilder()
            .setTopicPath(topicPath)
            .setMessagingBackend(MessagingBackend.MANAGED_KAFKA)
            .setKafkaProperties(kafkaProperties)
            .build();

    Publisher publisher = Publisher.create(settings);
    publisher.startAsync().awaitRunning();

    PubsubMessage message =
        PubsubMessage.newBuilder()
            .setData(ByteString.copyFromUtf8("hello from java-pubsublite/GMK"))
            .setOrderingKey("user-123") // becomes the Kafka message key
            .putAttributes("source", "java-client")
            .build();

    String messageId = publisher.publish(message).get();
    System.out.println("Published: " + messageId);

    publisher.stopAsync().awaitTerminated();

Go

    import (
        "context"
        "fmt"

        "cloud.google.com/go/pubsub"
        "cloud.google.com/go/pubsublite/pscompat"
    )

    ctx := context.Background()
    bootstrap := pscompat.BuildGMKBootstrapServer("PROJECT_ID", "us-central1", "GMK_CLUSTER_ID")

    settings := pscompat.PublishSettings{
        Backend: pscompat.ManagedKafka,
        KafkaConfig: &pscompat.KafkaPublishConfig{
            BootstrapServers: bootstrap,
            TopicName:        "GMK_TOPIC",
        },
    }

    // First arg is the PSL topic path; ignored in ManagedKafka mode.
    pub, err := pscompat.NewPublisherClientWithSettings(ctx, "unused", settings)
    if err != nil {
        panic(err)
    }
    defer pub.Stop()

    result := pub.Publish(ctx, &pubsub.Message{
        Data:        []byte("hello from pscompat/GMK"),
        OrderingKey: "user-123", // becomes the Kafka message key
        Attributes:  map[string]string{"source": "go-client"},
    })
    id, err := result.Get(ctx)
    if err != nil {
        panic(err)
    }
    fmt.Println("Published:", id)

Decommission Pub/Sub Lite resources

When every publisher and consumer is using Managed Service for Apache Kafka and you observe steady-state operation, decommission your Pub/Sub Lite resources as follows:

  1. Stop the Pub/Sub Source connector and delete it.
  2. Delete the Pub/Sub bridge topic and subscription.
  3. Delete the Pub/Sub Lite export subscription.
  4. Delete the Pub/Sub Lite topic and any remaining Pub/Sub Lite subscriptions.

Authentication

Both client libraries authenticate to Managed Service for Apache Kafka with SASL_SSL/OAUTHBEARER using Application Default Credentials:

  • Java: GmkUtils.buildGmkKafkaProperties returns a property map preconfigured with:

    • sasl.mechanism=OAUTHBEARER
    • sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    • The correct sasl.jaas.config

    Make sure the Kafka authorization callback handler from managed-kafka-auth-login-handler is on your classpath.

  • Go: pscompat.NewGMKSaramaConfig(ctx) returns a *sarama.Config with TLS enabled and a built-in sarama.AccessTokenProvider backed by golang.org/x/oauth2/google.FindDefaultCredentials. You can pass this config to KafkaConfig.SaramaConfig, or leave it nil and let the library build one.

The underlying principal must have the roles/managedkafka.client role and any topic-level ACLs on the Managed Service for Apache Kafka cluster.

Operational checklist

Use the following checklist to track the migration steps.

Offset migration orchestrator

Use the following code to implement the timestamp-based offset translation strategy for migrating consumers.

Java

    import com.google.cloud.pubsublite.*;
    import com.google.cloud.pubsublite.cloudpubsub.GmkUtils;
    import com.google.cloud.pubsublite.internal.*;
    import com.google.cloud.pubsublite.internal.OffsetMigrationHelper.MigrationResult;
    import java.util.*;

    String projectId = "PROJECT_ID";
    String region = "us-central1";
    String pslTopic = "PSL_TOPIC";
    String pslSub = "PSL_SUBSCRIPTION";
    String gmkCluster = "GMK_CLUSTER_ID";
    String gmkTopic = "GMK_TOPIC";
    int partitionCount = 3;

    CloudZone pslZone = CloudZone.of(CloudRegion.of(region), 'a');

    TopicPath pslTopicPath =
        TopicPath.newBuilder()
            .setProject(ProjectId.of(projectId)).setLocation(pslZone)
            .setName(TopicName.of(pslTopic)).build();
    SubscriptionPath pslSubPath =
        SubscriptionPath.newBuilder()
            .setProject(ProjectId.of(projectId)).setLocation(pslZone)
            .setName(SubscriptionName.of(pslSub)).build();
    SubscriptionPath gmkSubPath =
        SubscriptionPath.newBuilder()
            .setProject(ProjectId.of(projectId)).setLocation(CloudRegion.of(region))
            .setName(SubscriptionName.of(gmkTopic)).build();

    Map<String, Object> gmkProperties =
        GmkUtils.buildGmkKafkaProperties(projectId, region, gmkCluster);

    CursorClient pslCursor = CursorClient.create(
        CursorClientSettings.newBuilder().setRegion(CloudRegion.of(region)).build());
    TopicStatsClient pslStats = TopicStatsClient.create(
        TopicStatsClientSettings.newBuilder().setRegion(CloudRegion.of(region)).build());
    KafkaTopicStatsClient kafkaStats =
        new KafkaTopicStatsClient(CloudRegion.of(region), gmkProperties);
    KafkaCursorClient kafkaCursor =
        new KafkaCursorClient(CloudRegion.of(region), gmkProperties);

    MigrationOrchestrator orchestrator =
        MigrationOrchestrator.newBuilder()
            .setPslCursorClient(pslCursor)
            .setPslTopicStatsClient(pslStats)
            .setKafkaTopicStatsClient(kafkaStats)
            .setKafkaCursorClient(kafkaCursor)
            .setPslTopicPath(pslTopicPath)
            .setPslSubscriptionPath(pslSubPath)
            .setKafkaSubscriptionPath(gmkSubPath)
            .setKafkaTopicName(gmkTopic)
            .setPartitionCount(partitionCount)
            .setDryRun(false) // set true to preview without committing
            .setValidate(true) // re-read committed offsets after reset
            .build();

    MigrationOrchestrator.Result result = orchestrator.execute();
    System.out.println(result.getSummary());

    for (Map.Entry<Partition, MigrationResult> e : result.getPartitionResults().entrySet()) {
      MigrationResult r = e.getValue();
      System.out.printf(
          "partition=%d pslOffset=%d kafkaOffset=%d status=%s validated=%b%n",
          e.getKey().value(), r.getPslOffset(), r.getKafkaOffset(), r.getStatus(), r.isValidated());
    }

Go

    import (
        "context"
        "fmt"
        "log"

        vkit "cloud.google.com/go/pubsublite/apiv1"
        "cloud.google.com/go/pubsublite/pscompat"
        "google.golang.org/api/option"
    )

    const (
        projectID  = "PROJECT_ID"
        region     = "us-central1"
        pslTopic   = "PSL_TOPIC"
        pslSub     = "PSL_SUBSCRIPTION"
        gmkCluster = "GMK_CLUSTER_ID"
        gmkTopic   = "GMK_TOPIC"
        gmkGroupID = "GMK_CONSUMER_GROUP" // your application's Kafka consumer group
        partitions = 3
    )

    ctx := context.Background()

    pslZone := region + "-a"
    pslTopicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, pslZone, pslTopic)
    pslSubPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, pslZone, pslSub)
    bootstrap := pscompat.BuildGMKBootstrapServer(projectID, region, gmkCluster)

    pslEndpoint := fmt.Sprintf("%s-pubsublite.googleapis.com:443", region)
    pslCursor, _ := vkit.NewCursorClient(ctx, option.WithEndpoint(pslEndpoint))
    pslStats, _ := vkit.NewTopicStatsClient(ctx, option.WithEndpoint(pslEndpoint))
    kafkaStats, _ := pscompat.NewKafkaTopicStatsClient(ctx,
        &pscompat.KafkaTopicStatsClientConfig{BootstrapServers: bootstrap})
    kafkaCursor, _ := pscompat.NewKafkaCursorClient(ctx,
        &pscompat.KafkaCursorClientConfig{BootstrapServers: bootstrap})
    defer pslCursor.Close()
    defer pslStats.Close()
    defer kafkaStats.Close()
    defer kafkaCursor.Close()

    parts := make([]int32, partitions)
    for i := range parts {
        parts[i] = int32(i)
    }

    cfg := &pscompat.MigrationConfig{
        PSLCursorClient:     pslCursor,
        PSLTopicStatsClient: pslStats,
        PSLTopicPath:        pslTopicPath,
        PSLSubscriptionPath: pslSubPath,
        KafkaTopicStats:     kafkaStats,
        KafkaCursor:         kafkaCursor,
        KafkaTopicName:      gmkTopic,
        KafkaGroupID:        gmkGroupID,
        Partitions:          parts,
        DryRun:              false, // set true to preview
        Validate:            true,  // re-read committed offsets after reset
    }

    orch := pscompat.NewMigrationOrchestrator(cfg)
    summary, err := orch.Execute(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Print(summary.Summary)

    for _, p := range parts {
        r := summary.PartitionResults[p]
        fmt.Printf("partition=%d pslOffset=%d kafkaOffset=%d status=%s validated=%t\n",
            p, r.PSLOffset, r.KafkaOffset, r.Status, r.Validated)
    }

What's next
