Produce Avro messages with the schema registry

Learn how to create a Google Cloud Managed Service for Apache Kafka cluster and write a Java producer application. The application demonstrates how to use the schema registry included with the service to work with Apache Avro messages.

Before you begin

Follow these steps to set up the gcloud CLI and a Google Cloud project. These are required to complete this guide.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID 
      

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID 
      

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Managed Service for Apache Kafka, Compute Engine, and Cloud DNS APIs:

    gcloud services enable managedkafka.googleapis.com compute.googleapis.com dns.googleapis.com

Create a cluster

You need a Managed Service for Apache Kafka cluster in the Google Cloud region where you plan to use the schema registry. In this quickstart, that region is referred to as REGION. To create a cluster, run the gcloud managed-kafka clusters create command.

 gcloud managed-kafka clusters create CLUSTER_ID \
    --location=REGION \
    --cpu=3 \
    --memory=3GiB \
    --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/default \
    --async

Replace the following:

  • CLUSTER_ID : a name for the cluster. For guidelines on how to name a cluster, see Cluster, topic, or consumer group ID.

  • PROJECT_ID : the name of the project that you created in the previous section. For guidelines on how to find the project ID, see Find the project name, number, and ID.

  • REGION : the name of a Compute Engine region, such as us-central1.

After you run the command, you should get a response similar to the following:

 Create request issued for: [CLUSTER_ID]

This operation returns immediately, but cluster creation might take around half an hour. You don't have to wait for cluster creation to finish before you start using the schema registry.

You can monitor the cluster status using the following command:

 gcloud managed-kafka clusters describe CLUSTER_ID \
    --location=REGION
 

The output of the command is similar to the following:

 bootstrapAddress: bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
capacityConfig:
  memoryBytes: '3221225472'
  vcpuCount: '3'
createTime: '2024-05-28T04:32:08.671168869Z'
gcpConfig:
  accessConfig:
    networkConfigs:
    - subnet: projects/PROJECT_ID/regions/REGION/subnetworks/default
name: projects/PROJECT_ID/locations/REGION/clusters/CLUSTER_ID
rebalanceConfig:
  mode: AUTO_REBALANCE_ON_SCALE_UP
state: CREATING
updateTime: '2024-05-28T04:32:08.671168869Z' 

The state field is useful for monitoring the creation operation. You can use the cluster once the state changes to ACTIVE. The bootstrapAddress is the URL that you use to connect to the cluster.
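For example, to check only the state field, you can project it with the gcloud CLI's --format flag. This is a minimal sketch; value(state) is the standard gcloud output projection applied to the describe output shown earlier, not a flag specific to Managed Service for Apache Kafka:

 # Print only the cluster state (CREATING, ACTIVE, and so on).
 gcloud managed-kafka clusters describe CLUSTER_ID \
    --location=REGION \
    --format="value(state)"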

Set up a client VM

A producer application must run on a machine with network access to the cluster. This quickstart uses a Compute Engine virtual machine (VM) instance. The VM must be in the same region as the Kafka cluster and in the VPC that contains the subnet used in the cluster configuration. To create the VM, run the following command:

 gcloud compute instances create test-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/default \
    --zone=REGION-c

This VM has a Compute Engine default service account of the following format:

  PROJECT_NUMBER-compute@developer.gserviceaccount.com

Replace PROJECT_NUMBER with the number of the project that contains the cluster. You can look up the project number using the gcloud projects describe command:

 gcloud projects describe PROJECT_ID

Your producer application uses this service account to authenticate with the Managed Service for Apache Kafka API. This service account needs the following roles:

  • The Managed Service for Apache Kafka Client role (roles/managedkafka.client): provides access to connect to the Managed Service for Apache Kafka cluster.

  • The Managed Service for Apache Kafka Schema Registry Admin role (roles/managedkafka.schemaRegistryAdmin): provides full access to the Managed Service for Apache Kafka schema registry.

  • The Service Account Token Creator role (roles/iam.serviceAccountTokenCreator): required to create OAuth tokens used to authenticate with the registry service.

To grant the required permissions, run the gcloud projects add-iam-policy-binding command.

 gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.schemaRegistryAdmin

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/iam.serviceAccountTokenCreator

Replace PROJECT_NUMBER with the number of the project containing the cluster. You can look up this number using gcloud projects describe PROJECT_ID.
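For example, the following sketch uses the gcloud CLI's standard --format projection to print only the project number; projectNumber is the field of the project resource returned by the describe command:

 # Print only the numeric project number for the given project.
 gcloud projects describe PROJECT_ID --format="value(projectNumber)"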

Set up an Apache Maven project

  1. Connect to the client VM using SSH. One way to do this is to run the following command:

     gcloud compute ssh --project=PROJECT_ID \
         --zone=REGION-c test-instance
    

    For more information about connecting using SSH, see About SSH connections.

  2. Install Java and Maven with the command:

     sudo apt-get install maven openjdk-17-jdk
    
  3. Set up an Apache Maven project.

    Use the following command to create a package com.google.example in a directory called demo.

     mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example \
         -DarchetypeArtifactId=maven-archetype-quickstart \
         -DarchetypeVersion=1.5 \
         -DinteractiveMode=false
     
    

Define the schema and its Java implementation

In this example, a message represents a "user" that has a name and an optional ID. This corresponds to an Avro schema with two fields: a required field name of type string and an optional integer field id. To use this schema in a Java program, you also need to generate a Java implementation of an object corresponding to this schema.

  1. Change into the project directory with cd demo .

  2. Create the folders for storing schema files in your code:

     mkdir -p src/main/avro
    
  3. Create the Avro schema definition by pasting the following code into a file called src/main/avro/User.avsc :

      {
        "namespace": "com.google.example",
        "type": "record",
        "name": "User",
        "fields": [
          {"name": "name", "type": "string"},
          {"name": "id", "type": ["int", "null"]}
        ]
      }
  4. Configure your Maven project to use an Avro Java code generation plugin by adding the following to the build node of your pom.xml. Note that the pom.xml may already have a plugins node inside the pluginManagement node. Don't change the pluginManagement node in this step. The plugins node that you add must be at the same level as pluginManagement.

     <plugins>
       <plugin>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro-maven-plugin</artifactId>
         <version>1.11.1</version>
         <executions>
           <execution>
             <phase>generate-sources</phase>
             <goals>
               <goal>schema</goal>
             </goals>
             <configuration>
               <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
               <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
             </configuration>
           </execution>
         </executions>
       </plugin>
     </plugins>
    
  5. Add Avro as a dependency by adding the following to the end of the project/dependencies node of pom.xml. Note that the pom.xml already has a dependencies node inside the dependencyManagement node. Don't change the dependencyManagement node in this step.

     <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro</artifactId>
       <version>1.11.1</version>
     </dependency>
    
  6. Generate the Java sources:

     mvn generate-sources
    
  7. Run the following command to check that the implementation source file was created. The source is a Java class file that implements constructors, accessors, serializers, and deserializers for User objects. You will use this class in the producer code.

     cat src/main/java/com/google/example/User.java
    

Create a producer client

This section walks through the steps of writing, building, and running a producer client.

Implement the producer

The producer uses the KafkaAvroSerializer class to encode messages and manage their schemas. The serializer automatically connects to the schema registry, registers the schema under a subject, retrieves its ID, and then serializes the message using Avro. You still need to configure the producer and the serializer.

  1. Create the producer client class by pasting the following code into a new file called src/main/java/com/google/example/UserProducer.java:

      package com.google.example;

      import java.util.Properties;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.serialization.StringSerializer;
      import io.confluent.kafka.serializers.KafkaAvroSerializer;

      public class UserProducer {
        private static Properties configure() throws Exception {
          Properties p = new Properties();
          p.load(new java.io.FileReader("client.properties"));
          p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
          p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
          return p;
        }

        public static void main(String[] args) throws Exception {
          Properties p = configure();
          KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p);
          final User u = new User("SchemaEnthusiast", 42);
          final String topicName = "newUsers";
          ProducerRecord<String, User> message = new ProducerRecord<String, User>(topicName, "", u);
          producer.send(message, new SendCallback());
          producer.close();
        }
      }
     
    
  2. Define the callback class in src/main/java/com/google/example/SendCallback.java :

      package com.google.example;

      import org.apache.kafka.clients.producer.Callback;
      import org.apache.kafka.clients.producer.RecordMetadata;

      class SendCallback implements Callback {
        public void onCompletion(RecordMetadata m, Exception e) {
          if (e == null) {
            System.out.println("Produced a message successfully.");
          } else {
            System.out.println(e.getMessage());
          }
        }
      }
     
    
  3. To compile this code, you need the org.apache.kafka.clients package and the serializer code. The serializer Maven artifact is distributed through a custom repository. Add the following node to the project node of your pom.xml to configure this repository:

       
     <repositories>
       <repository>
         <id>confluent</id>
         <name>Confluent</name>
         <url>https://packages.confluent.io/maven/</url>
       </repository>
     </repositories>
    
  4. Add the following to the dependencies node in your pom.xml file:

       
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-simple</artifactId>
       <version>1.7.32</version>
     </dependency>
     <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-avro-serializer</artifactId>
       <version>7.8.1</version>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>3.7.2</version>
     </dependency>
    
  5. To make sure all the dependencies are properly resolved, compile the client:

     mvn compile
    

Create a schema registry

To create a schema registry, run the following command:

 gcloud beta managed-kafka schema-registries create REGISTRY_ID \
    --location=REGION
 

Replace the following:

  • REGISTRY_ID : a unique identifier for your new schema registry. This forms part of the registry's resource name. The name must start with a letter, contain only letters (a-z, A-Z), numbers (0-9), and underscores (_), and be 63 characters or less.

  • REGION : the Google Cloud region where the schema registry is created. This location must match the region of the Kafka cluster or clusters that use this registry.

The schema definition that you created is not yet uploaded to the registry. The producer client uploads it the first time it runs, in the following steps.
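Optionally, you can confirm that the registry starts out empty by listing its subjects through the same REST endpoint that this guide queries later; the call should return an empty JSON array ([]). This is a sketch that assumes you run it from a machine with the gcloud CLI and permission to access the registry:

 # List the subjects in the new registry; expect [] before the producer runs.
 curl -X GET \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID/subjects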

Configure and run the producer

At this point the producer won't run since it is not fully configured. To configure the producer, provide both the Kafka and schema registry configuration.

  1. Create a file called client.properties in the same directory as your pom.xml and add the following content to it:

     bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092

     security.protocol=SASL_SSL
     sasl.mechanism=OAUTHBEARER
     sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
     sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

     schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID
     bearer.auth.credentials.source=CUSTOM
     bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider
    

    Add the Kafka and schema registry authentication handler dependencies to your Maven project by inserting the following into the dependencies node of pom.xml, above the kafka-avro-serializer dependency:

       
     <dependency>
       <groupId>com.google.cloud.hosted.kafka</groupId>
       <artifactId>managed-kafka-auth-login-handler</artifactId>
       <version>1.0.6</version>
       <exclusions>
         <exclusion>
           <groupId>io.confluent</groupId>
           <artifactId>kafka-schema-registry-client</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
    

    If you would like to see the implementation of the custom schema registry authentication handler, look at the GcpBearerAuthCredentialProvider class.

  2. Compile and run the producer client:

     mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducer
    

    If all goes well, you see the output Produced a message successfully generated by the SendCallback class.

Examine the output

  1. Check that the User schema has been registered under a subject name derived from the topic and schema names:

      SR_DOMAIN=https://managedkafka.googleapis.com
      SR_PATH=/v1/projects/PROJECT_ID/locations/REGION
      SR_HOST=$SR_DOMAIN$SR_PATH/schemaRegistries/REGISTRY_ID/subjects

      curl -X GET \
          -H "Content-Type: application/vnd.schemaregistry.v1+json" \
          -H "Authorization: Bearer $(gcloud auth print-access-token)" \
          $SR_HOST
     
    

    The output of this command should look like this:

     ["newUsers-value"] 
    
  2. Check that the schema registered in the registry is the same as User:

     curl -X GET \
         -H "Content-Type: application/vnd.schemaregistry.v1+json" \
         -H "Authorization: Bearer $(gcloud auth print-access-token)" \
         $SR_HOST/newUsers-value/versions/1
    

    The output of the command should look like this:

     {
      "subject": "newUsers-value",
      "version": 1,
      "id": 2,
      "schemaType": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}",
      "references": []
    } 
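
    The schema field in this response is a JSON-escaped string. If you have jq installed, you can, for example, extract and pretty-print it to confirm that it matches your User.avsc file. This sketch reuses the $SR_HOST variable defined in step 1:

     # Fetch the registered schema, unescape the schema field, and pretty-print it.
     curl -s \
         -H "Content-Type: application/vnd.schemaregistry.v1+json" \
         -H "Authorization: Bearer $(gcloud auth print-access-token)" \
         $SR_HOST/newUsers-value/versions/1 | jq -r '.schema' | jq .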
    

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.
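If you created a project specifically for this quickstart and no longer need it, the simplest cleanup is to delete the entire project, for example with the gcloud projects delete command:

 # Deleting the project removes the cluster, registry, and VM along with it.
 gcloud projects delete PROJECT_ID

Otherwise, delete the individual resources as described in the rest of this section.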

To delete the schema registry, run the gcloud beta managed-kafka schema-registries delete command:

 gcloud beta managed-kafka schema-registries delete REGISTRY_ID \
    --location=REGION
 

To delete the cluster, run the gcloud managed-kafka clusters delete command:

 gcloud managed-kafka clusters delete CLUSTER_ID \
    --location=REGION
 

To delete the VM, run the gcloud compute instances delete command:

 gcloud compute instances delete test-instance \
    --zone=REGION-c

What's next

Apache Kafka® and Apache Avro are registered trademarks of The Apache Software Foundation or its affiliates in the United States and/or other countries.
Confluent is a registered trademark of Confluent, Inc.