Kafka Producers

The Confluent Platform includes the standard Java producer shipped with Kafka itself and librdkafka, the C/C++ client library. This section gives a high-level overview of how the producer works, an introduction to the configuration settings for tuning, and some examples from both client libraries.

Concepts

The Kafka producer is conceptually much simpler than the consumer since it has no need for group coordination. Its main function is to map each message to a topic partition and send a produce request to the leader of that partition. It does the first of these with a partitioner, which typically selects a partition using a hash function. The partitioners shipped with Kafka guarantee that all messages with the same non-empty key will be sent to the same partition. If no key is provided, then the partition is selected in a round-robin fashion to ensure an even distribution across the topic partitions.
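
The partitioning rule described above can be sketched in a few lines of plain Java. This is only an illustration, not the partitioner shipped with Kafka: the class name PartitionerSketch is hypothetical, and Arrays.hashCode stands in for whatever hash function the real client uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: not Kafka's actual partitioner implementation. */
class PartitionerSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Picks a partition index in the range [0, numPartitions). */
    int partition(byte[] key, int numPartitions) {
        if (key == null || key.length == 0) {
            // No key: rotate through the partitions to spread the load evenly.
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        // Keyed: a deterministic hash, so equal keys always map to the same partition.
        // Arrays.hashCode is a stand-in for the hash used by the real client.
        return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        PartitionerSketch p = new PartitionerSketch();
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(p.partition(key, 6) == p.partition(key, 6)); // prints "true"
        System.out.println(p.partition(null, 3)); // prints "0"
        System.out.println(p.partition(null, 3)); // prints "1"
    }
}
```

Note that the keyed mapping depends on the partition count, so in this sketch adding partitions to a topic changes where a given key lands.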

Each partition in the Kafka cluster has a leader and a set of replicas among the brokers. All writes to the partition must go through the partition leader. The replicas are kept in sync by fetching from the leader. When the leader shuts down or fails, the next leader is chosen from among the in-sync replicas. Depending on how the producer is configured, each produce request to the partition leader can be held until the replicas have successfully acknowledged the write. This gives the producer some control over message durability at some cost to overall throughput.

Messages written to the partition leader are not immediately readable by consumers regardless of the producer’s acknowledgement settings. When all in-sync replicas have acknowledged the write, then the message is considered committed, which makes it available for reading. This ensures that messages cannot be lost by a broker failure after they have already been read. Note that this implies that messages which were acknowledged by the leader only (i.e. acks=1) can be lost if the partition leader fails before the replicas have copied the message. Nevertheless, this is often a reasonable compromise in practice to ensure durability in most cases while not impacting throughput too significantly.
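
As a rough mental model of the commit rule, the sketch below computes the point up to which messages are readable as the minimum position reached by any in-sync replica. It is a simplification under stated assumptions: the class and method names are hypothetical, and replica state is reduced to one number per replica (the offset of the next message it would write).

```java
import java.util.Map;
import java.util.Set;

/** Simplified model of the commit rule; not broker code. */
class CommittedOffsetSketch {
    /**
     * @param logEndOffsets for each replica id, the offset of the next message it would write
     * @param inSync        ids of the in-sync replicas, including the leader
     * @return the first offset that is NOT yet committed; everything below it is readable
     */
    static long committedUpTo(Map<Integer, Long> logEndOffsets, Set<Integer> inSync) {
        if (inSync.isEmpty()) {
            return 0L;
        }
        long min = Long.MAX_VALUE;
        for (int id : inSync) {
            // A message is committed only once every in-sync replica has it.
            min = Math.min(min, logEndOffsets.get(id));
        }
        return min;
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsets = Map.of(1, 10L, 2, 8L, 3, 5L);
        // Replica 3 has fallen out of sync, so it no longer holds back the commit point.
        System.out.println(committedUpTo(offsets, Set.of(1, 2)));    // prints "8"
        System.out.println(committedUpTo(offsets, Set.of(1, 2, 3))); // prints "5"
    }
}
```

In the first call, offsets 8 and 9 exist only on the leader (replica 1). They are exactly the messages that an acks=1 producer could have had acknowledged and that would be lost if the leader failed at that moment.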

Most of the subtlety around producers is tied to achieving high throughput with batching/compression and ensuring message delivery guarantees as mentioned above. In the next section, we discuss the most common settings to tune producer behavior.

Configuration

The full list of configuration settings is available in the Kafka documentation. Below we highlight several of the key configuration settings and how they affect the producer’s behavior.

Core Configuration: You are required to set the bootstrap.servers property so that the producer can find the Kafka cluster. Although not required, we recommend always setting a client.id since this allows you to easily correlate requests on the broker with the client instance which made it.

Message Durability: You can control the durability of messages written to Kafka through the acks setting. The default value of “1” requires an explicit acknowledgement from the partition leader that the write succeeded. The strongest guarantee that Kafka provides is with “acks=all”, which guarantees that not only did the partition leader accept the write, but it was successfully replicated to all of the in-sync replicas. You can also use a value of “0” to maximize throughput, but you will have no guarantee that the message was successfully written to the broker’s log since the broker does not even send a response in this case. This also means that you will not be able to determine the offset of the message. Note that for librdkafka, this setting is named request.required.acks, and the value “-1” is used to indicate acknowledgement by all in-sync replicas.
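
For the Java client, one way to keep the durability choice explicit is to map it onto the acks property in a single place. The sketch below uses only java.util.Properties. The enum and its constant names are hypothetical, and the broker addresses are placeholders.

```java
import java.util.Properties;

class DurabilityConfigSketch {
    /** Hypothetical names for the three acknowledgement levels described above. */
    enum Durability {
        NO_ACK("0"), LEADER_ONLY("1"), ALL_IN_SYNC("all");

        final String acks;

        Durability(String acks) {
            this.acks = acks;
        }
    }

    /** Builds producer properties with the acks value for the requested level. */
    static Properties producerConfig(Durability level) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "host1:9092,host2:9092"); // placeholder addresses
        config.put("acks", level.acks);
        return config;
    }

    public static void main(String[] args) {
        Properties config = producerConfig(Durability.ALL_IN_SYNC);
        System.out.println(config.getProperty("acks")); // prints "all"
    }
}
```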

Message Ordering: In general, messages are written to the broker in the same order that they are received by the producer client. However, if you enable message retries by setting retries to a value larger than 0 (the default is 0), then message reordering may occur, since a retry may be sent after a later write has already succeeded. To enable retries without reordering, you can set max.in.flight.requests.per.connection to 1 to ensure that only one request can be sent to the broker at a time. Without retries enabled, the broker will preserve the order of writes it receives, but there could be gaps due to individual send failures. For librdkafka, the setting is message.send.max.retries and the default is 2.
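
To see why retries and multiple in-flight requests interact this way, consider the toy simulation below. It is not how the client is implemented: the class name is hypothetical, each "request" carries a single message, and a failure is modeled as a message that is rejected once and then succeeds on retry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of retries with a bounded number of in-flight requests. */
class RetryOrderingSketch {
    /** Returns the order in which messages end up in the log. */
    static List<String> send(List<String> messages, Set<String> failOnce, int maxInFlight) {
        if (maxInFlight < 1) {
            throw new IllegalArgumentException("maxInFlight must be at least 1");
        }
        Deque<String> pending = new ArrayDeque<>(messages);
        Set<String> alreadyFailed = new HashSet<>();
        List<String> log = new ArrayList<>();
        while (!pending.isEmpty()) {
            // Up to maxInFlight requests are on the wire at the same time.
            List<String> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.pollFirst());
            }
            List<String> retries = new ArrayList<>();
            for (String message : inFlight) {
                if (failOnce.contains(message) && alreadyFailed.add(message)) {
                    retries.add(message); // first attempt fails; later requests still go through
                } else {
                    log.add(message);
                }
            }
            // Failed messages go back to the front of the queue for the next round.
            for (int i = retries.size() - 1; i >= 0; i--) {
                pending.addFirst(retries.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("A", "B", "C");
        System.out.println(send(messages, Set.of("A"), 2)); // prints "[B, A, C]"
        System.out.println(send(messages, Set.of("A"), 1)); // prints "[A, B, C]"
    }
}
```

With two requests in flight, B is written while A is still waiting to be retried, so A lands after it. With one request in flight, nothing is sent until A has succeeded, which is the effect of setting max.in.flight.requests.per.connection to 1.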

Batching and Compression: The producer attempts to collect sent messages into batches to improve throughput. In the Java client, use batch.size to control the maximum size in bytes of each batch (librdkafka’s batch.num.messages instead caps the number of messages per batch) and buffer.memory to limit the total memory the producer may use to buffer records that have not yet been sent. The producer stops adding records to a batch once it reaches batch.size, and the total amount of buffered data is bounded by buffer.memory. To improve batching, you can use linger.ms to have the producer delay sending to give more time for batches to be filled. Compression can be enabled with the compression.type setting. Compression covers full message batches, so larger batches will typically mean a higher compression ratio.
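
The relationship between batch size and compression ratio is easy to check outside of Kafka. The sketch below compresses one message and then a batch of one hundred similar messages with the JDK’s GZIPOutputStream and compares the results. The message shape and class name are made up for illustration, and the producer’s actual batch format is not modeled.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

class BatchCompressionSketch {
    /** Returns compressed size divided by raw size for a batch of n similar messages. */
    static double compressedFraction(int n) {
        try {
            ByteArrayOutputStream raw = new ByteArrayOutputStream();
            for (int i = 0; i < n; i++) {
                String message = "{\"user\":\"user-" + i + "\",\"action\":\"click\"}";
                raw.write(message.getBytes(StandardCharsets.UTF_8));
            }
            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
            try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
                gzip.write(raw.toByteArray()); // the whole batch is compressed as one unit
            }
            return (double) compressed.size() / raw.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for in-memory streams
        }
    }

    public static void main(String[] args) {
        System.out.printf("1 message:    %.2f of original size%n", compressedFraction(1));
        System.out.printf("100 messages: %.2f of original size%n", compressedFraction(100));
    }
}
```

A single small message usually gets larger under gzip because of the fixed header and trailer, while the batch shrinks considerably because the repeated field names compress well.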

Examples

Initial Setup

The Java producer is constructed with a standard Properties object.

Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("bootstrap.servers", "host1:9092,host2:9092");
config.put("acks", "all");
KafkaProducer<K, V> producer = new KafkaProducer<>(config);

Configuration errors will result in a raised KafkaException from the constructor of KafkaProducer. The main difference in librdkafka is that we handle the errors for each setting directly.

char hostname[128];
char errstr[512];

rd_kafka_conf_t *conf = rd_kafka_conf_new();

if (gethostname(hostname, sizeof(hostname))) {
  fprintf(stderr, "%% Failed to lookup hostname\n");
  exit(1);
}

if (rd_kafka_conf_set(conf, "client.id", hostname,
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

if (rd_kafka_conf_set(conf, "request.required.acks", "-1",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

if (rd_kafka_conf_set(conf, "bootstrap.servers", "host1:9092,host2:9092",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

/* Create Kafka producer handle */
rd_kafka_t *rk;
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                        errstr, sizeof(errstr)))) {
  fprintf(stderr, "%% Failed to create new producer: %s\n", errstr);
  exit(1);
}

Asynchronous Writes

All writes are asynchronous by default. The Java producer includes a send() API which returns a future which can be polled to get the result of the send.

final ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
Future<RecordMetadata> future = producer.send(record);

With librdkafka, you first need to create a rd_kafka_topic_t handle for the topic you want to write to. Then you can use rd_kafka_produce to send messages to it. For example:

rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, NULL);

if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA,
                     RD_KAFKA_MSG_F_COPY,
                     payload, payload_len,
                     key, key_len,
                     NULL) == -1) {
  fprintf(stderr, "%% Failed to produce to topic %s: %s\n",
          topic, rd_kafka_err2str(rd_kafka_errno2err(errno)));
}

You can pass topic-specific configuration in the third argument to rd_kafka_topic_new, but here we passed NULL to get the default configuration.

The second argument to rd_kafka_produce can be used to set the desired partition for the message. If set to RD_KAFKA_PARTITION_UA, as in this case, librdkafka will use the default partitioner to select the partition for this message. The third argument indicates that librdkafka should copy the payload and key, which lets us free them as soon as the call returns.

If you do not want to wait on the returned future, then you can also provide a callback in send(), which will be invoked upon completion of the write. Just one word of caution, however: you should avoid doing any expensive work in this callback since it is executed in the producer’s IO thread.

final ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
producer.send(record, new Callback() {
  public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e != null)
      log.debug("Send failed for record {}", record, e);
  }
});
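
If the completion logic is expensive, one option is to have the callback do nothing but hand the work to another thread. The sketch below shows the idea with JDK types only: BiConsumer stands in for the producer’s Callback interface, the type parameter M stands in for RecordMetadata, and the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

class CallbackOffloadSketch {
    /** Wraps a slow handler so that the calling thread only enqueues the work. */
    static <M> BiConsumer<M, Exception> offload(ExecutorService workers,
                                                BiConsumer<M, Exception> slowHandler) {
        return (metadata, error) ->
            workers.execute(() -> slowHandler.accept(metadata, error));
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        BiConsumer<String, Exception> callback = offload(workers, (metadata, error) ->
            System.out.println("handled " + metadata + " on " + Thread.currentThread().getName()));
        callback.accept("record-1", null); // returns at once; the handler runs on the worker thread
        workers.shutdown();                // lets the queued task finish, then stops the worker
    }
}
```

The trade-off is that completions are now handled on the worker pool’s schedule, so any ordering the handler relies on has to come from the executor (a single-threaded one, as here, processes tasks in submission order).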

A similar feature is available in librdkafka, but we have to configure it on initialization:

static void on_delivery(rd_kafka_t *rk,
                        const rd_kafka_message_t *rkmessage,
                        void *opaque) {
  if (rkmessage->err)
    fprintf(stderr, "%% Message delivery failed: %s\n",
            rd_kafka_message_errstr(rkmessage));
}

void init_rd_kafka() {
  rd_kafka_conf_t *conf = rd_kafka_conf_new();
  rd_kafka_conf_set_dr_msg_cb(conf, on_delivery);

  // initialization omitted
}

The delivery callback in librdkafka is invoked in the user’s thread by calling rd_kafka_poll. A common pattern is to call this function after every call to the produce API, but this may not be sufficient to ensure regular delivery reports if the message produce rate is not steady. Note, however, that this API does not provide a direct way to block for the result of any particular message delivery. If you need to do this, then see the synchronous write example below.

Synchronous Writes

To make writes synchronous, just wait on the returned future. This would typically be a bad idea since it would kill throughput, but may be justified in some cases.

Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
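
In practice the blocking call also needs to deal with failures, timeouts, and interruption. The helper below is a minimal sketch of that handling using only java.util.concurrent. The class and method names are hypothetical, and wrapping failures in IllegalStateException is just one possible choice. It accepts any Future, so the future returned by send() could be passed in directly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SyncWaitSketch {
    /** Blocks until the future completes or the timeout expires. */
    static <T> T await(Future<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The operation itself failed; the original error is the cause.
            throw new IllegalStateException("send failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("no result within " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // A completed future stands in for the result of a successful send.
        Future<String> done = CompletableFuture.completedFuture("offset=42");
        System.out.println(await(done, 1000)); // prints "offset=42"
    }
}
```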

A similar capability could be achieved in librdkafka using the delivery callback, but it takes a bit more work. A full example can be found here.