Application Development

Once you have the Confluent Platform up and running, whether locally for testing or in a production setting, you’re ready to start using Kafka to produce and consume data. This document helps you understand the tools available for interacting with Kafka and choose the best one for your application. Before getting started, make sure to understand the components of the Confluent Platform, especially the Schema Registry.

There are multiple ways for applications to interact with Kafka through the Confluent Platform. Whichever method you choose, the most important thing is to ensure that your application coordinates with the Schema Registry to manage schemas and guarantee data compatibility. Luckily, the Confluent Platform makes this easy and nearly transparent to the application developer.

There are two tools for interacting with Kafka: serializers for the clients that ship with Kafka and the REST Proxy. Most commonly you will use the serializers if your application is developed in Java (or a compatible language such as Scala) and the REST Proxy for applications written in other languages.

Java Applications: Serializers

Java applications can use the standard Kafka producers and consumers, but replace the default ByteArraySerializer with io.confluent.kafka.serializers.KafkaAvroSerializer (and the equivalent deserializer). This allows Avro data to be passed into the producer directly and allows the consumer to deserialize and return Avro data.

For a Maven project, include dependencies within the dependencies tag for the Avro serializer and for your desired version of Kafka:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>2.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.1-cp1</version>
    <scope>provided</scope>
</dependency>

In your code, you can create Kafka producers and consumers just as you normally would, with two adjustments:

  1. The generic types for key and value should be Object. This allows you to pass in primitive types, Maps, and Records.

  2. Set the key/value serializer or deserializer and Schema Registry URL options. For example, if you were configuring a producer directly in your code:

    Properties props = new Properties();
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put("schema.registry.url", "http://localhost:8081");
    // Set any other properties
    KafkaProducer<Object, Object> producer = new KafkaProducer<>(props);

    We recommend these values be set using a properties file that your application loads and passes to the producer constructor. The settings are similar for the deserializer.
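The properties-file approach, together with the deserializer settings, might look like the following minimal sketch. The class and method names (`ClientConfigLoader`, `load`), the broker address `localhost:9092`, and the file name mentioned in the comment are assumptions for illustration. The sketch reads from a `Reader` so that it runs without a file on disk; an application would pass a `FileReader` for its own properties file instead.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class ClientConfigLoader {
    // Example consumer settings. In practice these lines would live in a
    // file such as consumer.properties (a hypothetical name).
    static final String EXAMPLE_CONSUMER_CONFIG = String.join("\n",
        "bootstrap.servers=localhost:9092",
        "key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer",
        "value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer",
        "schema.registry.url=http://localhost:8081");

    // Reads key=value settings into a Properties object that can be passed
    // to a producer or consumer constructor.
    static Properties load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(new StringReader(EXAMPLE_CONSUMER_CONFIG));
        System.out.println(props.getProperty("value.deserializer"));
    }
}
```

Keeping these values in a file lets you point the same application at a different Schema Registry or broker without recompiling.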

Now your application code can send Avro data. The data is serialized automatically, and its schema is registered with or validated against the Schema Registry. The application code is essentially the same as using Kafka without the Confluent Platform:

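User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// send() takes a ProducerRecord that names the target topic;
// "users" is only an example topic name.
Future<RecordMetadata> resultFuture =
    producer.send(new ProducerRecord<Object, Object>("users", user1));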

This brief overview should get you started, and you can find more detailed information in the Serializer and Formatter section of the Schema Registry documentation.

Non-Java Applications: REST Proxy

The REST Proxy should be used for non-Java applications. It is a convenient, language-agnostic method for interacting with Kafka. Almost all standard libraries have good support for HTTP and JSON, so even if a wrapper of the API does not exist for your language it should still be easy to use the API. It also automatically translates between Avro and JSON. This simplifies writing applications in languages that do not have good Avro support.

The REST Proxy API Reference describes the complete API in detail, but we will highlight some key interactions here. First, you will want to produce data to Kafka. To do so, construct a POST request to the /topics/{topicName} resource including the schema for the data (plain integers in this example) and a list of records, optionally including the partition for each record.

POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.avro.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "value_schema": "{\"name\":\"int\",\"type\": \"int\"}",
  "records": [
    {
      "value": 12
    },
    {
      "value": 24,
      "partition": 1
    }
  ]
}
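The request above could be assembled from Java roughly as follows. This is a sketch, assuming Java 11 or later for `java.net.http`. The class and method names are illustrative, and the body is built by hand only to keep the example dependency-free; a JSON library is the safer choice in real code.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class ProduceRequestExample {
    // Builds the JSON body: the Avro schema, embedded as an escaped JSON
    // string, followed by one record per integer value.
    static String produceBody(String valueSchema, int... values) {
        StringBuilder records = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) records.append(",");
            records.append("{\"value\":").append(values[i]).append("}");
        }
        String escapedSchema = valueSchema.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"value_schema\":\"" + escapedSchema + "\",\"records\":[" + records + "]}";
    }

    // Builds (but does not send) the POST request for the given topic.
    static HttpRequest produceRequest(String proxyBaseUrl, String topic, String body) {
        return HttpRequest.newBuilder()
            .uri(URI.create(proxyBaseUrl + "/topics/" + topic))
            .header("Content-Type", "application/vnd.kafka.avro.v1+json")
            .header("Accept", "application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }

    public static void main(String[] args) {
        String body = produceBody("{\"name\":\"int\",\"type\": \"int\"}", 12, 24);
        HttpRequest request = produceRequest("http://kafkaproxy.example.com", "test", body);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body);
    }
}
```

To send the request, pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.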

Note that REST Proxy relies on content type information to properly convert data to Avro, so you must specify the Content-Type header. The response includes the same information you would receive from the Java clients API about the partition and offset of the published data (or errors in case of failure). Additionally, it includes the schema IDs it registered or looked up in the Schema Registry.

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 2,
      "offset": 103
    },
    {
      "partition": 1,
      "offset": 104
    }
  ]
}

In future requests, you can use this schema ID instead of the full schema, reducing the overhead for each request. You can also produce data to specific partitions using a similar request format with the /topics/{topicName}/partitions/{partition} endpoint.
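As a rough illustration, the helpers below build a request body that refers to a schema by ID and the path for a partition-specific request. The sketch assumes that the request field has the same name as in the response, `value_schema_id`; confirm this against the API Reference. The class and method names are hypothetical.

```java
class SchemaIdRequestExample {
    // Path for producing to one specific partition of a topic.
    static String partitionPath(String topic, int partition) {
        return "/topics/" + topic + "/partitions/" + partition;
    }

    // Body that refers to a previously returned schema ID instead of
    // repeating the full schema (field name assumed to be value_schema_id).
    static String bodyWithSchemaId(int valueSchemaId, int... values) {
        StringBuilder records = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) records.append(",");
            records.append("{\"value\":").append(values[i]).append("}");
        }
        return "{\"value_schema_id\":" + valueSchemaId + ",\"records\":[" + records + "]}";
    }

    public static void main(String[] args) {
        System.out.println(partitionPath("test", 1));
        System.out.println(bodyWithSchemaId(32, 12, 24));
    }
}
```

The body shrinks from a full schema string to a single integer, which matters most when schemas are large and requests are frequent.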

To achieve good throughput, it is important to batch your produce requests so that each HTTP request contains many records. Depending on your durability and latency requirements, this can be as simple as maintaining a queue of records and sending a request only when the queue reaches a certain size or a timeout expires.
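One possible shape for such a queue is sketched below. `RecordBatcher` and its methods are hypothetical names, the records are held as pre-serialized JSON strings, and the caller supplies the current time so that the size and timeout rules are easy to follow. The class is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;

class RecordBatcher {
    private final int maxRecords;
    private final long maxWaitMillis;
    private final List<String> queue = new ArrayList<>();
    private long oldestQueuedAt;

    RecordBatcher(int maxRecords, long maxWaitMillis) {
        this.maxRecords = maxRecords;
        this.maxWaitMillis = maxWaitMillis;
    }

    // Queues one record. Returns the batch to send if the size limit has
    // been reached, or an empty list if the record should keep waiting.
    List<String> add(String recordJson, long nowMillis) {
        if (queue.isEmpty()) {
            oldestQueuedAt = nowMillis;
        }
        queue.add(recordJson);
        return queue.size() >= maxRecords ? drain() : List.of();
    }

    // Call periodically. Returns the queued records once the oldest one
    // has waited at least maxWaitMillis, or an empty list otherwise.
    List<String> poll(long nowMillis) {
        boolean timedOut = !queue.isEmpty() && nowMillis - oldestQueuedAt >= maxWaitMillis;
        return timedOut ? drain() : List.of();
    }

    private List<String> drain() {
        List<String> batch = new ArrayList<>(queue);
        queue.clear();
        return batch;
    }
}
```

Each non-empty list returned by `add` or `poll` becomes the `records` array of a single produce request. A larger size limit improves throughput, while a shorter timeout bounds how long a record can wait before it is sent.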

Consuming data is a bit more complex because consumers are stateful. However, it still only requires two API calls to get started. See the API Reference for complete details and examples.
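The two calls might look like the sketch below, which builds the requests without sending them. It reflects our reading of the v1 API and should be checked against the API Reference: the first call creates a consumer instance in a group, and the second reads from a topic using the instance's base URI as returned by the first call. The endpoint paths, the `format` field, the class and method names, and Java 11 or later for `java.net.http` are all assumptions here.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class ConsumerRequestsExample {
    // Call 1: create a consumer instance in the given group, asking for
    // Avro-formatted data (path and body are assumptions, see lead-in).
    static HttpRequest createInstance(String proxyBaseUrl, String group) {
        return HttpRequest.newBuilder()
            .uri(URI.create(proxyBaseUrl + "/consumers/" + group))
            .header("Content-Type", "application/vnd.kafka.v1+json")
            .POST(HttpRequest.BodyPublishers.ofString("{\"format\":\"avro\"}"))
            .build();
    }

    // Call 2: read records from a topic through the instance created above.
    // instanceBaseUri is the base URI returned in the first call's response.
    static HttpRequest readTopic(String instanceBaseUri, String topic) {
        return HttpRequest.newBuilder()
            .uri(URI.create(instanceBaseUri + "/topics/" + topic))
            .header("Accept", "application/vnd.kafka.avro.v1+json")
            .GET()
            .build();
    }
}
```

Because the instance holds state on the proxy, the second request must go to the URI returned by the first one, not to an arbitrary proxy host.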

Finally, the API also provides metadata about the cluster, such as the set of brokers, the list of topics, and per-partition information. However, most applications will not need to use these endpoints.

Note that it is also possible to use non-Java clients developed by the community and manage registration and schema validation manually using the Schema Registry API. However, as this is error-prone and must be duplicated across every application, we recommend using the REST Proxy unless you need features that are not exposed via the REST Proxy.