Develop a Python producer application

Learn how to develop a Python producer application that authenticates with a Managed Service for Apache Kafka cluster by using Application Default Credentials (ADC). ADC lets applications running on Google Cloud automatically find and use the right credentials for authenticating to Google Cloud services.

Before you begin

Before you start this tutorial, create a new Managed Service for Apache Kafka cluster. If you already have a cluster, you can skip this step.

How to create a cluster

Console

  1. Go to the Managed Service for Apache Kafka > Clusters page.

    Go to Clusters

  2. Click Create .
  3. In the Cluster name box, enter a name for the cluster.
  4. In the Region list, select a location for the cluster.
  5. For Network configuration , configure the subnet where the cluster is accessible:
    1. For Project , select your project.
    2. For Network , select the VPC network.
    3. For Subnet , select the subnet.
    4. Click Done .
  6. Click Create .

After you click Create , the cluster state is Creating . When the cluster is ready, the state is Active .

gcloud

To create a Kafka cluster, run the managed-kafka clusters create command.

gcloud  
managed-kafka  
clusters  
create  
 KAFKA_CLUSTER 
  
 \ 
--location = 
 REGION 
  
 \ 
--cpu = 
 3 
  
 \ 
--memory = 
3GiB  
 \ 
--subnets = 
projects/ PROJECT_ID 
/regions/ REGION 
/subnetworks/ SUBNET_NAME 
  
 \ 
--async

Replace the following:

  • KAFKA_CLUSTER : a name for the Kafka cluster
  • REGION : the location of the cluster
  • PROJECT_ID : your project ID
  • SUBNET_NAME : the subnet where you want to create the cluster, for example default

For information about supported locations, see Managed Service for Apache Kafka locations .

The command runs asynchronously and returns an operation ID:

Check  
operation  
 [ 
projects/ PROJECT_ID 
/locations/ REGION 
/operations/ OPERATION_ID 
 ] 
  
 for 
  
status.

To track the progress of the create operation, use the gcloud managed-kafka operations describe command:

gcloud  
managed-kafka  
operations  
describe  
 OPERATION_ID 
  
 \ 
  
--location = 
 REGION 

When the cluster is ready, the output from this command includes the entry state: ACTIVE . For more information, see Monitor the cluster creation operation .

Set up a client VM

Create a Linux virtual machine (VM) instance in Compute Engine that can access the Kafka cluster. When you configure the VM, set the following options:

  • Region. Create the VM in the same region as your Kafka cluster.

  • Subnet. Create the VM in the same VPC network as the subnet that you used in your Kafka cluster configuration. For more information, see View a cluster's subnets .

  • Access scopes. Assign the https://www.googleapis.com/auth/cloud-platform access scope to the VM. This scope authorizes the VM to send requests to the Managed Kafka API.

The following steps show how to set these options.

Console

  1. In the Google Cloud console, go to the Create an instancepage.

    Create an instance

  2. In the Machine configurationpane, do the following:

    1. In the Namefield, specify a name for your instance. For more information, see Resource naming convention .

    2. In the Regionlist, select the same region as your Kafka cluster.

    3. In the Zonelist, select a zone.

  3. In the navigation menu, click Networking. In the Networkingpane that appears, do the following:

    1. Go to the Network interfacessection.

    2. To expand the default network interface, click the arrow.

    3. In the Networkfield, choose the VPC network.

    4. In the Subnetworklist, select the subnet.

    5. Click Done.

  4. In the navigation menu, click Security. In the Securitypane that appears, do the following:

    1. For Access scopes, select Set access for each API.

    2. In the list of access scopes, find the Cloud Platformdrop-down list and select Enabled.

  5. Click Createto create the VM.

gcloud

To create the VM instance, use the gcloud compute instances create command.

 gcloud  
compute  
instances  
create  
 VM_NAME 
  
 \ 
  
--scopes = 
https://www.googleapis.com/auth/cloud-platform  
 \ 
  
--subnet = 
projects/ PROJECT_ID 
/regions/ REGION 
/subnetworks/ SUBNET 
  
 \ 
  
--zone = 
 ZONE 
 

Replace the following:

  • VM_NAME : the name of the VM
  • PROJECT_ID : your project ID
  • REGION : the region where you created the Kafka cluster, for example us-central1
  • SUBNET : a subnet in the same VPC network as the subnet that you used in the cluster configuration
  • ZONE : a zone in the region where you created the cluster, for example us-central1-c

For more information about creating a VM, see Create a VM instance in a specific subnet .

Grant IAM roles

Grant the following Identity and Access Management (IAM) roles to the Compute Engine default service account :

  • Managed Kafka Client( roles/managedkafka.client )
  • Service Account Token Creator( roles/iam.serviceAccountTokenCreator )
  • Service Account OpenID Token Creator( roles/iam.serviceAccountOpenIdTokenCreator )

Console

  1. In the Google Cloud console, go to the IAMpage.

    Go to IAM

  2. Find the row for Compute Engine default service accountand click Edit principal.

  3. Click Add another roleand select the role Managed Kafka Client. Repeat this step for the Service Account Token Creatorand Service Account OpenID Token Creatorroles.

  4. Click Save.

gcloud

To grant IAM roles, use the gcloud projects add-iam-policy-binding command.

 gcloud  
projects  
add-iam-policy-binding  
 PROJECT_ID 
  
 \ 
  
--member = 
 "serviceAccount: PROJECT_NUMBER 
-compute@developer.gserviceaccount.com" 
  
 \ 
  
--role = 
roles/managedkafka.client

gcloud  
projects  
add-iam-policy-binding  
 PROJECT_ID 
 \ 
  
--member = 
 "serviceAccount: PROJECT_NUMBER 
-compute@developer.gserviceaccount.com" 
  
 \ 
  
--role = 
roles/iam.serviceAccountTokenCreator

gcloud  
projects  
add-iam-policy-binding  
 PROJECT_ID 
  
 \ 
  
--member = 
 "serviceAccount: PROJECT_NUMBER 
-compute@developer.gserviceaccount.com" 
  
 \ 
  
--role = 
roles/iam.serviceAccountOpenIdTokenCreator 

Replace the following:

  • PROJECT_ID : your project ID

  • PROJECT_NUMBER : your project number

To get the project number, run the gcloud projects describe command:

 gcloud  
projects  
describe  
 PROJECT_ID 
 

For more information, see Find the project name, number, and ID .

Connect to the VM

Use SSH to connect to the VM instance.

Console

  1. Go to the VM instancespage.

    Go to VM instances

  2. In the list of VM instances, find the VM name and click SSH.

gcloud

To connect to the VM, use the gcloud compute ssh command.

 gcloud  
compute  
ssh  
 VM_NAME 
  
 \ 
  
--project = 
 PROJECT_ID 
  
 \ 
  
--zone = 
 ZONE 
 

Replace the following:

  • VM_NAME : the name of the VM
  • PROJECT_ID : your project ID
  • ZONE : the zone where you created the VM

Additional configuration might be required for first time SSH usage. For more information, see About SSH connections .

Create a Python producer application

From your SSH session, run the following commands to create a producer application.

  1. Install pip, a Python package manager and the virtual environment manager:

     sudo  
    apt  
    install  
    python3-pip  
    -y
    sudo  
    apt  
    install  
    python3-venv  
    -y 
    
  2. Create a new virtual environment (venv) and activate it:

     python3  
    -m  
    venv  
    kafka source 
      
    kafka/bin/activate 
    
  3. Install the confluent-kafka client and other dependencies:

     pip  
    install  
    confluent-kafka  
    google-auth  
    urllib3  
    packaging 
    
  4. Copy the following producer client code into a file called producer.py

      import 
      
     confluent_kafka 
     import 
      
     argparse 
     from 
      
     tokenprovider 
      
     import 
     TokenProvider 
     parser 
     = 
     argparse 
     . 
     ArgumentParser 
     () 
     parser 
     . 
     add_argument 
     ( 
     '-b' 
     , 
     '--bootstrap-servers' 
     , 
     dest 
     = 
     'bootstrap' 
     , 
     type 
     = 
     str 
     , 
     required 
     = 
     True 
     ) 
     parser 
     . 
     add_argument 
     ( 
     '-t' 
     , 
     '--topic-name' 
     , 
     dest 
     = 
     'topic_name' 
     , 
     type 
     = 
     str 
     , 
     default 
     = 
     'example-topic' 
     , 
     required 
     = 
     False 
     ) 
     parser 
     . 
     add_argument 
     ( 
     '-n' 
     , 
     '--num_messages' 
     , 
     dest 
     = 
     'num_messages' 
     , 
     type 
     = 
     int 
     , 
     default 
     = 
     1 
     , 
     required 
     = 
     False 
     ) 
     args 
     = 
     parser 
     . 
     parse_args 
     () 
     token_provider 
     = 
     TokenProvider 
     () 
     config 
     = 
     { 
     'bootstrap.servers' 
     : 
     args 
     . 
     bootstrap 
     , 
     'security.protocol' 
     : 
     'SASL_SSL' 
     , 
     'sasl.mechanisms' 
     : 
     'OAUTHBEARER' 
     , 
     'oauth_cb' 
     : 
     token_provider 
     . 
     get_token 
     , 
     } 
     producer 
     = 
     confluent_kafka 
     . 
     Producer 
     ( 
     config 
     ) 
     def 
      
     callback 
     ( 
     error 
     , 
     message 
     ): 
     if 
     error 
     is 
     not 
     None 
     : 
     print 
     ( 
     error 
     ) 
     return 
     print 
     ( 
     "Delivered a message to 
     {} 
     [ 
     {} 
     ]" 
     . 
     format 
     ( 
     message 
     . 
     topic 
     (), 
     message 
     . 
     partition 
     ())) 
     for 
     i 
     in 
     range 
     ( 
     args 
     . 
     num_messages 
     ): 
     message 
     = 
     f 
     " 
     { 
     i 
     } 
     hello world!" 
     . 
     encode 
     ( 
     'utf-8' 
     ) 
     producer 
     . 
     produce 
     ( 
     args 
     . 
     topic_name 
     , 
     message 
     , 
     callback 
     = 
     callback 
     ) 
     producer 
     . 
     flush 
     () 
     
    
  5. You now need an implementation of the OAuth token provider. Save the following code in a file called tokenprovider.py :

      import 
      
     base64 
     import 
      
     datetime 
     import 
      
     http.server 
     import 
      
     json 
     import 
      
     google.auth 
     from 
      
     google.auth.transport.urllib3 
      
     import 
     Request 
     import 
      
     urllib3 
     import 
      
     time 
     def 
      
     encode 
     ( 
     source 
     ): 
      
     """Safe base64 encoding.""" 
     return 
     base64 
     . 
     urlsafe_b64encode 
     ( 
     source 
     . 
     encode 
     ( 
     'utf-8' 
     )) 
     . 
     decode 
     ( 
     'utf-8' 
     ) 
     . 
     rstrip 
     ( 
     '=' 
     ) 
     class 
      
     TokenProvider 
     ( 
     object 
     ): 
      
     """ 
     Provides OAuth tokens from Google Cloud Application Default credentials. 
     """ 
     HEADER 
     = 
     json 
     . 
     dumps 
     ({ 
     'typ' 
     : 
     'JWT' 
     , 
     'alg' 
     : 
     'GOOG_OAUTH2_TOKEN' 
     }) 
     def 
      
     __init__ 
     ( 
     self 
     , 
     ** 
     config 
     ): 
     self 
     . 
     credentials 
     , 
     _project 
     = 
     google 
     . 
     auth 
     . 
     default 
     () 
     self 
     . 
     http_client 
     = 
     urllib3 
     . 
     PoolManager 
     () 
     def 
      
     get_credentials 
     ( 
     self 
     ): 
     if 
     not 
     self 
     . 
     credentials 
     . 
     valid 
     : 
     self 
     . 
     credentials 
     . 
     refresh 
     ( 
     Request 
     ( 
     self 
     . 
     http_client 
     )) 
     return 
     self 
     . 
     credentials 
     def 
      
     get_jwt 
     ( 
     self 
     , 
     creds 
     ): 
     token_data 
     = 
     dict 
     ( 
     exp 
     = 
     creds 
     . 
     expiry 
     . 
     replace 
     ( 
     tzinfo 
     = 
     datetime 
     . 
     timezone 
     . 
     utc 
     ) 
     . 
     timestamp 
     (), 
     iat 
     = 
     datetime 
     . 
     datetime 
     . 
     now 
     ( 
     datetime 
     . 
     timezone 
     . 
     utc 
     ) 
     . 
     timestamp 
     (), 
     iss 
     = 
     'Google' 
     , 
     sub 
     = 
     creds 
     . 
     service_account_email 
     ) 
     return 
     json 
     . 
     dumps 
     ( 
     token_data 
     ) 
     def 
      
     get_token 
     ( 
     self 
     , 
     args 
     ): 
     creds 
     = 
     self 
     . 
     get_credentials 
     () 
     token 
     = 
     '.' 
     . 
     join 
     ([ 
     encode 
     ( 
     self 
     . 
     HEADER 
     ), 
     encode 
     ( 
     self 
     . 
     get_jwt 
     ( 
     creds 
     )), 
     encode 
     ( 
     creds 
     . 
     token 
     ) 
     ]) 
     # compute expiry time 
     expiry_utc 
     = 
     creds 
     . 
     expiry 
     . 
     replace 
     ( 
     tzinfo 
     = 
     datetime 
     . 
     timezone 
     . 
     utc 
     ) 
     now_utc 
     = 
     datetime 
     . 
     datetime 
     . 
     now 
     ( 
     datetime 
     . 
     timezone 
     . 
     utc 
     ) 
     expiry_seconds 
     = 
     ( 
     expiry_utc 
     - 
     now_utc 
     ) 
     . 
     total_seconds 
     () 
     return 
     token 
     , 
     time 
     . 
     time 
     () 
     + 
     expiry_seconds 
     
    
  6. You are now ready to run the application:

     python  
    producer.py  
    -b  
    bootstrap. CLUSTER_ID 
    . REGION 
    .managedkafka. PROJECT_ID 
    .cloud.goog:9092 
    

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.

Console

  1. Delete the VM instance.

    1. Go to the VM instancespage.

      Go to VM instances

    2. Select the VM and click Delete.

  2. Delete the Kafka cluster.

    1. Go to the Managed Service for Apache Kafka > Clusterspage.

      Go to Clusters

    2. Select the Kafka cluster and click Delete.

gcloud

  1. To delete the VM, use the gcloud compute instances delete command.

     gcloud  
    compute  
    instances  
    delete  
     VM_NAME 
      
    --zone = 
     ZONE 
     
    
  2. To delete the Kafka cluster, use the gcloud managed-kafka clusters delete command.

     gcloud  
    managed-kafka  
    clusters  
    delete  
     CLUSTER_ID 
      
     \ 
      
    --location = 
     REGION 
      
    --async 
    

What's next

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.
Create a Mobile Website
View Site in Mobile | Classic
Share by: