Produce and consume messages with the Kafka command-line tools

Learn how to use the Kafka command-line tools to connect to a Managed Service for Apache Kafka cluster, produce messages, and consume messages.

Before you begin

Before you start this tutorial, create a new cluster by following the steps in Create a cluster in Managed Service for Apache Kafka.

If you already have a Managed Service for Apache Kafka cluster, you can skip this step.

Set up a client machine

Set up a client machine on a Compute Engine instance that has access to the VPC network containing the subnet where the Kafka cluster is reachable.

For this section, you need the project number and the project ID of the project where the Kafka cluster is located.

To find the project number and project ID for your project, see Find the project name, number, and ID.

  1. Create a Compute Engine instance in a zone in the same region as the Kafka cluster. The instance must also be in a VPC network containing the subnet that you used in the cluster configuration. For example, the following command creates a Compute Engine instance called test-instance:

     gcloud compute instances create test-instance \
         --scopes=https://www.googleapis.com/auth/cloud-platform \
         --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/default \
         --zone=REGION-c

    For more information about creating a VM, see Create a VM instance in a specific subnet.
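
    To confirm that the instance was created and is running, you can query its status. This is an optional check; test-instance and the zone match the create command above:

     gcloud compute instances describe test-instance \
         --zone=REGION-c --format="value(status)"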

  2. Give the Compute Engine default service account the necessary permissions to connect to the cluster and authenticate. You need to grant the Managed Kafka Client role (roles/managedkafka.client), the Service Account Token Creator role (roles/iam.serviceAccountTokenCreator), and the Service Account OpenID Token Creator role (roles/iam.serviceAccountOpenIdTokenCreator).

     gcloud projects add-iam-policy-binding PROJECT_ID \
         --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
         --role=roles/managedkafka.client

     gcloud projects add-iam-policy-binding PROJECT_ID \
         --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
         --role=roles/iam.serviceAccountTokenCreator

     gcloud projects add-iam-policy-binding PROJECT_ID \
         --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
         --role=roles/iam.serviceAccountOpenIdTokenCreator

    Replace PROJECT_NUMBER with the number of the project containing the cluster. You can look up this number using gcloud projects describe PROJECT_ID.
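
    For example, the following command prints only the project number; the --format flag filters the describe output down to that single field:

     gcloud projects describe PROJECT_ID --format="value(projectNumber)"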

  3. Use SSH to connect to the VM that you created in the previous step, for example, by using the Google Cloud CLI:

       
     gcloud compute ssh test-instance --project=PROJECT_ID --zone=REGION-c

    Additional configuration might be required for first-time SSH usage. For more information about connecting using SSH, see About SSH connections.

  4. Install Java, which is required to run the Kafka command-line tools, and wget, which is used to download dependencies. The following commands assume that you are using a Debian-based Linux environment.

     sudo apt-get install default-jre wget
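
    Optionally, verify the Java installation by printing the runtime version:

     java -version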
    
  5. Install the Kafka command-line tools on the VM.

     wget -O kafka_2.13-3.7.2.tgz https://dlcdn.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
     tar xfz kafka_2.13-3.7.2.tgz
     export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
     export PATH=$PATH:$KAFKA_HOME/bin
     export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*

    This code downloads and extracts the Apache Kafka distribution, sets the KAFKA_HOME environment variable for convenience, adds the Kafka bin directory to the PATH variable, and sets the CLASSPATH to include the authentication library files that you download in the next step.
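
    Note that these export statements apply only to the current shell session. To persist them across SSH sessions, you can append them to your shell profile; a minimal sketch, assuming you extracted Kafka in your home directory:

     echo 'export KAFKA_HOME=$HOME/kafka_2.13-3.7.2' >> ~/.bashrc
     echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> ~/.bashrc
     echo 'export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*' >> ~/.bashrc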

  6. Set up the Managed Service for Apache Kafka authentication library.

    1. Download the dependencies and install them locally. Because the Kafka command-line tools look for Java dependencies in the libs directory of the Kafka installation directory, the following commands add these dependencies there.

       wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
       sudo apt-get install unzip
       unzip -n -j release-and-dependencies.zip -d $KAFKA_HOME/libs/
      
    2. Set up the client machine configuration properties.

       cat <<EOF > client.properties
       security.protocol=SASL_SSL
       sasl.mechanism=OAUTHBEARER
       sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
       sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
       EOF

      This code configures a Kafka client with the following settings:

    • Use SASL_SSL for secure communication with the Kafka cluster.

    • Employ OAuth 2.0 bearer tokens for authentication.

    • Use a Google Cloud-specific login callback handler to obtain OAuth 2.0 tokens.

Send and consume messages

Run these commands on the client machine.

  1. Set the project ID and the cluster ID as environment variables.

     export PROJECT_ID=PROJECT_ID
     export CLUSTER_ID=CLUSTER_ID

    Replace the following:

    • PROJECT_ID with the ID of the project.
    • CLUSTER_ID with the ID of the new cluster.
  2. Set the bootstrap address as an environment variable.

     export BOOTSTRAP=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092

    For more information, see Get the bootstrap address.
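
    For example, for a hypothetical cluster named my-cluster in region us-central1 in a project named my-project, the bootstrap address would be:

     export BOOTSTRAP=bootstrap.my-cluster.us-central1.managedkafka.my-project.cloud.goog:9092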

  3. List the topics in the cluster.

     kafka-topics.sh --list \
         --bootstrap-server $BOOTSTRAP \
         --command-config client.properties
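
    If the cluster doesn't have a topic yet, you can create one with the same tool before producing messages. This is a minimal sketch; the partition count and replication factor here are illustrative values, so adjust them to your needs:

     kafka-topics.sh --create --topic KAFKA_TOPIC_NAME \
         --partitions 3 --replication-factor 3 \
         --bootstrap-server $BOOTSTRAP \
         --command-config client.properties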
    
  4. Write a message to a topic.

     echo "hello world" | kafka-console-producer.sh --topic KAFKA_TOPIC_NAME \
         --bootstrap-server $BOOTSTRAP \
         --producer.config client.properties

    Replace KAFKA_TOPIC_NAME with the name of the topic.

  5. Consume messages from the topic.

       
     kafka-console-consumer.sh --topic KAFKA_TOPIC_NAME --from-beginning \
         --bootstrap-server $BOOTSTRAP \
         --consumer.config client.properties

    To stop consuming messages, press Ctrl+C.
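
    The console consumer joins an automatically generated consumer group. To see the groups known to the cluster, you can list them; this optional check reuses the same client configuration:

     kafka-consumer-groups.sh --list \
         --bootstrap-server $BOOTSTRAP \
         --command-config client.properties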

  6. Run a producer performance test.

     kafka-producer-perf-test.sh --topic KAFKA_TOPIC_NAME --num-records 1000000 \
         --throughput -1 --print-metrics --record-size 1024 \
         --producer-props bootstrap.servers=$BOOTSTRAP \
         --producer.config client.properties
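
    You can measure read throughput with the matching consumer tool from the same Kafka distribution. A minimal sketch, reusing client.properties for authentication:

     kafka-consumer-perf-test.sh --topic KAFKA_TOPIC_NAME --messages 1000000 \
         --bootstrap-server $BOOTSTRAP \
         --consumer.config client.properties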
    

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.

  1. To delete the cluster, run the gcloud managed-kafka clusters delete command:

     gcloud managed-kafka clusters delete CLUSTER_ID --location=REGION
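
  2. If you created the test-instance VM only for this tutorial, you can delete it as well. This assumes the instance name and zone from the earlier create command:

     gcloud compute instances delete test-instance --zone=REGION-c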
    

What's next

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.