Develop a Python producer application

Learn how to develop a Python producer application that authenticates with a Managed Service for Apache Kafka cluster by using Application Default Credentials (ADC). ADC lets applications running on Google Cloud automatically find and use the right credentials for authenticating to Google Cloud services.

Before you start this tutorial, follow the steps in Create a cluster in Managed Service for Apache Kafka .

Before you begin

Before you start this tutorial, create a new cluster by following the steps in Create a cluster in Managed Service for Apache Kafka .

If you already have a Managed Service for Apache Kafka cluster, you can skip this step.

Set up a client VM

A producer application must run on a machine with network access to the cluster. We use a Compute Engine virtual machine instance (VM). This VM must be in the same region as the Kafka cluster. It must also be in the VPC containing the subnet that you've used in the cluster configuration.

  1. To create the client VM, run the following command:

     gcloud  
    compute  
    instances  
    create  
    test-instance  
     \ 
      
    --scopes = 
    https://www.googleapis.com/auth/cloud-platform  
     \ 
      
    --subnet = 
    projects/ PROJECT_ID 
    /regions/ REGION 
    /subnetworks/default  
     \ 
      
    --zone = 
     REGION 
    -c 
    
  2. Give the Compute Engine default service account the necessary permissions to connect to the cluster and authenticate. You need to grant the Managed Kafka Client role ( roles/managedkafka.client ), the Service Account Token Creator role ( roles/iam.serviceAccountTokenCreator ), and the Service Account OpenID Token Creator role ( roles/iam.serviceAccountOpenIdTokenCreator ).

     gcloud  
    projects  
    add-iam-policy-binding  
     PROJECT_ID 
      
     \ 
      
    --member = 
     "serviceAccount: PROJECT_NUMBER 
    -compute@developer.gserviceaccount.com" 
      
     \ 
      
    --role = 
    roles/managedkafka.client
    
    gcloud  
    projects  
    add-iam-policy-binding  
     PROJECT_ID 
     \ 
      
    --member = 
     "serviceAccount: PROJECT_NUMBER 
    -compute@developer.gserviceaccount.com" 
      
     \ 
      
    --role = 
    roles/iam.serviceAccountTokenCreator
    
    gcloud  
    projects  
    add-iam-policy-binding  
     PROJECT_ID 
      
     \ 
      
    --member = 
     "serviceAccount: PROJECT_NUMBER 
    -compute@developer.gserviceaccount.com" 
      
     \ 
      
    --role = 
    roles/iam.serviceAccountOpenIdTokenCreator 
    

    Replace PROJECT_NUMBER with the number of the project containing the cluster. You can look up this number using gcloud projects describe PROJECT_ID .

Create a Python producer application

  1. Connect to the client VM using SSH. One way to do this is to run the following command:

     gcloud  
    compute  
    ssh  
    --project = 
     PROJECT_ID 
      
     \ 
      
    --zone = 
     REGION 
    -f  
    test-instance 
    

    Replace PROJECT_ID with your Google Cloud project name.

    For more information about connecting using SSH, see About SSH connections .

  2. Install pip, a Python package manager and the virtual environment manager:

     sudo  
    apt  
    install  
    python3-pip  
    -y
    sudo  
    apt  
    install  
    python3-venv  
    -y 
    
  3. Create a new virtual environment (venv) and activate it:

     python3  
    -m  
    venv  
    kafka source 
      
    kafka/bin/activate 
    
  4. Install the confluent-kafka client and other dependencies:

     pip  
    install  
    confluent-kafka  
    google-auth  
    urllib3  
    packaging 
    
  5. Copy the following producer client code into a file called producer.py

      import 
      
     confluent_kafka 
     import 
      
     argparse 
     from 
      
     tokenprovider 
      
     import 
     TokenProvider 
     parser 
     = 
     argparse 
     . 
     ArgumentParser 
     () 
     parser 
     . 
     add_argument 
     ( 
     '-b' 
     , 
     '--bootstrap-servers' 
     , 
     dest 
     = 
     'bootstrap' 
     , 
     type 
     = 
     str 
     , 
     required 
     = 
     True 
     ) 
     parser 
     . 
     add_argument 
     ( 
     '-t' 
     , 
     '--topic-name' 
     , 
     dest 
     = 
     'topic_name' 
     , 
     type 
     = 
     str 
     , 
     default 
     = 
     'example-topic' 
     , 
     required 
     = 
     False 
     ) 
     parser 
     . 
     add_argument 
     ( 
     '-n' 
     , 
     '--num_messages' 
     , 
     dest 
     = 
     'num_messages' 
     , 
     type 
     = 
     int 
     , 
     default 
     = 
     1 
     , 
     required 
     = 
     False 
     ) 
     args 
     = 
     parser 
     . 
     parse_args 
     () 
     token_provider 
     = 
     TokenProvider 
     () 
     config 
     = 
     { 
     'bootstrap.servers' 
     : 
     args 
     . 
     bootstrap 
     , 
     'security.protocol' 
     : 
     'SASL_SSL' 
     , 
     'sasl.mechanisms' 
     : 
     'OAUTHBEARER' 
     , 
     'oauth_cb' 
     : 
     token_provider 
     . 
     get_token 
     , 
     } 
     producer 
     = 
     confluent_kafka 
     . 
     Producer 
     ( 
     config 
     ) 
     def 
      
     callback 
     ( 
     error 
     , 
     message 
     ): 
     if 
     error 
     is 
     not 
     None 
     : 
     print 
     ( 
     error 
     ) 
     return 
     print 
     ( 
     "Delivered a message to 
     {} 
     [ 
     {} 
     ]" 
     . 
     format 
     ( 
     message 
     . 
     topic 
     (), 
     message 
     . 
     partition 
     ())) 
     for 
     i 
     in 
     range 
     ( 
     args 
     . 
     num_messages 
     ): 
     message 
     = 
     f 
     " 
     { 
     i 
     } 
     hello world!" 
     . 
     encode 
     ( 
     'utf-8' 
     ) 
     producer 
     . 
     produce 
     ( 
     args 
     . 
     topic_name 
     , 
     message 
     , 
     callback 
     = 
     callback 
     ) 
     producer 
     . 
     flush 
     () 
     
    
  6. You now need an implementation of the OAuth token provider. Save the following code in a file called tokenprovider.py :

      import 
      
     base64 
     import 
      
     datetime 
     import 
      
     http.server 
     import 
      
     json 
     import 
      
     google.auth 
     from 
      
     google.auth.transport.urllib3 
      
     import 
     Request 
     import 
      
     urllib3 
     import 
      
     time 
     def 
      
     encode 
     ( 
     source 
     ): 
      
     """Safe base64 encoding.""" 
     return 
     base64 
     . 
     urlsafe_b64encode 
     ( 
     source 
     . 
     encode 
     ( 
     'utf-8' 
     )) 
     . 
     decode 
     ( 
     'utf-8' 
     ) 
     . 
     rstrip 
     ( 
     '=' 
     ) 
     class 
      
     TokenProvider 
     ( 
     object 
     ): 
      
     """ 
     Provides OAuth tokens from Google Cloud Application Default credentials. 
     """ 
     HEADER 
     = 
     json 
     . 
     dumps 
     ({ 
     'typ' 
     : 
     'JWT' 
     , 
     'alg' 
     : 
     'GOOG_OAUTH2_TOKEN' 
     }) 
     def 
      
     __init__ 
     ( 
     self 
     , 
     ** 
     config 
     ): 
     self 
     . 
     credentials 
     , 
     _project 
     = 
     google 
     . 
     auth 
     . 
     default 
     () 
     self 
     . 
     http_client 
     = 
     urllib3 
     . 
     PoolManager 
     () 
     def 
      
     get_credentials 
     ( 
     self 
     ): 
     if 
     not 
     self 
     . 
     credentials 
     . 
     valid 
     : 
     self 
     . 
     credentials 
     . 
     refresh 
     ( 
     Request 
     ( 
     self 
     . 
     http_client 
     )) 
     return 
     self 
     . 
     credentials 
     def 
      
     get_jwt 
     ( 
     self 
     , 
     creds 
     ): 
     token_data 
     = 
     dict 
     ( 
     exp 
     = 
     creds 
     . 
     expiry 
     . 
     timestamp 
     (), 
     iat 
     = 
     datetime 
     . 
     datetime 
     . 
     now 
     ( 
     datetime 
     . 
     timezone 
     . 
     utc 
     ) 
     . 
     timestamp 
     (), 
     iss 
     = 
     'Google' 
     , 
     scope 
     = 
     'kafka' 
     , 
     sub 
     = 
     creds 
     . 
     service_account_email 
     , 
     ) 
     return 
     json 
     . 
     dumps 
     ( 
     token_data 
     ) 
     def 
      
     get_token 
     ( 
     self 
     , 
     args 
     ): 
     creds 
     = 
     self 
     . 
     get_credentials 
     () 
     token 
     = 
     '.' 
     . 
     join 
     ([ 
     encode 
     ( 
     self 
     . 
     HEADER 
     ), 
     encode 
     ( 
     self 
     . 
     get_jwt 
     ( 
     creds 
     )), 
     encode 
     ( 
     creds 
     . 
     token 
     ) 
     ]) 
     # compute expiry time explicitly 
     utc_expiry 
     = 
     creds 
     . 
     expiry 
     . 
     replace 
     ( 
     tzinfo 
     = 
     datetime 
     . 
     timezone 
     . 
     utc 
     ) 
     expiry_seconds 
     = 
     ( 
     utc_expiry 
     - 
     datetime 
     . 
     datetime 
     . 
     now 
     ( 
     datetime 
     . 
     timezone 
     . 
     utc 
     )) 
     . 
     total_seconds 
     () 
     return 
     token 
     , 
     time 
     . 
     time 
     () 
     + 
     expiry_seconds 
     
    
  7. You are now ready to run the application:

     python  
    producer.py  
    -b  
    bootstrap. CLUSTER_ID 
    . REGION 
    .managedkafka. PROJECT_ID 
    .cloud.goog:9092 
    

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.

To delete the cluster, run the gcloud managed-kafka clusters delete command:

 gcloud  
managed-kafka  
clusters  
delete  
 CLUSTER_ID 
  
--location = 
 REGION 
 

To delete the VM, run the gcloud compute instances delete command:

 gcloud  
instances  
delete  
test-instance  
 \ 
  
--zone = 
 REGION 
-c 

What's next

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.
Design a Mobile Site
View Site in Mobile | Classic
Share by: