Last updated 2025-09-05 UTC.

# Produce and consume messages with the Kafka command-line tools

Learn how to use the Kafka command-line tools to connect to a
Managed Service for Apache Kafka cluster, produce messages, and consume
messages.

## Before you begin

Before you start this tutorial, create a new cluster by following the steps in
[Create a cluster in Managed Service for Apache Kafka](/managed-service-for-apache-kafka/docs/quickstart-cluster).

If you already have a Managed Service for Apache Kafka cluster, you can
skip this step.

## Set up a client machine

Set up a client on a Compute Engine instance that can access the VPC
containing the `default` subnet where the Kafka cluster is reachable.

For this section, you need the project number and the project ID of the
project where the Kafka cluster is located.

To find the project name and project number for your project, see
[Find the project name, number, and ID](/resource-manager/docs/creating-managing-projects#identifying_projects).

1. Create a Compute Engine instance in a zone in the same region
   as the Kafka cluster. The instance must also be in a VPC containing the
   subnet that you used in the cluster configuration.
   For example, the following command creates a Compute Engine instance
   called `test-instance`:

        gcloud compute instances create test-instance \
            --scopes=https://www.googleapis.com/auth/cloud-platform \
            --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/default \
            --zone=REGION-c

   For more information about creating a VM, see
   [Create a VM instance in a specific subnet](/compute/docs/instances/create-start-instance#create_a_VM_instance_in_a_specific_subnet).

2. Give the Compute Engine default service account the necessary
   permissions to connect to the cluster and authenticate. You need to grant the
   Managed Kafka Client role (`roles/managedkafka.client`), the Service Account
   Token Creator role (`roles/iam.serviceAccountTokenCreator`), and the Service
   Account OpenID Token Creator role (`roles/iam.serviceAccountOpenIdTokenCreator`).

        gcloud projects add-iam-policy-binding PROJECT_ID \
            --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
            --role=roles/managedkafka.client

        gcloud projects add-iam-policy-binding PROJECT_ID \
            --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
            --role=roles/iam.serviceAccountTokenCreator

        gcloud projects add-iam-policy-binding PROJECT_ID \
            --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
            --role=roles/iam.serviceAccountOpenIdTokenCreator

   Replace PROJECT_NUMBER with the number of the project containing the
   cluster. You can look up this number by running
   `gcloud projects describe PROJECT_ID`.

3. Use SSH to connect to the VM that you created in the previous step, for
   example, by using the Google Cloud CLI:

        gcloud compute ssh test-instance --project=PROJECT_ID --zone=REGION-c

   Additional configuration might be required for first-time SSH usage. For
   more information about connecting using SSH, see
   [About SSH connections](/compute/docs/instances/ssh).

4. Install Java to run the Kafka command-line tools, and `wget` to help
   download dependencies. The following commands assume that you are using a
   Debian Linux environment.

        sudo apt-get install default-jre wget

5. Install the Kafka command-line tools on the VM.

        wget -O kafka_2.13-3.7.2.tgz https://dlcdn.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
        tar xfz kafka_2.13-3.7.2.tgz
        export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
        export PATH=$PATH:$KAFKA_HOME/bin
        export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*

   These commands download and extract the Apache Kafka distribution, set the
   `KAFKA_HOME` environment variable for convenience, add the Kafka `bin`
   directory to the `PATH` variable, and explicitly set the `CLASSPATH` to
   include the downloaded files.

6. Set up the Managed Service for Apache Kafka authentication library.

   1. Download the dependencies and install them locally.
      Because the Kafka command-line tools look for Java dependencies in the
      `libs` directory of the Kafka installation directory, add these
      dependencies there.

          wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
          sudo apt-get install unzip
          unzip -n -j release-and-dependencies.zip -d $KAFKA_HOME/libs/

   2. Set up the client machine configuration properties.

          cat <<EOF > client.properties
          security.protocol=SASL_SSL
          sasl.mechanism=OAUTHBEARER
          sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
          sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
          EOF

      This configuration sets up a Kafka client with the following settings:

      - Use SASL_SSL for secure communication with the Kafka cluster.
      - Employ OAuth 2.0 bearer tokens for authentication.
      - Use a Google Cloud-specific login callback handler to obtain OAuth 2.0
        tokens.

## Send and consume messages

Run these commands on the client machine.

1. Set the project ID and cluster ID as environment variables.

        export PROJECT_ID=PROJECT_ID
        export CLUSTER_ID=CLUSTER_ID

   Replace the following:

   - PROJECT_ID with the name of the project.
   - CLUSTER_ID with the name of the new cluster.

2. Set the bootstrap address as an environment variable.

   ```bash
   export BOOTSTRAP=bootstrap.$CLUSTER_ID.REGION.managedkafka.$PROJECT_ID.cloud.goog:9092
   ```

   Replace REGION with the region of the cluster. For more information, see
   [Get the bootstrap address](/managed-service-for-apache-kafka/docs/quickstart-cluster#get-bootstrap-address).
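   The produce and consume steps that follow assume that at least one topic
   already exists in the cluster. If your cluster is new, you can create a
   topic with the same command-line tools and the same `client.properties`
   file. This is a sketch: the topic name `test-topic` and the partition and
   replication settings are illustrative values, not requirements.

   ```bash
   # Create an example topic. Adjust the name, partition count, and
   # replication factor for your workload.
   kafka-topics.sh --create --topic test-topic \
       --partitions 3 --replication-factor 3 \
       --bootstrap-server $BOOTSTRAP \
       --command-config client.properties
   ```

   After the topic is created, it appears in the output of the
   `kafka-topics.sh --list` command in the next step.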
3. List the topics in the cluster.

        kafka-topics.sh --list \
            --bootstrap-server $BOOTSTRAP \
            --command-config client.properties

4. Write a message to a topic.

        echo "hello world" | kafka-console-producer.sh --topic KAFKA_TOPIC_NAME \
            --bootstrap-server $BOOTSTRAP --producer.config client.properties

   Replace KAFKA_TOPIC_NAME with the name of the topic.

5. Consume messages from the topic.

        kafka-console-consumer.sh --topic KAFKA_TOPIC_NAME --from-beginning \
            --bootstrap-server $BOOTSTRAP --consumer.config client.properties

   To stop consuming messages, press Ctrl+C.

6. Run a producer performance test.

        kafka-producer-perf-test.sh --topic KAFKA_TOPIC_NAME --num-records 1000000 \
            --throughput -1 --print-metrics --record-size 1024 \
            --producer-props bootstrap.servers=$BOOTSTRAP --producer.config client.properties

## Clean up

To avoid incurring charges to your Google Cloud account for the resources used
on this page, follow these steps.

1. To delete the cluster, run the
   [`gcloud managed-kafka clusters delete`](/sdk/gcloud/reference/managed-kafka/clusters/delete)
   command:

   ```bash
   gcloud managed-kafka clusters delete CLUSTER_ID --location=REGION
   ```

## What's next

- [Overview of Managed Service for Apache Kafka](/managed-service-for-apache-kafka/docs/overview)
- [Authenticate to Managed Kafka API](/managed-service-for-apache-kafka/docs/authentication)

*Apache Kafka® is a registered trademark of The Apache Software Foundation or
its affiliates in the United States and/or other countries.*