Serializer and Formatter

In this document, we describe how to use Avro with the Kafka Java client and the Kafka console tools.

Assuming that you have the Schema Registry source code checked out at /tmp/schema-registry, you can build all the needed jars as follows.

mvn package

The jars can be found in

/tmp/schema-registry/package/target/package-$VERSION-package/share/java/avro-serializer/

Serializer

You can plug KafkaAvroSerializer into KafkaProducer to send messages of Avro type to Kafka. Currently, we support the primitive types null, Boolean, Integer, Long, Float, Double, String, and byte[], and the complex type IndexedRecord. Sending data of any other type to KafkaAvroSerializer causes a SerializationException. Typically, IndexedRecord is used for the value of the Kafka message, and the key is one of the primitive types. When sending a message to a topic t, the Avro schemas for the key and the value are automatically registered in the schema registry under the subjects t-key and t-value, respectively, provided that the compatibility test passes. The only exception is that the null type is never registered in the schema registry.
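For example, once a record has been sent to topic topic1 (as in the producer example below), you can fetch the registered value schema with the schema registry client from the jars built above. This is a minimal sketch; the second constructor argument, which bounds the number of schemas cached per subject, is an illustrative value.

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

CachedSchemaRegistryClient client =
    new CachedSchemaRegistryClient("http://localhost:8081", 100);

// The value schema for topic "topic1" is registered under the subject "topic1-value".
SchemaMetadata latest = client.getLatestSchemaMetadata("topic1-value");
System.out.println(latest.getSchema());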

In the following example, we send a message with a key of type string and a value of type Avro record to Kafka. A SerializationException may occur during the send call if the data is not well formed.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
KafkaProducer<Object, Object> producer = new KafkaProducer<Object, Object>(props);

String key = "key1";
String userSchema = "{\"type\":\"record\"," +
                    "\"name\":\"myrecord\"," +
                    "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");

ProducerRecord<Object, Object> record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
try {
  producer.send(record);
} catch (SerializationException e) {
  // may need to do something with it
}
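Note that send() is asynchronous: the SerializationException above is raised in the calling thread, but errors from the broker or the network are reported through the returned Future or a Callback. The following is a minimal sketch of callback-based error handling, reusing the producer and record from the example above.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

producer.send(record, new Callback() {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception != null) {
      // Broker-side and network errors surface here rather than at the send() call.
      exception.printStackTrace();
    }
  }
});
producer.close();  // blocks until all in-flight sends have completed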

You can plug KafkaAvroDecoder into the high-level consumer to receive messages of any Avro type from Kafka. In the following example, we receive messages with a key of type string and a value of type Avro record. A SerializationException may occur when the message key or value is retrieved if the data is not well formed.

import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.utils.VerifiableProperties;
import org.apache.avro.generic.IndexedRecord;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "group1");
props.put("schema.registry.url", "http://localhost:8081");

String topic = "topic1";
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);

VerifiableProperties vProps = new VerifiableProperties(props);
KafkaAvroDecoder keyDecoder = new KafkaAvroDecoder(vProps);
KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(vProps);

ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

Map<String, List<KafkaStream<Object, Object>>> consumerMap = consumer.createMessageStreams(
    topicCountMap, keyDecoder, valueDecoder);
KafkaStream<Object, Object> stream = consumerMap.get(topic).get(0);
ConsumerIterator<Object, Object> it = stream.iterator();
while (it.hasNext()) {
  MessageAndMetadata<Object, Object> messageAndMetadata = it.next();
  try {
    String key = (String) messageAndMetadata.key();
    IndexedRecord value = (IndexedRecord) messageAndMetadata.message();

    ...
  } catch(SerializationException e) {
    // may need to do something with it
  }
}
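For Avro records, the object returned by the decoder is a GenericRecord (which implements IndexedRecord), so its fields can be read by name. As a minimal sketch of what the elided body of the loop above might do, assuming the record schema from the producer example:

import org.apache.avro.generic.GenericRecord;

// Inside the loop above: read the field "f1" from the decoded record.
GenericRecord record = (GenericRecord) messageAndMetadata.message();
// Avro decodes string fields as Utf8, so call toString() to obtain a java.lang.String.
String f1 = record.get("f1").toString();

When you are done consuming, call consumer.shutdown() to release the connector.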

We recommend using the new producer in org.apache.kafka.clients.producer.KafkaProducer. If you are on a version of Kafka older than 0.8.2.0, you can plug KafkaAvroEncoder into the old producer in kafka.javaapi.producer, with some limitations: KafkaAvroEncoder can only serialize the value of the message, and the value must be an Avro record. The Avro schema for the value is registered under the subject recordName-value, where recordName is the name of the Avro record. Because of this, the same Avro record type shouldn't be used in more than one topic.

In the following example, we send a message with a key of type string and a value of type Avro record to Kafka. Note that, unlike the new producer example, we use a StringEncoder to serialize the key, so there is no schema registration for the key.

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.util.Properties;

Properties props = new Properties();
props.put("serializer.class", "io.confluent.kafka.serializers.KafkaAvroEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", brokerList);
props.put("schema.registry.url", "http://localhost:8081");

Producer<String, Object> producer = new Producer<String, Object>(new ProducerConfig(props));
String key = "key1";
String userSchema = "{\"type\":\"record\"," +
                    "\"name\":\"myrecord\"," +
                    "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");

KeyedMessage<String, Object> message = new KeyedMessage<String, Object>("topic1", key, avroRecord);
producer.send(message);

Formatter

You can use kafka-avro-console-producer and kafka-avro-console-consumer respectively to send and receive Avro data in JSON format from the console. Under the hood, they use AvroMessageReader and AvroMessageFormatter to convert between Avro and JSON.
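The JSON representation used here is Avro's standard JSON encoding. As a rough illustration of the JSON-to-Avro direction (a sketch using plain Avro APIs, not the tools' actual code):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"myrecord\"," +
    "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}");

// Parse a JSON-encoded datum into an Avro record.
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, "{\"f1\": \"value1\"}");
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
GenericRecord record = reader.read(null, decoder);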

To run the Kafka console tools, first make sure that ZooKeeper, Kafka, and the Schema Registry server are all started. The following examples use the default schema registry URL; you can point the tools at a different one by supplying

--property schema.registry.url=<address of your schema registry>

in the command line arguments of kafka-avro-console-producer and kafka-avro-console-consumer.

In the following example, we send Avro records in JSON as the message value (make sure there is no space in the schema string).

bin/kafka-avro-console-producer --broker-list localhost:9092 --topic t1 \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

In the shell, type in the following.
  {"f1": "value1"}

In the following example, we read the value of the messages in JSON.

bin/kafka-avro-console-consumer --topic t1 \
  --zookeeper localhost:2181

You should see the following in the console.
  {"f1": "value1"}

In the following example, we send strings and Avro records in JSON as the key and the value of the message, respectively.

bin/kafka-avro-console-producer --broker-list localhost:9092 --topic t2 \
  --property parse.key=true \
  --property key.schema='{"type":"string"}' \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

In the shell, type in the following, where \t denotes a literal tab character separating the key and the value.
  "key1" \t {"f1": "value1"}

In the following example, we read both the key and the value of the messages in JSON.

bin/kafka-avro-console-consumer --topic t2 \
  --zookeeper localhost:2181 \
  --property print.key=true

You should see the following in the console.
  "key1" \t {"f1": "value1"}