Deploy a highly-available Kafka cluster on GKE


Kafka is an open source, distributed publish-subscribe messaging system for handling high-volume, high-throughput, and real-time streaming data. You can use Kafka to build streaming data pipelines that move data reliably across different systems and applications for processing and analysis.

This tutorial is intended for platform administrators, cloud architects, and operations professionals interested in deploying highly-available Kafka clusters on Google Kubernetes Engine (GKE).

Objectives

In this tutorial, you will learn how to:
  • Use Terraform to create a regional GKE cluster.
  • Deploy a highly-available Kafka cluster.
  • Upgrade Kafka binaries.
  • Back up and restore the Kafka cluster.
  • Simulate GKE node disruption and Kafka broker failover.

Architecture

This section describes the architecture of the solution you'll build in this tutorial.

A Kafka cluster is a group of one or more servers (called brokers) that work together to handle incoming data streams and publish-subscribe messaging for Kafka clients (called producers and consumers).

Each data partition in a Kafka cluster has one leader broker and can have one or more follower brokers. The leader broker handles all reads and writes to the partition. Each follower broker passively replicates the leader broker.

In a typical Kafka setup, you also use an open source service called ZooKeeper to coordinate your Kafka clusters. This service helps in electing a leader among the brokers and triggering failover in case of failures.

In this tutorial, you deploy the Kafka clusters on GKE by configuring the Kafka brokers and Zookeeper service as individual StatefulSets . To provision highly-available Kafka clusters and prepare for disaster recovery, you'll configure your Kafka and Zookeeper StatefulSets to use separate node pools and zones .

The following diagram shows how your Kafka StatefulSet runs on multiple nodes and zones in your GKE cluster.

Diagram shows an example architecture of a Kafka StatefulSet on GKE deployed across multiple zones.
Figure 1 : Deploying your Kafka StatefulSet on GKE nodes across three different zones.

The following diagram shows how your Zookeeper StatefulSet runs on multiple nodes and zones in your GKE cluster.

Diagram shows an example architecture of a Zookeeper StatefulSet on GKE deployed across multiple zones.
Figure 2 : Deploying your Kafka Zookeeper on GKE nodes across three different zones.

Node provisioning and Pod scheduling

If you are using Autopilot clusters, Autopilot handles provisioning nodes and scheduling the Pods for your workloads. You'll use Pod anti-affinity to ensure that no two Pods of the same StatefulSet are scheduled on the same node and same zone.

If you are using Standard clusters, you'll need to configure the Pod toleration and node affinity . To learn more, see Isolate your workloads in dedicated node pools .
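
Later in the tutorial, after the Kafka StatefulSet is deployed, you can verify this spread yourself. The following commands are an optional check (not part of the deployment steps); the label selector matches the Helm chart used later in this tutorial, and the zone label is the standard Kubernetes topology label:

    # List each Kafka broker Pod and the node it is scheduled on
    kubectl get pods -n kafka -l app.kubernetes.io/component=kafka -o wide

    # Show the zone label for each node in the cluster
    kubectl get nodes -L topology.kubernetes.io/zone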

Costs

In this document, you use the following billable components of Google Cloud:

To generate a cost estimate based on your projected usage, use the pricing calculator .

New Google Cloud users might be eligible for a free trial .

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up .

Before you begin

Set up your project

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project .

  4. Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.

    Enable the APIs


Set up roles

  1. Make sure that you have the following roles on the project: roles/storage.objectViewer, roles/logging.logWriter, roles/artifactregistry.admin, roles/container.clusterAdmin, roles/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin

    Check for the roles

    1. In the Google Cloud console, go to the IAM page.

      Go to IAM
    2. Select the project.
    3. In the Principal column, find all rows that identify you or a group that you're included in. To learn which groups you're included in, contact your administrator.

    4. For all rows that specify or include you, check the Role column to see whether the list of roles includes the required roles.

    Grant the roles

    1. In the Google Cloud console, go to the IAM page.

      Go to IAM
    2. Select the project.
    3. Click Grant access .
    4. In the New principals field, enter your user identifier. This is typically the email address for a Google Account.

    5. In the Select a role list, select a role.
    6. To grant additional roles, click Add another role and add each additional role.
    7. Click Save .

Set up your environment

In this tutorial, you use Cloud Shell to manage resources hosted on Google Cloud. Cloud Shell comes preinstalled with the software you'll need for this tutorial, including Docker , kubectl , the gcloud CLI , Helm , and Terraform .
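
If you prefer to work outside Cloud Shell, you can optionally confirm that the same tools are installed locally before you start (a quick check, not required when you use Cloud Shell):

    gcloud version
    kubectl version --client
    helm version --short
    terraform version
    docker --version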

To set up your environment with Cloud Shell, follow these steps:

  1. Launch a Cloud Shell session by clicking Activate Cloud Shell in the Google Cloud console. This launches a session in the bottom pane of the Google Cloud console.

  2. Set environment variables.

      export PROJECT_ID=PROJECT_ID
      export REGION=us-central1

    Replace PROJECT_ID with your Google Cloud project ID.
  3. Set the default project.

      gcloud config set project $PROJECT_ID
  4. Clone the code repository.

     git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  5. Change to the working directory.

     cd kubernetes-engine-samples/streaming/gke-stateful-kafka
    

Create your cluster infrastructure

In this section, you'll run a Terraform script to create two regional GKE clusters. The primary cluster (gke-kafka-us-central1) is deployed in us-central1, and the secondary cluster used later for disaster recovery (gke-kafka-us-west1) is deployed in us-west1.

To create the cluster, follow these steps:

Autopilot

In Cloud Shell, run the following commands:

 terraform -chdir=terraform/gke-autopilot init
 terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID

When prompted, type yes .

Standard

In Cloud Shell, run the following commands:

 terraform -chdir=terraform/gke-standard init
 terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID

When prompted, type yes .

The Terraform configuration files create the following resources to deploy your infrastructure:

  • An Artifact Registry repository to store the Docker images.
  • The VPC network and subnet for the VM's network interface.
  • Two GKE clusters.

Terraform creates a private cluster in each of the two regions, and enables Backup for GKE for disaster recovery.
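
To confirm that both clusters were created, you can optionally list the clusters in your project (a quick check that isn't part of the Terraform workflow):

    gcloud container clusters list --project=$PROJECT_ID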

Deploy Kafka on your cluster

In this section, you'll deploy Kafka on GKE using a Helm chart. The operation creates the following resources:

  • The Kafka and Zookeeper StatefulSets.
  • A Kafka exporter deployment. The exporter gathers Kafka metrics for Prometheus consumption.
  • A Pod Disruption Budget (PDB) that limits the number of offline Pods during a voluntary disruption.

To use the Helm chart to deploy Kafka, follow these steps:

  1. Configure Docker access.

     gcloud auth configure-docker us-docker.pkg.dev
  2. Populate Artifact Registry with the Kafka and Zookeeper images.

     ./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0
     ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52
     ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41
     ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
    
  3. Configure kubectl command line access to the primary cluster.

     gcloud container clusters get-credentials gke-kafka-us-central1 \
       --location=${REGION} \
       --project=${PROJECT_ID}
    
  4. Create a namespace.

      export NAMESPACE=kafka
      kubectl create namespace $NAMESPACE
    
  5. Install Kafka using the Helm chart version 20.0.6.

      cd helm
      ../scripts/chart.sh kafka 20.0.6 && \
      rm -rf Chart.lock charts && \
      helm dependency update && \
      helm -n kafka upgrade --install kafka . \
        --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
    

    The output is similar to the following:

     NAME: kafka
    LAST DEPLOYED: Thu Feb 16 03:29:39 2023
    NAMESPACE: kafka
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None 
    
  6. Verify that your Kafka replicas are running (this might take a few minutes).

     kubectl get all -n kafka
    

    The output is similar to the following:

     ---
    NAME                    READY   STATUS    RESTARTS        AGE
    pod/kafka-0             1/1     Running   2 (3m51s ago)   4m28s
    pod/kafka-1             1/1     Running   3 (3m41s ago)   4m28s
    pod/kafka-2             1/1     Running   2 (3m57s ago)   4m28s
    pod/kafka-zookeeper-0   1/1     Running   0               4m28s
    pod/kafka-zookeeper-1   1/1     Running   0               4m28s
    pod/kafka-zookeeper-2   1/1     Running   0               4m28s
    
    NAME                                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                      AGE
    service/kafka                          ClusterIP   192.168.112.124   <none>        9092/TCP                     4m29s
    service/kafka-app                      ClusterIP   192.168.75.57     <none>        9092/TCP                     35m
    service/kafka-app-headless             ClusterIP   None              <none>        9092/TCP,9093/TCP            35m
    service/kafka-app-zookeeper            ClusterIP   192.168.117.102   <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-app-zookeeper-headless   ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-headless                 ClusterIP   None              <none>        9092/TCP,9093/TCP            4m29s
    service/kafka-zookeeper                ClusterIP   192.168.89.249    <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    service/kafka-zookeeper-headless       ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    
    NAME                               READY   AGE
    statefulset.apps/kafka             3/3     4m29s
    statefulset.apps/kafka-zookeeper   3/3     4m29s 
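
    The Helm chart also creates the PodDisruptionBudget described earlier, which doesn't appear in the kubectl get all output. As an optional sanity check (the exact PDB name depends on the chart), you can list it with:

     kubectl get pdb -n kafka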
    

Create test data

In this section, you will test the Kafka application and generate messages.

  1. Create a consumer client Pod for interacting with the Kafka application.

     kubectl run kafka-client -n kafka --rm -ti \
       --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
    
  2. Create a topic named topic1 with three partitions and a replication factor of three.

     kafka-topics.sh \
       --create \
       --topic topic1 \
       --partitions 3 \
       --replication-factor 3 \
       --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. Verify that the topic partitions are replicated across all three brokers.

     kafka-topics.sh \
       --describe \
       --topic topic1 \
       --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    The output is similar to the following:

     Topic: topic1     TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
           Topic: topic1    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
           Topic: topic1    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
           Topic: topic1    Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2 
    

    In the example output, notice that topic1 has three partitions, each with a different leader and set of replicas. This is because Kafka uses partitioning to distribute the data across multiple brokers, allowing for greater scalability and fault tolerance. The replication factor of three ensures that each partition has three replicas, so that data is still available even if one or two brokers fail.

  4. Run the following command to generate message numbers in bulk into topic1 .

      ALLOW_PLAINTEXT_LISTENER=yes
      for x in $(seq 0 200); do
        echo "$x: Message number $x"
      done | kafka-console-producer.sh \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
        --property parse.key=true \
        --property key.separator=":"
    
  5. Run the following command to consume topic1 from all partitions.

     kafka-console-consumer.sh \
       --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
       --topic topic1 \
       --property print.key=true \
       --property key.separator=" : " \
       --from-beginning
    

    Type CTRL+C to stop the consumer process.
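
    Optionally, to see how the 201 messages were distributed across the three partitions, you can query the end offset of each partition. The following is a sketch that uses Kafka's GetOffsetShell tool, which ships with the broker image:

     # Print the latest offset for each partition of topic1
     kafka-run-class.sh kafka.tools.GetOffsetShell \
       --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
       --topic topic1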

Benchmark Kafka

To accurately model a use case, you can run a simulation of the expected load on the cluster. To test performance, you will use the tools included in the Kafka package, namely the kafka-producer-perf-test.sh and kafka-consumer-perf-test.sh scripts in the bin folder.

  1. Create a topic for benchmarking.

     kafka-topics.sh \
       --create \
       --topic topic-benchmark \
       --partitions 3 \
       --replication-factor 3 \
       --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  2. Create load on the Kafka cluster.

      KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \
        --topic topic-benchmark \
        --num-records 10000000 \
        --throughput -1 \
        --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \
          batch.size=16384 \
          acks=all \
          linger.ms=500 \
          compression.type=none \
        --record-size 100 \
        --print-metrics
    

    The producer will generate 10,000,000 records on topic-benchmark . The output is similar to the following:

     623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency.
    1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency.
    1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency.
    2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency. 
    

    Once all records have been sent, you should see additional metrics in the output, similar to the following:

     producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark}     : 173316.233
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark}    : 10000000.000 
    

    To exit the watch, type CTRL + C .
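
    The steps above exercise only the producer benchmark. To also measure read throughput, you can optionally run the consumer counterpart mentioned earlier; the following is a sketch using the standard kafka-consumer-perf-test.sh options:

     # Consume the benchmark records and report consumer throughput
     KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-consumer-perf-test.sh \
       --topic topic-benchmark \
       --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
       --messages 10000000 \
       --show-detailed-stats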

  3. Exit the Pod shell.

      exit 
     
    

Manage upgrades

Version updates for Kafka and Kubernetes are released on a regular schedule. Follow operational best practices to upgrade your software environment regularly.

Plan for Kafka binary upgrades

In this section, you will update the Kafka image using Helm and verify that your topics are still available.

To upgrade from the earlier Kafka version that you deployed with the Helm chart in Deploy Kafka on your cluster, follow these steps:

  1. Populate Artifact Registry with the following images:

     ../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2
     ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61
     ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49
     ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
    
  2. Perform these steps to deploy a Helm chart with the upgraded Kafka and Zookeeper images. For version-specific guidance, refer to the Kafka instructions for version upgrades .

    1. Update the Chart.yaml dependency version:

       ../scripts/chart.sh kafka 20.1.0

    2. Deploy the Helm chart with the new Kafka and Zookeeper images, as shown in the following example:

       rm -rf Chart.lock charts && \
       helm dependency update && \
       helm -n kafka upgrade --install kafka ./ \
         --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"

    Watch the Kafka Pods get upgraded:

     kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
    

    To exit the watch, type CTRL + C .

  3. Connect to the Kafka cluster using a client Pod.

     kubectl run kafka-client -n kafka --rm -ti \
       --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
  4. Verify you can access messages from topic1 .

     kafka-console-consumer.sh \
       --topic topic1 \
       --from-beginning \
       --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    The output should show the messages that you generated in the Create test data section. Type CTRL+C to exit the process.

  5. Exit the client Pod.

      exit 
     
    

Prepare for disaster recovery

To ensure that your production workloads remain available in the event of a service-interrupting event, you should prepare a disaster recovery (DR) plan. To learn more about DR planning, see the Disaster recovery planning guide .

To back up and restore your workloads on GKE clusters, you can use Backup for GKE.

Example Kafka backup and restore scenario

In this section, you will take a backup of your cluster from gke-kafka-us-central1 and restore the backup into gke-kafka-us-west1 . You will perform the backup and restore operation at the application scope, using the ProtectedApplication Custom Resource.
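
Before you create the backup plan, you can optionally confirm that the ProtectedApplication resources referenced below (kafka/kafka and kafka/zookeeper) exist on the primary cluster. This check assumes the Backup for GKE agent has installed the ProtectedApplication CRD on the cluster:

    kubectl get protectedapplications -n kafka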

The following diagram illustrates the components of the disaster recovery solution, and how they relate to each other.

Diagram shows an example backup-and-recovery solution for a highly-available Kafka cluster.
Figure 3 : Example backup-and-recovery solution for a highly-available Kafka cluster.

To prepare to backup and restore your Kafka cluster, follow these steps:

  1. Set up the environment variables.

      export BACKUP_PLAN_NAME=kafka-protected-app
      export BACKUP_NAME=protected-app-backup-1
      export RESTORE_PLAN_NAME=kafka-protected-app
      export RESTORE_NAME=protected-app-restore-1
      export REGION=us-central1
      export DR_REGION=us-west1
      export CLUSTER_NAME=gke-kafka-$REGION
      export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
    
  2. Verify the cluster is in a RUNNING state.

     gcloud container clusters describe $CLUSTER_NAME --location us-central1 --format='value(status)'
    
  3. Create a Backup Plan.

     gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \
       --project=$PROJECT_ID \
       --location=$DR_REGION \
       --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \
       --selected-applications=kafka/kafka,kafka/zookeeper \
       --include-secrets \
       --include-volume-data \
       --cron-schedule="0 3 * * *" \
       --backup-retain-days=7 \
       --backup-delete-lock-days=0
    
  4. Manually create a Backup. While scheduled backups are typically governed by the cron-schedule in the backup plan, the following example shows how you can initiate a one-time backup operation.

     gcloud beta container backup-restore backups create $BACKUP_NAME \
       --project=$PROJECT_ID \
       --location=$DR_REGION \
       --backup-plan=$BACKUP_PLAN_NAME \
       --wait-for-completion
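
    If you want to double-check the result after the command returns, you can optionally inspect the backup's state with the describe command from the same gcloud beta command group:

     gcloud beta container backup-restore backups describe $BACKUP_NAME \
       --project=$PROJECT_ID \
       --location=$DR_REGION \
       --backup-plan=$BACKUP_PLAN_NAME \
       --format='value(state)'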
    
  5. Create a Restore Plan.

     gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \
       --project=$PROJECT_ID \
       --location=$DR_REGION \
       --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \
       --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \
       --cluster-resource-conflict-policy=use-existing-version \
       --namespaced-resource-restore-mode=delete-and-restore \
       --volume-data-restore-policy=restore-volume-data-from-backup \
       --selected-applications=kafka/kafka,kafka/zookeeper \
       --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
    
  6. Manually restore from a Backup.

     gcloud beta container backup-restore restores create $RESTORE_NAME \
       --project=$PROJECT_ID \
       --location=$DR_REGION \
       --restore-plan=$RESTORE_PLAN_NAME \
       --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
    
  7. Watch the restored application come up in the backup cluster. It may take a few minutes for all the Pods to be running and ready.

     gcloud container clusters get-credentials gke-kafka-us-west1 \
       --location us-west1
     kubectl get pod -n kafka --watch
    

    Type CTRL+C to exit the watch when all Pods are up and running.

  8. Validate that previous topics can be fetched by a consumer.

     kubectl run kafka-client -n kafka --rm -ti \
       --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash

     kafka-console-consumer.sh \
       --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
       --topic topic1 \
       --property print.key=true \
       --property key.separator=" : " \
       --from-beginning
    

    The output is similar to the following:

     192 :  Message number 192
    193 :  Message number 193
    197 :  Message number 197
    200 :  Message number 200
    Processed a total of 201 messages 
    

    Type CTRL+C to exit the process.

  9. Exit the Pod.

      exit 
     
    

Simulate a Kafka service disruption

In this section, you will simulate a node failure by replacing the Kubernetes node hosting a Kafka broker. This section applies to Standard clusters only. Autopilot manages your nodes for you, so node failure cannot be simulated in this way.

  1. Create a client pod to connect to the Kafka application.

     kubectl run kafka-client -n kafka --restart='Never' -it \
       --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
  2. Create topic topic-failover-test and produce test traffic.

     kafka-topics.sh \
       --create \
       --topic topic-failover-test \
       --partitions 1 \
       --replication-factor 3 \
       --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. Determine which broker is the leader for the topic-failover-test topic.

     kafka-topics.sh --describe \
       --topic topic-failover-test \
       --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    The output is similar to the following:

     Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2 
    

    In the output above, Leader: 1 means that the leader for topic-failover-test is broker 1. This corresponds to Pod kafka-1 .
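
    If you want to confirm the mapping between broker IDs and Pods yourself, you can optionally read the broker ID from the Pod's configuration. The following sketch assumes the Bitnami image's default container name and config path; adjust both if your chart differs:

     # Assumes the broker container is named "kafka" and uses the default Bitnami config path
     kubectl exec -n kafka kafka-1 -c kafka -- \
       grep broker.id /opt/bitnami/kafka/config/server.properties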

  4. Open a new terminal and connect to the same cluster.

     gcloud container clusters get-credentials gke-kafka-us-west1 --location us-west1 --project PROJECT_ID
    
  5. Find which node Pod kafka-1 is running on.

     kubectl get pod -n kafka kafka-1 -o wide
    

    The output is similar to the following:

     NAME      READY   STATUS    RESTARTS      AGE   IP              NODE                                               NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   1 (35m ago)   36m   192.168.132.4   gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72   <none>           <none> 
    

    In the output above, you see Pod kafka-1 is running on node gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 .

  6. Drain the node to evict the Pods.

     kubectl drain NODE \
       --delete-emptydir-data \
       --force \
       --ignore-daemonsets
    

    Replace NODE with the name of the node that Pod kafka-1 is running on. In this example, the node is gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72.

    The output is similar to the following:

     node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned
    Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578
    evicting pod kafka/kafka-1
    evicting pod kube-system/kube-dns-7d48cb57b-j4d8f
    evicting pod kube-system/calico-typha-56995c8d85-5clph
    pod/calico-typha-56995c8d85-5clph evicted
    pod/kafka-1 evicted
    pod/kube-dns-7d48cb57b-j4d8f evicted
    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained 
    
  7. Find which node Pod kafka-1 is running on.

     kubectl get pod -n kafka kafka-1 -o wide
    

    The output should look similar to the following:

     NAME      READY   STATUS    RESTARTS   AGE     IP              NODE                                              NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   0          2m49s   192.168.128.8   gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7   <none>           <none> 
    

    From the output above, you see the application is running on a new node.

  8. In the terminal connected to the kafka-client Pod, determine which broker is the leader for topic-failover-test .

     kafka-topics.sh --describe \
       --topic topic-failover-test \
       --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    The output should look similar to the following:

     Topic: topic-failover-test     TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 0,2,1 
    

    In the example output, the leader is still 1 . However, it's now running on a new node.
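
    When you're done with this test, you can optionally make the drained node schedulable again, because kubectl drain leaves the node cordoned until you uncordon it:

     kubectl uncordon NODE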

Testing Kafka leader failure

  1. In Cloud Shell, connect to the Kafka client, and use describe to view the elected leader for each partition in topic1 .

     kafka-topics.sh --describe \
       --topic topic1 \
       --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    The output is similar to the following:

     Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic1   Partition: 1    Leader: 0       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic1   Partition: 2    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1 
    
  2. In the Cloud Shell session that is not connected to the Kafka client, delete the kafka-0 leader broker to force a new leader election. Delete the Pod whose index maps to one of the leaders in the previous output.

     kubectl delete pod -n kafka kafka-0 --force
    

    The output is similar to the following:

     pod "kafka-0" force deleted 
    
  3. In the Cloud Shell session connected to the Kafka client, use describe to view the elected leader.

     kafka-topics.sh --describe \
       --topic topic1 \
       --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    The output is similar to the following:

     Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: topic1   Partition: 2    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1 
    

    In the output, the leader for each partition changes if it was previously assigned to the interrupted broker (kafka-0). This indicates that the original leader was replaced when the Pod was deleted and re-created.
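
    After kafka-0 rejoins the cluster, leadership may or may not move back automatically, depending on the brokers' auto.leader.rebalance.enable setting. To rebalance leaders onto their preferred replicas on demand, you can optionally trigger a preferred leader election; the following is a sketch using the standard kafka-leader-election.sh tool:

     # Move each partition's leadership back to its preferred replica
     kafka-leader-election.sh \
       --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
       --election-type PREFERRED \
       --all-topic-partitions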

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project

The easiest way to avoid billing is to delete the project you created for the tutorial.

  • In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  • In the project list, select the project that you want to delete, and then click Delete .
  • In the dialog, type the project ID, and then click Shut down to delete the project.
What's next
