Avro Producer and Consumer — Schema Registry

Ranjeet Borate
3 min read · May 16, 2023


To understand the basics of the Schema Registry, its behavior, and how it works, you can refer here. Now we are going to create a simple Avro message producer and consumer that communicate with the Schema Registry in order to verify the schema definition that both producer and consumer have agreed upon.

Prerequisites

Before proceeding with the producer and consumer code, we must ensure that the Schema Registry is up and running.

Both the producer and the consumer must initially agree upon a common schema in order to produce and consume Avro messages.

Once the Schema Registry is up, verify that the mutually agreed schema is present in its local cache; if it is not, you can learn how to create a schema in the local cache here.

Now that the mutually agreed schema is available in the local cache, we can proceed with creating the producer and the consumer.
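If the schema has not been registered yet, one lightweight way to register it is a POST against the registry's REST endpoint `/subjects/{subject}/versions`. The sketch below uses the JDK's built-in HTTP client (Java 11+); the class name, subject name, and URL are illustrative placeholders, not part of any library:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SchemaRegistration {

    // The REST API expects a JSON body in which the Avro schema itself
    // is embedded as an escaped JSON string.
    static String buildPayload(String avroSchema) {
        return "{\"schema\": \"" + avroSchema.replace("\"", "\\\"") + "\"}";
    }

    static void register(String registryUrl, String subject, String avroSchema) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(registryUrl + "/subjects/" + subject + "/versions"))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(buildPayload(avroSchema)))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // On success the registry answers with the assigned schema ID, e.g. {"id":1}
        System.out.println(response.body());
    }

    public static void main(String[] args) {
        String schema = "{\"type\":\"record\",\"name\":\"USER\",\"fields\":"
                + "[{\"name\":\"id\",\"type\":\"string\"}]}";
        System.out.println(buildPayload(schema));
    }
}
```

Running `main` only prints the payload; calling `register(...)` requires a reachable Schema Registry instance.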

Avro Producer

First, we need to fill in the properties required to connect to the bootstrap servers and the Schema Registry, as given below:

private Properties setProducerProperties() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "BOOTSTRAP_SERVER");
    props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "SCHEMA_REGISTRY_URL");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer"); // optional
    // Typically StringSerializer for the key and KafkaAvroSerializer for the value
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "KEY_SERIALIZER");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "VALUE_SERIALIZER");
    return props;
}

Now, if your application has to communicate with bootstrap servers and a Schema Registry that have security enabled, you need to specify the properties given below as well:

props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "TRUSTSTORE_LOCATION");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "PASSCODE");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "KEYSTORE_LOCATION");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "PASSCODE");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
props.put("sasl.kerberos.service.name", "NAME");

Apart from these properties, you may need to set a few more security-related system properties, as shown below:

static {
    System.setProperty("sun.security.krb5.debug", "true");
    System.setProperty("java.security.krb5.conf", "krb5.conf");
    System.setProperty("java.security.auth.login.config", "client_jaas.conf");
    System.setProperty("ssl.jaas.config", "client_jaas.conf");
    System.setProperty("ssl.krb.config", "krb5.conf");
}
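For reference, the `client_jaas.conf` referenced above typically contains a `KafkaClient` login section. A minimal Kerberos sketch is given below; the keytab path and principal are placeholders you must adapt to your environment:

```
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/path/to/client.keytab"
    principal="client@EXAMPLE.COM";
};
```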

Now create the schema definition that you already have in the Schema Registry's local cache:

private String getStringifiedAvroSchema() {
    return "{\"type\": \"record\",\"name\":\"USER\",\"fields\":[{\"name\":\"id\", \"type\":\"string\"},{\"name\":\"name\", \"type\":\"string\"},{\"name\":\"age\", \"type\":\"int\"},{\"name\":\"timestamp\", \"type\":\"long\"}]}";
}

Next, we need to create a generic record, which accepts the schema definition on the basis of which the record is built:

private GenericRecord createGenericRecord(String schemaDefinition) {
    Schema schema = new Schema.Parser().parse(schemaDefinition);
    GenericRecord record = new GenericData.Record(schema);
    record.put("id", "1");
    record.put("name", "2");
    record.put("age", 3);
    record.put("timestamp", 16092234543L);
    return record;
}

At last, we write a method that creates the producer object and sends the data to the topic, as below:

public void produceAvroMessage() {
    Producer<String, GenericRecord> producer = new KafkaProducer<>(setProducerProperties());
    GenericRecord genericRecord = createGenericRecord(getStringifiedAvroSchema());
    ProducerRecord<String, GenericRecord> rec = new ProducerRecord<>("YOUR_TOPIC", "KEY", genericRecord);

    producer.send(rec);

    producer.flush();
    producer.close();
}

Here, we created a producer by passing all the required properties, then created a GenericRecord from the schema definition. The GenericRecord is wrapped in a ProducerRecord along with the topic and key, and the ProducerRecord is finally sent using the producer.send() method.
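It helps to know what the Avro serializer actually puts on the wire: Confluent's wire format prefixes each message with a magic byte (0) and the 4-byte big-endian schema ID assigned by the registry, followed by the Avro binary payload. That schema ID is how the consumer later fetches the right schema. A minimal sketch of framing and unframing that header (the class and method names here are illustrative, not part of any library):

```java
import java.nio.ByteBuffer;

public class WireFormat {

    // Builds the 5-byte Confluent header (magic byte + schema ID)
    // followed by the Avro-encoded payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buf.put((byte) 0x0);   // magic byte, always 0 in the current format
        buf.putInt(schemaId);  // schema ID assigned by the registry, big-endian
        buf.put(avroPayload);  // Avro binary-encoded record
        return buf.array();
    }

    // Reads the schema ID back out of a framed message.
    static int schemaIdOf(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        if (buf.get() != 0x0) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[]{1, 2, 3});
        System.out.println(schemaIdOf(framed)); // prints 42
    }
}
```

This is why a plain consumer without the Avro deserializer sees five bytes of "garbage" before the payload.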

Consumer

Now, for the consumer, we can reuse the setProducerProperties() method given above, replacing the serializers with the corresponding deserializers and adding the properties given below:

// Consumers need deserializers in place of the producer's serializers —
// typically StringDeserializer for the key and KafkaAvroDeserializer for the value
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "KEY_DESERIALIZER");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "VALUE_DESERIALIZER");
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

And finally, we need the business logic shown below, which consumes the records and displays them:

try (final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singleton(TOPIC_NAME));
    while (true) {
        final ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));

        for (final ConsumerRecord<String, GenericRecord> record : records) {
            System.out.println(record.value());
        }
        consumer.commitAsync();
    }
}

Conclusion

We created and configured a producer and a consumer that produce and consume messages compatible with an Avro schema present on the Schema Registry.
