Create a Amazon Managed Streaming for Apache Kafka import topic

An Amazon Managed Streaming for Apache Kafka (Amazon MSK) import topic lets you continuously ingest data from Amazon MSK as an external source and into Pub/Sub. Then you can stream the data into any of the destinations that Pub/Sub supports.

This document shows you how to create and manage Amazon MSK import topics. To create a standard topic, see Create a standard topic .

For more information about import topics, see About import topics .

Before you begin

Required roles and permissions

To get the permissions that you need to create and manage Amazon MSK import topics, ask your administrator to grant you the Pub/Sub Editor ( roles/pubsub.editor ) IAM role on your topic or project. For more information about granting roles, see Manage access to projects, folders, and organizations .

This predefined role contains the permissions required to create and manage Amazon MSK import topics. To see the exact permissions that are required, expand the Required permissionssection:

Required permissions

The following permissions are required to create and manage Amazon MSK import topics:

  • Create an import topic: pubsub.topics.create
  • Delete an import topic: pubsub.topics.delete
  • Get an import topic: pubsub.topics.get
  • List an import topic: pubsub.topics.list
  • Publish to an import topic: pubsub.topics.publish and pubsub.serviceAgent
  • Update an import topic: pubsub.topics.update
  • Get the IAM policy for an import topic: pubsub.topics.getIamPolicy
  • Configure the IAM policy for an import topic: pubsub.topics.setIamPolicy

You might also be able to get these permissions with custom roles or other predefined roles .

You can configure access control at the project level and the individual resource level.

Set up federated identity to access Amazon MSK

Workload Identity Federation lets Google Cloud services access workloads running outside of Google Cloud. With identity federation, you don't need to maintain or pass credentials to Google Cloud to access your resources in other clouds. Instead, you can use the identities of the workloads themselves to authenticate to Google Cloud and access resources.

This is an optional step. If you already have a service account, you can use it in this procedure instead of creating a new service account. If you are using an existing service account, go to Record the service account unique ID for the next step.

For Amazon MSK import topics, Pub/Sub uses the service account as the identity to access resources from AWS.

For more information about creating a service account, including prerequisites, required roles and permissions, and naming guidelines, see Create service accounts . After you create a service account, you might need to wait for 60 seconds or more before you use the service account. This behavior occurs because read operations are eventually consistent; it can take time for the new service account to become visible.

You need a service account unique ID to set up a role in the AWS console.

  1. In the Google Cloud console, go to the Service accountdetails page.

    Go to service account

  2. Click the service account that you just created or the one that you are planning to use.

  3. From the Service account detailspage, record the Unique ID number.

    You need the ID as part of the workflow to set up a role in the AWS console .

The Service account token creator role( roles/iam.serviceAccountTokenCreator ) lets principals create short-lived credentials for a service account. These tokens or credentials are used to impersonate the service account.

For more information about service account impersonation, see Service account impersonation .

You can also add the Pub/Sub publisher role( roles/pubsub.publisher ) during this procedure. For more information about the role and why you are adding it, see Add the Pub/Sub publisher role to the Pub/Sub service account .

  1. In the Google Cloud console, go to the IAMpage.

    Go to IAM

  2. Click the Include Google-provided role grantscheckbox.

  3. Look for the service account that has the format service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com .

  4. For this service account, click the Edit Principalbutton.

  5. If required, click Add another role.

  6. Search and click the Service account token creator role( roles/iam.serviceAccountTokenCreator ).

  7. Click Save.

Create a policy in AWS

You need a policy in AWS to let Pub/Sub authenticate to AWS so that Pub/Sub can ingest data from Amazon MSK.

To create a policy in AWS, perform the following steps:

  1. Sign in to the AWS Management Console and open the IAM console .

  2. In the navigation pane of the console for IAM, click Access Management> Policies.

  3. Click Create policy.

  4. For Click a service, click MSK.

  5. For Action allowed,click Read> GetBootstrapBrokers .

    This action grants permission to get the bootstrap brokers that Pub/Sub uses to connect to the MSK cluster.

  6. Click Add more permissions.

  7. For Select a service, click Apache Kafka APIs for MSK.

  8. For Action allowed, select the following:

    • List> DescribeTopic

      This action grants permission to allow the Pub/Sub ingestion topic to get details about the Amazon MSK Kafka topic.

    • Read> ReadData

      This action grants permission to read data from the Amazon MSK Kafka topic.

    • Write> Connect

      This action grants permission to connect and authenticate to the Amazon MSK Kafka cluster.

  9. For Resources, specify the cluster ARN (if you want to restrict the policy to specific clusters, which is recommended).

  10. Click Add more permissions.

  11. For Select a service, click STS.

  12. For Action allowed, click Write> AssumeRoleWithWebIdentity .

    This action grants permission to obtain a set of temporary security credentials for Pub/Sub to authenticate to Amazon MSK by using identity federation.

  13. Click Next.

  14. Enter a policy name and description.

  15. Click Create policy.

Create a role in AWS using a custom trust policy

You must create a role in AWS so that Pub/Sub can authenticate to AWS to ingest data from Amazon MSK.

  1. Sign in to the AWS Management Console and open the IAM console .

  2. In the navigation pane of the console for IAM, click Roles.

  3. Click Create role.

  4. For Select trusted entity, click Custom trust policy.

  5. In the Custom trust policysection, enter or paste the following:

      { 
      
     "Version" 
     : 
      
     "2012-10-17" 
     , 
      
     "Statement" 
     : 
      
     [ 
      
     { 
      
     "Effect" 
     : 
      
     "Allow" 
     , 
      
     "Principal" 
     : 
      
     { 
      
     "Federated" 
     : 
      
     "accounts.google.com" 
      
     }, 
      
     "Action" 
     : 
      
     "sts:AssumeRoleWithWebIdentity" 
     , 
      
     "Condition" 
     : 
      
     { 
      
     "StringEquals" 
     : 
      
     { 
      
     "accounts.google.com:sub" 
     : 
      
     "<SERVICE_ACCOUNT_UNIQUE_ID>" 
      
     } 
      
     } 
      
     } 
      
     ] 
     } 
     
    

    Replace <SERVICE_ACCOUNT_UNIQUE_ID> with the unique ID of the service account that you recorded in Record the service account unique ID .

  6. Click Next.

  7. For Add permissions, search and click the custom policy that you just created.

  8. Click Next.

  9. Enter a role name and description.

  10. Click Create role.

Add the Pub/Sub publisher role to the Pub/Sub principal

To enable publishing, you must assign a publisher role to the Pub/Sub service account so that Pub/Sub is able to publish to the Amazon MSK import topic.

Add the Pub/Sub service agent role to the Pub/Sub service account

To allow Pub/Sub to use your import topic project's publish quota, the Pub/Sub service agent requires the serviceusage.services.use permission on your import topic's project.

To provide this permission, we recommend you add the Pub/Sub service agent role to the Pub/Sub service account.

If the Pub/Sub service account does not have the Pub/Sub service agent role, it can be granted as follows:

  1. In the Google Cloud console, go to the IAMpage.

    Go to IAM

  2. Click the Include Google-provided role grantscheckbox.

  3. Look for the service account that has the format service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com .

  4. For this service account, click the Edit principalbutton.

  5. If required, click Add another role.

  6. Search and click the Pub/Sub Service Agent role( roles/pubsub.serviceAgent ).

  7. Click Save.

Enable publishing from all topics

Use this method if you have not created any Amazon MSK import topics.

  1. In the Google Cloud console, go to the IAMpage.

    Go to IAM

  2. Click the Include Google-provided role grantscheckbox.

  3. Look for the service account that has the format service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com .

  4. For this service account, click the Edit principalbutton.

  5. If required, click Add another role.

  6. Search and click the Pub/Sub publisher role( roles/pubsub.publisher ).

  7. Click Save.

Enable publishing from a single topic

Use this method only if the Amazon MSK import topic already exists.

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud pubsub topics add-iam-policy-binding command:

    gcloud  
    pubsub  
    topics  
    add-iam-policy-binding  
     TOPIC_ID 
      
     \ 
      
    --member = 
     "serviceAccount:service- PROJECT_NUMBER 
    @gcp-sa-pubsub.iam.gserviceaccount.com" 
      
     \ 
      
    --role = 
     "roles/pubsub.publisher" 
    

    Replace the following:

    • TOPIC_ID : the topic ID of the Amazon MSK import topic.

    • PROJECT_NUMBER : the project number. To view the project number, see Identifying projects .

The Service Account User role( roles/iam.serviceAccountUser ) includes the permission iam.serviceAccounts.actAs that lets a principal attach a service account to the Amazon MSK import topic's ingestion settings and use that service account for federated identity.

  1. In the Google Cloud console, go to the IAMpage.

    Go to IAM

  2. For the principal that's issuing the create or update topic calls, click the Edit principalbutton.

  3. If required, click Add another role.

  4. Search and click the Service account user role( roles/iam.serviceAccountUser ).

  5. Click Save.

Use Amazon MSK import topics

You can create a new import topic or edit an existing topic.

Considerations

  • Creating the topic and subscription separately, even if done in rapid succession, can lead to data loss. There's a short window where the topic exists without a subscription. If any data is sent to the topic during this time, it is lost. By creating the topic first, creating the subscription, and then converting the topic to an import topic, you guarantee that no messages are missed during the import process.

  • If you need to re-create the Kafka topic of an existing import topic with the same name, you can't just delete the Kafka topic and re-create it. This action can invalidate Pub/Sub's offset management, which can lead to data loss. To mitigate this, follow these steps:

    • Delete the Pub/Sub import topic.
    • Delete the Kafka topic.
    • Create the Kafka topic.
    • Create the Pub/Sub import topic.
  • Data from an Amazon MSK Kafka topic is always read from the earliest offset .

Create Amazon MSK import topics

To know more about properties associated with a topic, see Properties of a topic .

Ensure that you have completed the following procedures:

To create an Amazon MSK import topic, follow these steps:

Console

  1. In the Google Cloud console, go to the Topicspage.

    Go to Topics

  2. Click Create topic.

  3. In the Topic IDfield, enter an ID for your Amazon MSK import topic. For more information about naming topics, see the naming guidelines .

  4. Select Add a default subscription.

  5. Select Enable ingestion.

  6. For ingestion source, select Amazon MSK.

  7. Enter the following details:

    • Cluster ARN: The ARN of the Amazon MSK that you are ingesting into Pub/Sub. The ARN format is as follows: arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId} .
    • Topic: The name of the Amazon MSK Kafka topic that you are ingesting into Pub/Sub.
    • AWS Role ARN: The ARN of the AWS role. The ARN format of the role is as follows: arn:aws:iam::${Account}:role/${RoleName} .
    • Service account: The service account that you created in Create a service account in Google Cloud .
  8. Click Create topic.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud pubsub topics create command:

     gcloud  
    pubsub  
    topics  
    create  
     TOPIC_ID 
      
     \ 
      
    --aws-msk-ingestion-cluster-arn  
     MSK_CLUSTER_ARN 
      
     \ 
      
    --aws-msk-ingestion-topic  
     MSK_TOPIC 
      
     \ 
      
    --aws-msk-ingestion-aws-role-arn  
     MSK_ROLE_ARN 
      
     \ 
      
    --aws-msk-ingestion-service-account  
     PUBSUB_SERVICE_ACCOUNT 
     
    

    Replace the following:

    • TOPIC_ID : the name or ID of your Pub/Sub topic.
    • MSK_CLUSTER_ARN : the ARN for the Amazon MSK cluster that you are ingesting into Pub/Sub. The ARN format is as follows: arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId} .
    • MSK_TOPIC : the name of the Amazon MSK Kafka topic that you are ingesting into Pub/Sub.
    • MSK_ROLE_ARN : the ARN of the AWS role. The ARN format of the role is as follows: arn:aws:iam::${Account}:role/${RoleName} .
    • PUBSUB_SERVICE_ACCOUNT : the service account that you created in Create a service account in Google Cloud .

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C++ API reference documentation .

  namespace 
  
 pubsub 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 ; 
 namespace 
  
 pubsub_admin 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub_admin 
 ; 
 []( 
 pubsub_admin 
 :: 
 TopicAdminClient 
  
 client 
 , 
  
 std 
 :: 
 string 
  
 project_id 
 , 
  
 std 
 :: 
 string 
  
 topic_id 
 , 
  
 std 
 :: 
 string 
  
 const 
&  
 cluster_arn 
 , 
  
 std 
 :: 
 string 
  
 const 
&  
 msk_topic 
 , 
  
 std 
 :: 
 string 
  
 const 
&  
 aws_role_arn 
 , 
  
 std 
 :: 
 string 
  
 const 
&  
 gcp_service_account 
 ) 
  
 { 
  
 google 
 :: 
 pubsub 
 :: 
 v1 
 :: 
 Topic 
  
 request 
 ; 
  
 request 
 . 
 set_name 
 ( 
  
 pubsub 
 :: 
 Topic 
 ( 
 std 
 :: 
 move 
 ( 
 project_id 
 ), 
  
 std 
 :: 
 move 
 ( 
 topic_id 
 )). 
 FullName 
 ()); 
  
 auto 
 * 
  
 aws_msk 
  
 = 
  
 request 
 . 
 mutable_ingestion_data_source_settings 
 () 
 - 
> mutable_aws_msk 
 (); 
  
 aws_msk 
 - 
> set_cluster_arn 
 ( 
 cluster_arn 
 ); 
  
 aws_msk 
 - 
> set_topic 
 ( 
 msk_topic 
 ); 
  
 aws_msk 
 - 
> set_aws_role_arn 
 ( 
 aws_role_arn 
 ); 
  
 aws_msk 
 - 
> set_gcp_service_account 
 ( 
 gcp_service_account 
 ); 
  
 auto 
  
 topic 
  
 = 
  
 client 
 . 
 CreateTopic 
 ( 
 request 
 ); 
  
 // Note that kAlreadyExists is a possible error when the library retries. 
  
 if 
  
 ( 
 topic 
 . 
 status 
 (). 
 code 
 () 
  
 == 
  
 google 
 :: 
 cloud 
 :: 
 StatusCode 
 :: 
 kAlreadyExists 
 ) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "The topic already exists 
 \n 
 " 
 ; 
  
 return 
 ; 
  
 } 
  
 if 
  
 ( 
 ! 
 topic 
 ) 
  
 throw 
  
 std 
 :: 
 move 
 ( 
 topic 
 ). 
 status 
 (); 
  
 std 
 :: 
 cout 
 << 
 "The topic was successfully created: " 
 << 
 topic 
 - 
> DebugString 
 () 
 << 
 " 
 \n 
 " 
 ; 
 } 
 

Go

The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2 . To see a list of v1 code samples, see the deprecated code samples .

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Go API reference documentation .

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "cloud.google.com/go/pubsub" 
 ) 
 func 
  
 createTopicWithAWSMSKIngestion 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 topicID 
 , 
  
 clusterARN 
 , 
  
 mskTopic 
 , 
  
 awsRoleARN 
 , 
  
 gcpSA 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topicID := "my-topic" 
  
 // // AWS MSK ingestion settings. 
  
 // clusterARN := "cluster-arn" 
  
 // mskTopic := "msk-topic" 
  
 // awsRoleARN := "aws-role-arn" 
  
 // gcpSA := "gcp-service-account" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
  NewClient 
 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 cfg 
  
 := 
  
& pubsub 
 . 
 TopicConfig 
 { 
  
 IngestionDataSourceSettings 
 : 
  
& pubsub 
 . 
 IngestionDataSourceSettings 
 { 
  
 Source 
 : 
  
& pubsub 
 . 
  IngestionDataSourceAmazonMSK 
 
 { 
  
 ClusterARN 
 : 
  
 clusterARN 
 , 
  
 Topic 
 : 
  
 mskTopic 
 , 
  
 AWSRoleARN 
 : 
  
 awsRoleARN 
 , 
  
 GCPServiceAccount 
 : 
  
 gcpSA 
 , 
  
 }, 
  
 }, 
  
 } 
  
 t 
 , 
  
 err 
  
 := 
  
 client 
 . 
  CreateTopicWithConfig 
 
 ( 
 ctx 
 , 
  
 topicID 
 , 
  
 cfg 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "CreateTopic: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Created topic with AWS MSK ingestion settings: %v\n" 
 , 
  
 t 
 ) 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Java API reference documentation .

  import 
  
 com.google.cloud.pubsub.v1. TopicAdminClient 
 
 ; 
 import 
  
 com.google.pubsub.v1. IngestionDataSourceSettings 
 
 ; 
 import 
  
 com.google.pubsub.v1. Topic 
 
 ; 
 import 
  
 com.google.pubsub.v1. TopicName 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 public 
  
 class 
 CreateTopicWithAwsMskIngestionExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 topicId 
  
 = 
  
 "your-topic-id" 
 ; 
  
 // AWS MSK ingestion settings. 
  
 String 
  
 clusterArn 
  
 = 
  
 "cluster-arn" 
 ; 
  
 String 
  
 mskTopic 
  
 = 
  
 "msk-topic" 
 ; 
  
 String 
  
 awsRoleArn 
  
 = 
  
 "aws-role-arn" 
 ; 
  
 String 
  
 gcpServiceAccount 
  
 = 
  
 "gcp-service-account" 
 ; 
  
 createTopicWithAwsMskIngestionExample 
 ( 
  
 projectId 
 , 
  
 topicId 
 , 
  
 clusterArn 
 , 
  
 mskTopic 
 , 
  
 awsRoleArn 
 , 
  
 gcpServiceAccount 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 createTopicWithAwsMskIngestionExample 
 ( 
  
 String 
  
 projectId 
 , 
  
 String 
  
 topicId 
 , 
  
 String 
  
 clusterArn 
 , 
  
 String 
  
 mskTopic 
 , 
  
 String 
  
 awsRoleArn 
 , 
  
 String 
  
 gcpServiceAccount 
 ) 
  
 throws 
  
 IOException 
  
 { 
  
 try 
  
 ( 
  TopicAdminClient 
 
  
 topicAdminClient 
  
 = 
  
  TopicAdminClient 
 
 . 
 create 
 ()) 
  
 { 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
  IngestionDataSourceSettings 
 
 . 
  AwsMsk 
 
  
 awsMsk 
  
 = 
  
  IngestionDataSourceSettings 
 
 . 
 AwsMsk 
 . 
 newBuilder 
 () 
  
 . 
 setClusterArn 
 ( 
 clusterArn 
 ) 
  
 . 
 setTopic 
 ( 
 mskTopic 
 ) 
  
 . 
 setAwsRoleArn 
 ( 
 awsRoleArn 
 ) 
  
 . 
 setGcpServiceAccount 
 ( 
 gcpServiceAccount 
 ) 
  
 . 
 build 
 (); 
  
  IngestionDataSourceSettings 
 
  
 ingestionDataSourceSettings 
  
 = 
  
  IngestionDataSourceSettings 
 
 . 
 newBuilder 
 (). 
  setAwsMsk 
 
 ( 
 awsMsk 
 ). 
 build 
 (); 
  
  Topic 
 
  
 topic 
  
 = 
  
 topicAdminClient 
 . 
 createTopic 
 ( 
  
  Topic 
 
 . 
 newBuilder 
 () 
  
 . 
 setName 
 ( 
 topicName 
 . 
  toString 
 
 ()) 
  
 . 
  setIngestionDataSourceSettings 
 
 ( 
 ingestionDataSourceSettings 
 ) 
  
 . 
 build 
 ()); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Created topic with AWS MSK ingestion settings: " 
  
 + 
  
 topic 
 . 
 getAllFields 
 ()); 
  
 } 
  
 } 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .

  /** 
  
 * 
  
 TODO 
 ( 
 developer 
 ): 
  
 Uncomment 
  
 these 
  
 variables 
  
 before 
  
 running 
  
 the 
  
 sample 
 . 
  
 */ 
 // 
  
 const 
  
 topicNameOrId 
  
 = 
  
 'YOUR_TOPIC_NAME_OR_ID' 
 ; 
 // 
  
 const 
  
 clusterArn 
  
 = 
  
 'arn:aws:kafka:...' 
 ; 
 // 
  
 const 
  
 mskTopic 
  
 = 
  
 'YOUR_MSK_TOPIC' 
 ; 
 // 
  
 const 
  
 roleArn 
  
 = 
  
 'arn:aws:iam:...' 
 ; 
 // 
  
 const 
  
 gcpServiceAccount 
  
 = 
  
 'ingestion-account@...' 
 ; 
 // 
  
 Imports 
  
 the 
  
 Google 
  
 Cloud 
  
 client 
  
 library 
 const 
  
 { 
 PubSub 
 } 
  
 = 
  
 require 
 ( 
 '@google-cloud/pubsub' 
 ); 
 // 
  
 Creates 
  
 a 
  
 client 
 ; 
  
 cache 
  
 this 
  
 for 
  
 further 
  
 use 
 const 
  
 pubSubClient 
  
 = 
  
 new 
  
 PubSub 
 (); 
 async 
  
 function 
  
 createTopicWithAwsMskIngestion 
 ( 
  
 topicNameOrId 
 , 
  
 clusterArn 
 , 
  
 mskTopic 
 , 
  
 awsRoleArn 
 , 
  
 gcpServiceAccount 
 , 
 ) 
  
 { 
  
 // 
  
 Creates 
  
 a 
  
 new 
  
 topic 
  
 with 
  
 AWS 
  
 MSK 
  
 ingestion 
 . 
  
 await 
  
 pubSubClient 
 . 
 createTopic 
 ({ 
  
 name 
 : 
  
 topicNameOrId 
 , 
  
 ingestionDataSourceSettings 
 : 
  
 { 
  
 awsMsk 
 : 
  
 { 
  
 clusterArn 
 , 
  
 topic 
 : 
  
 mskTopic 
 , 
  
 awsRoleArn 
 , 
  
 gcpServiceAccount 
 , 
  
 }, 
  
 }, 
  
 }); 
  
 console 
 . 
 log 
 ( 
 ` 
 Topic 
  
 $ 
 { 
 topicNameOrId 
 } 
  
 created 
  
 with 
  
 AWS 
  
 MSK 
  
 ingestion 
 . 
 ` 
 ); 
 } 
 

Node.ts

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .

  /** 
 * 
 TODO 
 ( 
 developer 
 ): 
 Uncomment 
 these 
 variables 
 before 
 running 
 the 
 sample 
 . 
 */ 
 // 
 const 
 topicNameOrId 
 = 
 'YOUR_TOPIC_NAME_OR_ID' 
 ; 
 // 
 const 
 clusterArn 
 = 
 'arn:aws:kafka:...' 
 ; 
 // 
 const 
 mskTopic 
 = 
 'YOUR_MSK_TOPIC' 
 ; 
 // 
 const 
 roleArn 
 = 
 'arn:aws:iam:...' 
 ; 
 // 
 const 
 gcpServiceAccount 
 = 
 'ingestion-account@...' 
 ; 
 // 
 Imports 
 the 
 Google 
 Cloud 
 client 
 library 
 import 
  
 { 
 PubSub 
 } 
 from 
  
 '@google-cloud/pubsub' 
 ; 
 // 
 Creates 
 a 
 client 
 ; 
 cache 
 this 
 for 
 further 
 use 
 const 
 pubSubClient 
 = 
 new 
 PubSub 
 (); 
 async 
 function 
 createTopicWithAwsMskIngestion 
 ( 
 topicNameOrId 
 : 
 string 
 , 
 clusterArn 
 : 
 string 
 , 
 mskTopic 
 : 
 string 
 , 
 awsRoleArn 
 : 
 string 
 , 
 gcpServiceAccount 
 : 
 string 
 , 
 ) 
 { 
 // 
 Creates 
 a 
 new 
 topic 
 with 
 AWS 
 MSK 
 ingestion 
 . 
 await 
 pubSubClient 
 . 
 createTopic 
 ({ 
 name 
 : 
 topicNameOrId 
 , 
 ingestionDataSourceSettings 
 : 
 { 
 awsMsk 
 : 
 { 
 clusterArn 
 , 
 topic 
 : 
 mskTopic 
 , 
 awsRoleArn 
 , 
 gcpServiceAccount 
 , 
 }, 
 }, 
 }); 
 console 
 . 
 log 
 ( 
 ` 
 Topic 
 $ 
 { 
 topicNameOrId 
 } 
 created 
 with 
 AWS 
 MSK 
 ingestion 
 . 
 ` 
 ); 
 } 
 

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Python API reference documentation .

  from 
  
 google.cloud 
  
 import 
 pubsub_v1 
 from 
  
 google.pubsub_v1.types 
  
 import 
  Topic 
 
 from 
  
 google.pubsub_v1.types 
  
 import 
  IngestionDataSourceSettings 
 
 # TODO(developer) 
 # project_id = "your-project-id" 
 # topic_id = "your-topic-id" 
 # cluster_arn = "your-cluster-arn" 
 # msk_topic = "your-msk-topic" 
 # aws_role_arn = "your-aws-role-arn" 
 # gcp_service_account = "your-gcp-service-account" 
 publisher 
 = 
 pubsub_v1 
 . 
  PublisherClient 
 
 () 
 topic_path 
 = 
 publisher 
 . 
 topic_path 
 ( 
 project_id 
 , 
 topic_id 
 ) 
 request 
 = 
 Topic 
 ( 
 name 
 = 
 topic_path 
 , 
 ingestion_data_source_settings 
 = 
 IngestionDataSourceSettings 
 ( 
 aws_msk 
 = 
  IngestionDataSourceSettings 
 
 . 
  AwsMsk 
 
 ( 
 cluster_arn 
 = 
 cluster_arn 
 , 
 topic 
 = 
 msk_topic 
 , 
 aws_role_arn 
 = 
 aws_role_arn 
 , 
 gcp_service_account 
 = 
 gcp_service_account 
 , 
 ) 
 ), 
 ) 
 topic 
 = 
 publisher 
 . 
 create_topic 
 ( 
 request 
 = 
 request 
 ) 
 print 
 ( 
 f 
 "Created topic: 
 { 
 topic 
 . 
 name 
 } 
 with AWS MSK Ingestion Settings" 
 ) 
 

For more information about ARNs, see Amazon Resource Names (ARNs) and IAM Identifiers .

If you run into issues, see Troubleshooting an Amazon MSK import topic .

Edit Amazon MSK import topics

To edit the ingestion data source settings of an Amazon MSK import topic, follow these steps:

Console

  1. In the Google Cloud console, go to the Topicspage.

    Go to Topics

  2. Click the Amazon MSK import topic.

  3. In the topic details page, click Edit.

  4. Update the fields that you want to change.

  5. Click Update.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud pubsub topics update command with all the flags mentioned in the following sample:

    gcloud  
    pubsub  
    topics  
    update  
     TOPIC_ID 
      
     \ 
      
    --aws-msk-ingestion-cluster-arn  
     MSK_CLUSTER_ARN 
      
     \ 
      
    --aws-msk-ingestion-topic  
     MSK_TOPIC 
      
     \ 
      
    --aws-msk-ingestion-aws-role-arn  
     MSK_ROLE_ARN 
      
     \ 
      
    --aws-msk-ingestion-service-account  
     PUBSUB_SERVICE_ACCOUNT 
    

    Replace the following:

    • TOPIC_ID : the name or ID of your Pub/Sub topic.
    • MSK_CLUSTER_ARN : the ARN for the Amazon MSK cluster that you are ingesting into Pub/Sub. The ARN format is as follows: arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId} .
    • MSK_TOPIC : the name of the Amazon MSK Kafka topic that you are ingesting into Pub/Sub.
    • MSK_ROLE_ARN : the ARN of the AWS role. The ARN format of the role is as follows: arn:aws:iam::${Account}:role/${RoleName} .
    • PUBSUB_SERVICE_ACCOUNT : the service account that you created in Create a service account in Google Cloud .

Quotas and limits

The publisher throughput for import topics is bound by the publish quota of the topic. For more information, see Pub/Sub quotas and limits .

What's next

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.
Design a Mobile Site
View Site in Mobile | Classic
Share by: