Kafka Consumers

The Confluent Platform ships with the standard Java consumer first released in Kafka 0.9.0.0, the high-performance C/C++ client librdkafka, and clients for Python and Go.

Note

The older Scala consumers are still supported for now, but they are not covered in this documentation, and we encourage users to migrate away from them. In particular, Kafka security extensions and the new broker-based balanced consumer groups are not supported by the old consumers.

Concepts

A consumer group is a set of consumers which cooperate to consume data from some topics. The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as rebalancing the group.
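To make the idea of a proportional share concrete, here is a simplified sketch that deals partitions out round-robin to the sorted list of members. It is only an illustration: real Kafka clients delegate this to a configurable assignment strategy (for example the range or round-robin assignors), and the function and topic names below are hypothetical.

```python
from typing import Dict, List, Tuple

# A partition is identified by (topic, partition number).
TopicPartition = Tuple[str, int]


def round_robin_assign(members: List[str],
                       partitions: List[TopicPartition]) -> Dict[str, List[TopicPartition]]:
    """Deal the sorted partitions out to the sorted members, one at a time."""
    if not members:
        return {}
    ordered = sorted(members)
    assignment: Dict[str, List[TopicPartition]] = {m: [] for m in ordered}
    for i, tp in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(tp)
    return assignment


parts = [("orders", 0), ("orders", 1), ("orders", 2),
         ("payments", 0), ("payments", 1)]
print(round_robin_assign(["consumer-a", "consumer-b"], parts))
# After a third member joins, the group rebalances:
print(round_robin_assign(["consumer-a", "consumer-b", "consumer-c"], parts))
```

Rerunning the assignment with a third member shows what a rebalance does: the same five partitions are redistributed so that every member owns one or two of them.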

The main difference between the older “high-level” consumer and the new consumer is that the former depended on ZooKeeper for group management, while the latter uses a group protocol built into Kafka itself. In this protocol, one of the brokers is designated as the group’s coordinator and is responsible for managing the members of the group as well as their partition assignments.

The coordinator of each group is chosen from the leaders of the internal offsets topic __consumer_offsets, which is used to store committed offsets. The group’s ID is hashed to one of the partitions of this topic, and the leader of that partition is selected as the coordinator. In this way, management of consumer groups is divided roughly equally across all the brokers in the cluster, which allows the number of groups to scale by increasing the number of brokers.
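A sketch of that mapping, assuming the broker takes the absolute value of the group ID's Java String.hashCode() modulo the number of __consumer_offsets partitions. That is our understanding of how Kafka does it, with 50 as the default partition count (offsets.topic.num.partitions). The helper names are hypothetical, and the hash emulation only covers group IDs made of Basic Multilingual Plane characters.

```python
def java_string_hash(s: str) -> int:
    """Emulate Java's String.hashCode(), including 32-bit signed overflow."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    # Reinterpret the unsigned 32-bit value as a signed Java int.
    return h - 0x100000000 if h >= 0x80000000 else h


def coordinator_partition(group_id: str, num_partitions: int = 50) -> int:
    """Map a group ID to a __consumer_offsets partition (assumed scheme).

    Integer.MIN_VALUE is mapped to 0, since its absolute value does not
    fit in a Java int.
    """
    h = java_string_hash(group_id)
    h = 0 if h == -0x80000000 else abs(h)
    return h % num_partitions


print(coordinator_partition("foo"))  # 24 with the default 50 partitions
```

The broker leading that partition would then act as the coordinator for group "foo".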

When the consumer starts up, it finds the coordinator for its group and sends a request to join the group. The coordinator then begins a group rebalance so that the new member is assigned its fair share of the group’s partitions. Every rebalance results in a new generation of the group.

Each member in the group must send heartbeats to the coordinator in order to remain a member of the group. If no heartbeat is received before expiration of the configured session timeout, then the coordinator will kick the member out of the group and reassign its partitions to another member.
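The membership rule can be modeled in a few lines. This is a hypothetical coordinator-side sketch, not Kafka's implementation: it records the time of each member's last heartbeat and evicts members that have been silent for longer than the session timeout; their partitions would then be reassigned by a rebalance.

```python
from typing import Dict, List


class SessionTracker:
    """Coordinator-side view of member liveness (illustrative only)."""

    def __init__(self, session_timeout_ms: int):
        self.session_timeout_ms = session_timeout_ms
        self.last_heartbeat: Dict[str, int] = {}

    def heartbeat(self, member_id: str, now_ms: int) -> None:
        self.last_heartbeat[member_id] = now_ms

    def expire(self, now_ms: int) -> List[str]:
        """Remove and return members whose session has timed out."""
        expired = [m for m, t in self.last_heartbeat.items()
                   if now_ms - t > self.session_timeout_ms]
        for m in expired:
            del self.last_heartbeat[m]
        return expired


tracker = SessionTracker(session_timeout_ms=30_000)
tracker.heartbeat("consumer-a", now_ms=0)
tracker.heartbeat("consumer-b", now_ms=0)
tracker.heartbeat("consumer-a", now_ms=20_000)   # only a keeps heartbeating
print(tracker.expire(now_ms=35_000))             # ['consumer-b']
```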

Offset Management: After the consumer receives its assignment from the coordinator, it must determine the initial position for each assigned partition. When the group is first created, before any messages have been consumed, the position is set according to a configurable offset reset policy (auto.offset.reset). Typically, consumption starts either at the earliest offset or the latest offset.

As a consumer in the group reads messages from the partitions assigned by the coordinator, it must commit the offsets corresponding to the messages it has read. If the consumer crashes or is shut down, its partitions will be re-assigned to another member, which will begin consumption from the last committed offset of each partition. If the consumer crashes before any offset has been committed, then the consumer which takes over its partitions will use the reset policy.

The offset commit policy is crucial to providing the message delivery guarantees needed by your application. By default, the consumer is configured to use an automatic commit policy, which triggers a commit on a periodic interval. The consumer also supports a commit API which can be used for manual offset management. The detailed examples below show several uses of the commit API and discuss the tradeoffs between performance and reliability.

Configuration

The full list of configuration settings is available in the Kafka documentation. Below we highlight several of the key configuration settings and how they affect the consumer’s behavior.

Core Configuration: The only setting you must always provide is bootstrap.servers (the Java client additionally requires key and value deserializers), but we recommend always setting a client.id since this allows you to easily correlate requests on the broker with the client instance which made them. Typically, all consumers within the same group will share the same client ID in order to enforce client quotas.

Group Configuration: You should always configure group.id unless you are using the simple assignment API and you don’t need to store offsets in Kafka.

You can control the session timeout by overriding the session.timeout.ms value. The default is 30 seconds, but you can safely increase it to avoid excessive rebalances if you find that your application needs more time to process messages. This concern is mainly relevant if you are using the Java consumer and handling messages in the same thread. In that case, you may also want to adjust max.poll.records to tune the number of records that must be handled on every loop iteration. See basic usage below for more detail on this issue.

The main drawback to using a larger session timeout is that it will take longer for the coordinator to detect when a consumer instance has crashed, which means it will also take longer for another consumer in the group to take over its partitions. For normal shutdowns, however, the consumer sends an explicit request to the coordinator to leave the group which triggers an immediate rebalance.

The other setting which affects rebalance behavior is heartbeat.interval.ms. This controls how often the consumer will send heartbeats to the coordinator. It is also the way that the consumer detects when a rebalance is needed, so a lower heartbeat interval will generally mean faster rebalances. The default setting is three seconds. For larger groups, it may be wise to increase this setting.
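The two settings work together, since several heartbeats should fit inside one session timeout. The Kafka configuration reference requires heartbeat.interval.ms to be lower than session.timeout.ms and suggests keeping it at no more than a third of it; the hypothetical check below encodes just those two rules.

```python
from typing import List


def check_heartbeat_config(session_timeout_ms: int,
                           heartbeat_interval_ms: int) -> List[str]:
    """Return warnings for a heartbeat interval / session timeout pair."""
    warnings = []
    if heartbeat_interval_ms >= session_timeout_ms:
        warnings.append("heartbeat.interval.ms must be lower than session.timeout.ms")
    elif 3 * heartbeat_interval_ms > session_timeout_ms:
        warnings.append("heartbeat.interval.ms is above 1/3 of session.timeout.ms")
    return warnings


print(check_heartbeat_config(30_000, 3_000))   # []: the defaults cited above
print(check_heartbeat_config(30_000, 15_000))  # ['heartbeat.interval.ms is above 1/3 of session.timeout.ms']
```

With the defaults, ten heartbeat intervals fit in one session timeout, so a few lost heartbeats do not get a healthy member evicted.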

Offset Management: The two main settings affecting offset management are whether auto-commit is enabled and the offset reset policy. First, if enable.auto.commit is set to true (the default), then the consumer will automatically commit offsets periodically at the interval set by auto.commit.interval.ms. The default is 5 seconds.

Second, use auto.offset.reset to define the behavior of the consumer when there is no committed position (which would be the case when the group is first initialized) or when an offset is out of range. You can choose either to reset the position to the “earliest” offset or the “latest” offset (the default). You can also select “none” if you would rather set the initial offset yourself and you are willing to handle out of range errors manually.
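A sketch of how the starting position for one partition could be resolved from the committed offset and the reset policy. The function and exception names are hypothetical (the exception is loosely modeled on the Java client's NoOffsetForPartitionException), and log_start/log_end stand for the earliest available offset and the offset at which the next message will be written.

```python
from typing import Optional


class NoOffsetForPartition(Exception):
    """Raised when the policy is 'none' and there is no valid position."""


def initial_position(committed: Optional[int], log_start: int, log_end: int,
                     reset_policy: str = "latest") -> int:
    # A committed offset that is still inside the log is used as-is.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No commit yet, or the committed offset is out of range: apply the policy.
    if reset_policy == "earliest":
        return log_start
    if reset_policy == "latest":
        return log_end
    if reset_policy == "none":
        raise NoOffsetForPartition("no valid committed offset and policy is 'none'")
    raise ValueError(f"unknown auto.offset.reset value: {reset_policy!r}")


print(initial_position(None, 100, 250, "earliest"))  # 100
print(initial_position(None, 100, 250))              # 250 (default: latest)
print(initial_position(40, 100, 250, "earliest"))    # 100 (offset 40 is gone)
```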

Initialization

The Java consumer is constructed with a standard Properties object.

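Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("group.id", "foo");
config.put("bootstrap.servers", "host1:9092,host2:9092");
// The Java consumer also needs deserializers for record keys and values
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);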

Configuration errors will result in a KafkaException raised from the constructor of KafkaConsumer.

The C/C++ (librdkafka) configuration is similar, but we need to handle configuration errors directly when setting properties:

char hostname[128];
char errstr[512];

rd_kafka_conf_t *conf = rd_kafka_conf_new();

if (gethostname(hostname, sizeof(hostname))) {
 fprintf(stderr, "%% Failed to lookup hostname\n");
 exit(1);
}

if (rd_kafka_conf_set(conf, "client.id", hostname,
                     errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
 fprintf(stderr, "%% %s\n", errstr);
 exit(1);
}

if (rd_kafka_conf_set(conf, "group.id", "foo",
                     errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
 fprintf(stderr, "%% %s\n", errstr);
 exit(1);
}

if (rd_kafka_conf_set(conf, "bootstrap.servers", "host1:9092,host2:9092",
                     errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
 fprintf(stderr, "%% %s\n", errstr);
 exit(1);
}

/* Create Kafka consumer handle */
rd_kafka_t *rk;
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                       errstr, sizeof(errstr)))) {
 fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
 exit(1);
}

The Python client can be configured via a dictionary as follows:

from confluent_kafka import Consumer

conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'group.id': "foo",
        'default.topic.config': {'auto.offset.reset': 'smallest'}}

consumer = Consumer(conf)

The Go client uses a ConfigMap object to pass configuration to the consumer:

import (
    "fmt"
    "os"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":    "host1:9092,host2:9092",
    "group.id":             "foo",
    "default.topic.config": kafka.ConfigMap{"auto.offset.reset": "smallest"}})
if err != nil {
    fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
    os.Exit(1)
}

Basic Usage

Although the Java client and librdkafka share many of the same configuration options and underlying features, they take fairly different approaches when it comes to their threading model and how they handle consumer liveness. Before diving into the examples, it’s helpful to understand the API designs of each client.

Java Client

The Java client is designed around an event loop which is driven by the poll() API. This design is motivated by the UNIX select and poll system calls. A basic consumption loop with the Java API usually takes the following form:

while (running) {
  ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
  process(records); // application-specific processing
  consumer.commitSync();
}

There is no background thread in the Java consumer. The API depends on calls to poll() to drive all of its IO including:

  • Joining the consumer group and handling partition rebalances.
  • Sending periodic heartbeats if part of an active generation.
  • Sending periodic offset commits (if auto-commit is enabled).
  • Sending and receiving fetch requests for assigned partitions.

Due to this single-threaded model, no heartbeats can be sent while the application is handling the records returned from a call to poll(). This means that the consumer will fall out of the consumer group if either the event loop terminates or if a delay in record processing causes the session timeout to expire before the next iteration of the loop. This is actually by design. One of the problems that the Java client attempts to solve is ensuring the liveness of consumers in the group. As long as the consumer is assigned partitions, no other members in the group can consume from the same partitions, so it is important to ensure that it is actually making progress and has not become a zombie.

This feature protects your application from a large class of failures, but the downside is that it puts the burden on you to tune the session timeout so that the consumer does not exceed it in its normal record processing. In the 0.9 release of Kafka, this was difficult because there was no direct way to limit the amount of data returned from poll(), but 0.10 added a new configuration option, max.poll.records, which places an upper bound on the number of records returned from each call. We recommend using a fairly high session timeout (e.g. 30 to 60 seconds) and keeping the number of records processed on each iteration bounded so that worst-case behavior is predictable.
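The tuning advice boils down to simple arithmetic: the worst-case time to process one batch must stay below the session timeout. The hypothetical helper below picks a max.poll.records value that uses only a fraction (headroom) of the timeout, leaving the rest for fetching, committing, and pauses. The per-record cost is an assumption you would need to measure for your own application.

```python
import math


def max_safe_poll_records(session_timeout_ms: int,
                          worst_case_ms_per_record: float,
                          headroom: float = 0.5) -> int:
    """Largest batch whose worst-case processing fits in headroom * timeout.

    Returns at least 1; if even one record exceeds the budget, the session
    timeout itself needs to be raised.
    """
    if worst_case_ms_per_record <= 0:
        raise ValueError("worst_case_ms_per_record must be positive")
    budget_ms = session_timeout_ms * headroom
    return max(1, math.floor(budget_ms / worst_case_ms_per_record))


# 30 s session timeout, up to 50 ms per record, half the timeout kept in reserve:
print(max_safe_poll_records(30_000, 50))  # 300
```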

If you fail to tune these settings appropriately, the consequence is typically a CommitFailedException raised from the call to commit offsets for the processed records. If you are using the automatic commit policy, then you might not even notice when this happens since the consumer silently ignores commit failures internally (unless it’s occurring often enough to impact lag metrics). You can catch this exception and either ignore it or perform any needed rollback logic.

while (running) {
  ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
  process(records); // application-specific processing
  try {
    consumer.commitSync();
  } catch (CommitFailedException e) {
    // application-specific rollback of processed records
  }
}

C/C++ Client (librdkafka)

Librdkafka uses a multi-threaded approach to Kafka consumption. From a user’s perspective, interaction with the API is not too different from the example used by the Java client with the user calling rd_kafka_consumer_poll in a loop, though this API returns only a single message or event at a time:

while (running) {
 rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
 if (rkmessage) {
   msg_process(rkmessage);
   rd_kafka_message_destroy(rkmessage);

   if ((++msg_count % MIN_COMMIT_COUNT) == 0)
     rd_kafka_commit(rk, NULL, 0);
 }
}

Unlike the Java client, librdkafka does all fetching and coordinator communication in background threads. This frees you from the complication of tuning the session timeout according to the expected processing time. However, since the background thread will keep the consumer alive until the client has been closed, it is up to you to ensure that your process does not become a zombie since it will continue to hold onto assigned partitions in that case.

Note that partition rebalances also take place in a background thread, which means you still have to handle the potential for commit failures, as the consumer may no longer have the same partition assignment when the commit begins. You don't need to handle them if you enable auto-commit, since commit failures are then ignored silently, but that also means you have no way to roll back processing.

while (running) {
  rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 1000);
  if (!rkmessage)
    continue; // timeout: no message

  msg_process(rkmessage); // application-specific processing
  rd_kafka_message_destroy(rkmessage);

  if ((++msg_count % MIN_COMMIT_COUNT) == 0) {
    rd_kafka_resp_err_t err = rd_kafka_commit(rk, NULL, 0);
    if (err) {
      // application-specific rollback of processed records
    }
  }
}

Python Client

The Python client uses librdkafka internally so it also uses a multi-threaded approach to Kafka consumption. From a user’s perspective, interaction with the API is not too different from the example used by the Java client with the user calling the poll() method in a loop, though this API returns only a single message at a time:

try:
    msg_count = 0
    while running:
        msg = consumer.poll(timeout=1.0)
        if msg is None: continue

        msg_process(msg) # application-specific processing
        msg_count += 1
        if msg_count % MIN_COMMIT_COUNT == 0:
            consumer.commit(asynchronous=False)
finally:
    # Shut down consumer
    consumer.close()

Go Client

The Go client uses librdkafka internally so it also uses a multi-threaded approach to Kafka consumption. From a user’s perspective, interaction with the API is not too different from the example used by the Java client with the user calling the Poll() method in a loop, though this API returns only a single message at a time:

for run {
    ev := consumer.Poll(100)
    if ev == nil {
        continue // timeout: no event
    }
    switch e := ev.(type) {
    case *kafka.Message:
        // application-specific processing
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

Detailed Examples

Below we provide detailed examples using the consumer API with special attention paid to offset management and delivery semantics. These examples are intended to give you a starting point for building your consumer application.

Basic Poll Loop

The consumer API is centered around the poll() method, which is used to retrieve records from the brokers. The subscribe() method controls which topics will be fetched in poll. Typically, consumer usage involves an initial call to subscribe() to set up the topics of interest and then a loop which calls poll() until the application is shut down.

The consumer intentionally avoids a specific threading model. It is not safe for multi-threaded access and it has no background threads of its own. In particular, this means that all IO occurs in the thread calling poll(). In the example below, we wrap the poll loop in a Runnable which makes it easy to use with an ExecutorService.

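import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public abstract class BasicConsumeLoop<K, V> implements Runnable {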
  private final KafkaConsumer<K, V> consumer;
  private final List<String> topics;
  private final AtomicBoolean shutdown;
  private final CountDownLatch shutdownLatch;

  public BasicConsumeLoop(Properties config, List<String> topics) {
    this.consumer = new KafkaConsumer<>(config);
    this.topics = topics;
    this.shutdown = new AtomicBoolean(false);
    this.shutdownLatch = new CountDownLatch(1);
  }

  public abstract void process(ConsumerRecord<K, V> record);

  public void run() {
    try {
      consumer.subscribe(topics);

      while (!shutdown.get()) {
        ConsumerRecords<K, V> records = consumer.poll(500);
        records.forEach(record -> process(record));
      }
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }

  public void shutdown() throws InterruptedException {
    shutdown.set(true);
    shutdownLatch.await();
  }
}

We’ve hard-coded the poll timeout to 500 milliseconds. If no records are received before this timeout expires, then poll() will return an empty record set. If your message processing involves any setup overhead, it is worth adding a shortcut check for this case (for example, with records.isEmpty()).

To shut down the consumer, we’ve added a flag which is checked on each loop iteration. After shutdown is triggered, the consumer will wait at most 500 milliseconds (plus the message processing time) before shutting down, since shutdown might be triggered while we are in poll(). A better approach is provided in the next example.

Note that you should always call close() after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If not closed properly, the broker will trigger the rebalance only after the session timeout has expired. We’ve added a latch to this example to ensure that the consumer has time to finish closing before finishing shutdown.

The same example looks similar in librdkafka:

static int running = 1;  /* set to 0 to stop the consume loop */
static void msg_process(rd_kafka_message_t *rkmessage);

void basic_consume_loop(rd_kafka_t *rk,
                        rd_kafka_topic_partition_list_t *topics) {
  rd_kafka_resp_err_t err;

  if ((err = rd_kafka_subscribe(rk, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
    exit(1);
  }

  while (running) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
    if (rkmessage) {
      msg_process(rkmessage);
      rd_kafka_message_destroy(rkmessage);
    }
  }

  err = rd_kafka_consumer_close(rk);
  if (err)
    fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
  else
    fprintf(stderr, "%% Consumer closed\n");
}

In Python:

import sys

from confluent_kafka import KafkaError, KafkaException

running = True

def basic_consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

def shutdown():
    global running  # rebind the module-level flag, not a new local variable
    running = False

And in Go:

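err = consumer.SubscribeTopics(topics, nil)
if err != nil {
    fmt.Fprintf(os.Stderr, "Failed to subscribe: %s\n", err)
    os.Exit(1)
}

for run {
    ev := consumer.Poll(100)
    if ev == nil {
        continue // timeout: no event
    }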
    switch e := ev.(type) {
    case *kafka.Message:
        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))
    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

consumer.Close()

Although the APIs are similar, the C/C++, Python, and Go clients use a different approach than the Java client beneath the surface. While the Java consumer does all IO and processing in the foreground thread, these clients use a background thread. The main consequence of this is that calling rd_kafka_consumer_poll or Consumer.poll() is safe when used from multiple threads. You can use this to parallelize message handling in multiple threads. From a high level, poll is taking messages off of a queue which is filled in the background.

Another consequence of using a background thread is that all heartbeats and rebalances are executed in the background. The benefit of this is that you don’t need to worry about message handling causing the consumer to “miss” a rebalance. The drawback, however, is that the background thread will continue heartbeating even if your message processor dies. If this happens, then the consumer will continue to hold onto its partitions and the read lag will continue to build until the process is shut down.

Although the clients have taken different approaches internally, they are not as far apart as they seem. To provide the same abstraction in the Java client, you could place a queue in between the poll loop and the message processors. The poll loop would fill the queue and the processors would pull messages off of it.
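A minimal sketch of that hand-off, using Python's queue and threading modules. The poll loop here is a stand-in that reads from a list instead of a real consumer; with the Java client, only this one thread would touch the KafkaConsumer, since it is not safe for multi-threaded access. The bounded queue makes put() block when the processors fall behind, which raises a flow-control question: while put() is blocked, the poll loop is not calling poll().

```python
import queue
import threading
from typing import Iterable, List

SENTINEL = object()  # stop marker for the workers


def run_pipeline(messages: Iterable[str], num_workers: int = 3,
                 max_queued: int = 10) -> List[str]:
    handoff: "queue.Queue[object]" = queue.Queue(maxsize=max_queued)
    processed: List[str] = []
    lock = threading.Lock()

    def poll_loop() -> None:
        # Stand-in for the consumer's poll loop; put() blocks when the queue is full.
        for msg in messages:
            handoff.put(msg)
        for _ in range(num_workers):
            handoff.put(SENTINEL)  # one stop marker per worker

    def worker() -> None:
        while True:
            item = handoff.get()
            if item is SENTINEL:
                return
            with lock:
                processed.append(item.upper())  # application-specific processing

    threads = [threading.Thread(target=poll_loop)]
    threads += [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return processed


result = run_pipeline([f"msg-{i}" for i in range(20)])
print(len(result))  # 20
```

With several workers, messages from the same partition may be processed out of order; routing each partition to a fixed worker is one way to keep per-partition ordering.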

Shutdown with Wakeup

An alternative pattern for the poll loop in the Java consumer is to use Long.MAX_VALUE for the timeout. To break out of the loop, call the consumer’s wakeup() method from a separate thread. This raises a WakeupException in the thread blocked in poll(). If that thread is not currently blocked, the next call to poll() raises the exception instead.

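import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ConsumeLoop<K, V> implements Runnable {
  private static final Logger log = LoggerFactory.getLogger(ConsumeLoop.class);
  private final KafkaConsumer<K, V> consumer;
  private final List<String> topics;
  private final CountDownLatch shutdownLatch;

  public ConsumeLoop(KafkaConsumer<K, V> consumer, List<String> topics) {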
    this.consumer = consumer;
    this.topics = topics;
    this.shutdownLatch = new CountDownLatch(1);
  }

  public abstract void process(ConsumerRecord<K, V> record);

  public void run() {
    try {
      consumer.subscribe(topics);

      while (true) {
        ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
        records.forEach(record -> process(record));
      }
    } catch (WakeupException e) {
      // ignore, we're closing
    } catch (Exception e) {
      log.error("Unexpected error", e);
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }

  public void shutdown() throws InterruptedException {
    consumer.wakeup();
    shutdownLatch.await();
  }
}

Synchronous Commits

In the examples above, we have assumed that the consumer is configured to auto-commit offsets (this is the default). Auto-commit works much like a cron job with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset. When this happens, the last committed position may be as old as the auto-commit interval itself. Any messages which have arrived since the last commit will have to be read again.
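The size of that replay window can be estimated with a simplified model. The sketch below assumes a steady rate of one message every ms_per_message and a commit of the current position exactly at each multiple of the interval; the real Java consumer commits from inside poll(), so actual timing is less regular. The function name is hypothetical.

```python
def replayed_after_crash(crash_at_ms: int, commit_interval_ms: int,
                         ms_per_message: int) -> int:
    """Messages processed since the last auto-commit, i.e. re-read after a crash."""
    position = crash_at_ms // ms_per_message                  # next offset to process
    last_commit_ms = (crash_at_ms // commit_interval_ms) * commit_interval_ms
    committed = last_commit_ms // ms_per_message              # offset committed then
    return position - committed


# 5 s auto-commit interval, 10 messages per second:
print(replayed_after_crash(12_300, 5_000, 100))  # 23
print(replayed_after_crash(14_999, 5_000, 100))  # 49, close to a full interval
```

In this model the number of duplicates never exceeds one interval's worth of messages (50 here), which is why shortening auto.commit.interval.ms shrinks the window.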

Clearly if you want to reduce the window for duplicates, you can reduce the auto-commit interval, but some users may want even finer control over offsets. The consumer therefore supports a commit API which gives you full control over offsets. The simplest and most reliable way to manually commit offsets is using a synchronous commit with commitSync(). As its name suggests, this method blocks until the commit has completed successfully.

Note that when you use the commit API directly, you should first disable auto-commit in the configuration by setting the enable.auto.commit property to false.

private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      doCommitSync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

In this example, we’ve added a try/catch block around the call to commitSync. The CommitFailedException is thrown when the commit cannot be completed because the group has been rebalanced. This is the main thing we have to be careful of when using the Java client. Since all network IO (including heartbeating) and message processing is done in the foreground, it is possible for the session timeout to expire while a batch of messages is being processed. To handle this, you have two choices.

First, you can adjust the session.timeout.ms setting to ensure that the handler has enough time to finish processing messages. You can then tune max.partition.fetch.bytes to limit the amount of data returned in a single batch, though you will have to consider how many partitions are in the subscribed topics.

The second option is to do message processing in a separate thread, but you will have to manage flow control to ensure that the threads can keep up. For example, just pushing messages into a blocking queue would probably not be sufficient unless the rate of processing can keep up with the rate of delivery (in which case you might not need a separate thread anyway). It may even exacerbate the problem if the poll loop is stuck blocking on a call to offer() while the background thread is handling an even larger batch of messages. The Java API offers a pause() method to help in these situations, which we will cover in another example.
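One common way to apply pause() is hysteresis on the depth of the hand-off queue: pause fetching when the queue reaches a high watermark and resume once it drains to a low one. The class below is a hypothetical sketch of just that decision; a real Java poll loop would call consumer.pause() and consumer.resume() with its assigned partitions when told to, and keep calling poll() while paused so the consumer stays in the group.

```python
class FlowController:
    """Decide when a poll loop should pause or resume fetching (hypothetical)."""

    def __init__(self, high_watermark: int, low_watermark: int):
        if not 0 <= low_watermark < high_watermark:
            raise ValueError("need 0 <= low_watermark < high_watermark")
        self.high = high_watermark
        self.low = low_watermark
        self.paused = False

    def update(self, queued: int) -> str:
        """Return 'pause', 'resume', or 'none' for the current queue depth."""
        if not self.paused and queued >= self.high:
            self.paused = True
            return "pause"
        if self.paused and queued <= self.low:
            self.paused = False
            return "resume"
        return "none"


fc = FlowController(high_watermark=1000, low_watermark=200)
print([fc.update(n) for n in (500, 1000, 800, 200, 300)])
# ['none', 'pause', 'none', 'resume', 'none']
```

The gap between the two watermarks keeps the loop from flapping between pause and resume on every iteration when the queue hovers around a single threshold.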

For now, we recommend setting session.timeout.ms large enough that commit failures from rebalances are rare. As mentioned above, the only drawback to this is a longer delay before partitions can be re-assigned in the event of a hard failure (where the consumer cannot be cleanly shut down with close()). This should be rare in practice.

Note also that we have to be a little careful in this example since the wakeup() might be triggered while the commit is pending. The recursive call is safe since the wakeup will only be triggered once.

In C/C++ (librdkafka), we can get similar behavior with rd_kafka_commit, which is used for both synchronous and asynchronous commits. The approach is slightly different, however, since rd_kafka_consumer_poll returns single messages instead of batches as the Java consumer does.

void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
  static const int MIN_COMMIT_COUNT = 1000;

  int msg_count = 0;
  rd_kafka_resp_err_t err;

  if ((err = rd_kafka_subscribe(rk, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
    exit(1);
  }

  while (running) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
    if (rkmessage) {
      msg_process(rkmessage);
      rd_kafka_message_destroy(rkmessage);

      if ((++msg_count % MIN_COMMIT_COUNT) == 0)
        rd_kafka_commit(rk, NULL, 0);
    }
  }

  err = rd_kafka_consumer_close(rk);
  if (err)
    fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
  else
    fprintf(stderr, "%% Consumer closed\n");
}

In this example, we trigger a synchronous commit every 1000 messages. The second argument to rd_kafka_commit is the list of offsets to be committed; if set to NULL, librdkafka will commit the current offsets for the assigned partitions. The third argument in rd_kafka_commit is a flag which controls whether this call is asynchronous. We could also trigger the commit on expiration of a timeout to ensure that the committed position is updated regularly.
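A sketch of the count-or-timeout policy mentioned above. The CommitTrigger class is hypothetical, and the clock is injectable so the policy can be tested without waiting. In a real loop you would call should_commit() on every iteration, including when poll returns no message, so that the time-based commit still fires when traffic is idle.

```python
import time
from typing import Callable


class CommitTrigger:
    """Signal a commit every max_count messages or every max_interval_s seconds."""

    def __init__(self, max_count: int = 1000, max_interval_s: float = 5.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_count = max_count
        self.max_interval_s = max_interval_s
        self.clock = clock
        self.pending = 0               # processed but not yet committed
        self.last_commit = clock()

    def record_processed(self) -> None:
        self.pending += 1

    def should_commit(self) -> bool:
        if self.pending == 0:
            return False               # nothing new to commit
        return (self.pending >= self.max_count or
                self.clock() - self.last_commit >= self.max_interval_s)

    def committed(self) -> None:
        self.pending = 0
        self.last_commit = self.clock()


now = [0.0]                            # fake clock, in seconds
trigger = CommitTrigger(max_count=3, max_interval_s=5.0, clock=lambda: now[0])
trigger.record_processed()
print(trigger.should_commit())         # False: 1 message, 0 s elapsed
now[0] = 6.0
print(trigger.should_commit())         # True: interval elapsed
trigger.committed()
for _ in range(3):
    trigger.record_processed()
print(trigger.should_commit())         # True: count reached
```

When should_commit() returns True, perform the commit (for example rd_kafka_commit in C, or consumer.commit(asynchronous=False) in Python) and then call committed().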

Since the Python client uses librdkafka internally, it follows a similar pattern: the asynchronous parameter of Consumer.commit() selects a synchronous or asynchronous commit (older client versions called this parameter async, which became a reserved word in Python 3.7). The method also accepts two mutually exclusive keyword parameters: offsets, which explicitly lists the offsets to commit for each topic partition, and message, which commits offsets relative to a Message object returned by poll().

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=False)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

The Go client also uses librdkafka internally, so it follows a similar pattern, but it only provides a synchronous Commit() method. Its variants CommitOffsets() and CommitMessage() commit an explicit list of offsets or the offsets relative to a consumed Message, respectively. When using manual offset commit, be sure to disable the enable.auto.commit configuration.

msg_count := 0
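for run {
    ev := consumer.Poll(100)
    if ev == nil {
        continue // timeout: no event
    }
    switch e := ev.(type) {
    case *kafka.Message:
        // Process first, then commit: at-least-once delivery
        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))
        msg_count += 1
        if msg_count % MIN_COMMIT_COUNT == 0 {
            if _, err := consumer.Commit(); err != nil {
                fmt.Fprintf(os.Stderr, "%% Commit failed: %v\n", err)
            }
        }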

    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

Delivery Guarantees: This is as good a time as any to talk about delivery semantics. Using auto-commit gives you “at least once” delivery: Kafka guarantees that no messages will be missed, but duplicates are possible. In the above example, we get the same since the commit follows the message processing. By changing the order, however, we can get “at most once” delivery. But we have to be a little careful with the commit failure, so we change doCommitSync to return whether or not the commit succeeded. There’s also no longer any need to catch the WakeupException in the synchronous commit.
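The effect of the ordering can be seen in a small simulation. It is a sketch under simplifying assumptions: one commit per message, a single crash at a chosen offset, and a restarted consumer that resumes from the last committed offset and runs to completion. The function is hypothetical.

```python
from typing import List, Tuple


def simulate(messages: List[str], crash_at: int,
             commit_first: bool) -> Tuple[List[str], List[str]]:
    """Consume messages, crash once at offset crash_at, then restart.

    The crash lands between the two steps for that message: after the commit
    when commit_first is True, after processing when it is False. Returns the
    messages processed before the crash and after the restart.
    """
    committed = 0                          # next offset to read after a restart
    before_crash: List[str] = []
    for offset, msg in enumerate(messages):
        if commit_first:
            committed = offset + 1         # commit ...
            if offset == crash_at:
                break                      # ... crash before processing
            before_crash.append(msg)
        else:
            before_crash.append(msg)       # process ...
            if offset == crash_at:
                break                      # ... crash before committing
            committed = offset + 1
    after_restart = messages[committed:]   # the restarted run completes
    return before_crash, after_restart


msgs = ["m0", "m1", "m2", "m3"]
print(simulate(msgs, crash_at=2, commit_first=False))  # (['m0', 'm1', 'm2'], ['m2', 'm3'])
print(simulate(msgs, crash_at=2, commit_first=True))   # (['m0', 'm1'], ['m3'])
```

Committing after processing replays m2 (at least once), while committing first loses it (at most once).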

private boolean doCommitSync() {
  try {
    consumer.commitSync();
    return true;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
    return false;
  }
}

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      if (doCommitSync())
        records.forEach(record -> process(record));
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

And in C/C++ (librdkafka):

void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
  rd_kafka_resp_err_t err;

  if ((err = rd_kafka_subscribe(rk, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
    exit(1);
  }

  while (running) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
    if (!rkmessage)
      continue; /* no message within the timeout */

    /* Commit synchronously (async = 0) first and process the message only
     * if the commit succeeded: at-most-once delivery. */
    if (!rkmessage->err && !rd_kafka_commit_message(rk, rkmessage, 0))
      msg_process(rkmessage);

    /* Always release the message, even if the commit failed. */
    rd_kafka_message_destroy(rkmessage);
  }

  err = rd_kafka_consumer_close(rk);
  if (err)
    fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
  else
    fprintf(stderr, "%% Consumer closed\n");
}

And in Python:

import sys

from confluent_kafka import KafkaError, KafkaException

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                # Synchronous commit first: it raises KafkaException on
                # failure, so msg_process() only runs if the commit succeeded.
                consumer.commit(asynchronous=False)
                msg_process(msg)

    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

And in Go:

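for run {
    ev := consumer.Poll(100)
    if ev == nil {
        continue // no event within the timeout
    }
    switch e := ev.(type) {
    case *kafka.Message:
        // Commit this message's offset synchronously before processing it,
        // and process it only if the commit succeeded (at-most-once).
        if _, err := consumer.CommitMessage(e); err == nil {
            msg_process(e)
        }

    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}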

For simplicity, the C, Python, and Go examples commit each message before processing it (rd_kafka_commit_message in C, commit() in Python, CommitMessage() in Go). In practice, committing on every message adds a lot of overhead. A better approach is to collect a batch of messages, execute a single synchronous commit, and process the batch only if the commit succeeded.
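The batching approach can be sketched without a broker. In this simulation, `consume_at_most_once` is a hypothetical helper, `commit_sync` stands in for a synchronous commit call such as `commitSync()` or `commit(asynchronous=False)` and reports whether it succeeded, and records are represented only by their offsets. One commit request now covers a whole batch instead of a single message.

```python
from typing import Callable, List, Sequence


def consume_at_most_once(
    batches: Sequence[List[int]],
    commit_sync: Callable[[int], bool],
    process: Callable[[int], None],
) -> int:
    """Commit each batch synchronously, then process it only on success.

    Each batch is a list of record offsets. The committed value is the
    offset after the batch's last record (the next record to read).
    Returns the number of commit requests issued.
    """
    commits = 0
    for batch in batches:
        if not batch:
            continue
        commits += 1
        if commit_sync(batch[-1] + 1):
            for offset in batch:
                process(offset)
        # On failure the batch is skipped, never processed twice: at-most-once.
    return commits


# Usage: three batches of ten records; the commit for the second batch fails.
processed: List[int] = []
batches = [list(range(0, 10)), list(range(10, 20)), list(range(20, 30))]
commits = consume_at_most_once(batches, lambda pos: pos != 20, processed.append)
print(commits, len(processed))  # prints: 3 20
```

The records of the batch whose commit failed are never processed, which is the price of at-most-once delivery.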

Correct offset management is crucial because it affects delivery semantics. As of version 0.9.0.0, the best Kafka can give you is at-least-once or at-most-once. If you are not careful, however, it might not give you either. Exactly-once delivery is under active investigation, but it is not currently possible without depending on another system (e.g. a transactional RDBMS).
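The practical difference between the two orderings only shows up when the consumer fails between processing a record and committing it. The following broker-free simulation (the function names are hypothetical, and a commit is modeled as storing the offset after the record) injects a crash into each ordering and reports which records a restarted consumer would see twice and which it would never see.

```python
from typing import List, Set, Tuple


def run_until_crash(order: str, records: int, crash_at: int) -> Tuple[Set[int], int]:
    """Commit after every record, but crash midway through offset crash_at.

    "process-then-commit" processes a record and then commits it;
    "commit-then-process" does the reverse. The crash happens between the
    two steps. Returns the offsets processed before the crash and the
    committed position (offset + 1 of the last committed record).
    """
    processed: Set[int] = set()
    committed = 0
    for offset in range(records):
        if order == "process-then-commit":
            processed.add(offset)
            if offset == crash_at:
                break  # crashed before committing this record
            committed = offset + 1
        elif order == "commit-then-process":
            committed = offset + 1
            if offset == crash_at:
                break  # crashed before processing this record
            processed.add(offset)
        else:
            raise ValueError(f"unknown order: {order}")
    return processed, committed


def after_restart(order: str, records: int, crash_at: int) -> Tuple[List[int], List[int]]:
    """Return (duplicated offsets, lost offsets) once a new consumer resumes."""
    before, resume_from = run_until_crash(order, records, crash_at)
    after = set(range(resume_from, records))  # read by the restarted consumer
    duplicated = sorted(before & after)
    lost = sorted(set(range(records)) - before - after)
    return duplicated, lost


print(after_restart("process-then-commit", 5, crash_at=2))  # ([2], [])
print(after_restart("commit-then-process", 5, crash_at=2))  # ([], [2])
```

Process-then-commit delivers the in-flight record twice (at least once); commit-then-process never delivers it (at most once).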

Asynchronous Commits

Each call to the commit API results in an offset commit request being sent to the broker. With the synchronous API, the consumer is blocked until that request completes. This may reduce overall throughput, since the consumer could otherwise be processing records while the commit is pending. One way to deal with this is to increase the amount of data returned in each poll(), so that each commit covers more records. The consumer configuration setting fetch.min.bytes controls the minimum amount of data the broker returns for each fetch: the broker holds onto the fetch until enough data is available (or fetch.max.wait.ms expires). The tradeoff, however, is that larger batches also increase the number of duplicates that have to be dealt with in a worst-case failure, because more records are processed between commits.
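To make the tradeoff concrete, here is a small worked example. `commit_tradeoff` is a hypothetical helper that measures batches in records rather than bytes (fetch.min.bytes is measured in bytes, so this is a simplification). It assumes one synchronous commit after each fully processed batch and, in the worst case, a crash just before a commit.

```python
import math
from typing import Tuple


def commit_tradeoff(total_records: int, records_per_commit: int) -> Tuple[int, int]:
    """Return (commit requests issued, worst-case records replayed).

    Worst case: the consumer crashes right before a commit, so everything
    processed since the previous successful commit (one whole batch) is
    delivered again after a restart.
    """
    if records_per_commit <= 0:
        raise ValueError("records_per_commit must be positive")
    commits = math.ceil(total_records / records_per_commit)
    worst_case_replay = min(records_per_commit, total_records)
    return commits, worst_case_replay


for batch in (100, 1_000, 10_000):
    commits, replay = commit_tradeoff(1_000_000, batch)
    print(f"batch={batch:>6}: {commits:>6} commits, up to {replay} duplicates")
```

Each tenfold increase in batch size cuts the number of blocking commit requests by ten, and raises the worst-case duplicate count by the same factor.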

A second option is to use asynchronous commits. Instead of waiting for the request to complete, the consumer can send the request and return immediately.

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

And in C/C++ (librdkafka):

void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
  static const int MIN_COMMIT_COUNT = 1000;

  int msg_count = 0;
  rd_kafka_resp_err_t err;

  if ((err = rd_kafka_subscribe(rk, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
    exit(1);
  }

  while (running) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
    if (rkmessage) {
      msg_process(rkmessage);
      rd_kafka_message_destroy(rkmessage);

      if ((++msg_count % MIN_COMMIT_COUNT) == 0)
        rd_kafka_commit(rk, NULL, 1);
    }
  }

  err = rd_kafka_consumer_close(rk);
  if (err)
    fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
  else
    fprintf(stderr, "%% Consumer closed\n");
}

The only difference between this example and the synchronous C example shown earlier is the last argument to rd_kafka_commit: passing 1 instead of 0 makes the commit asynchronous.

The change in Python is very similar: the asynchronous parameter to commit() is set to True. Here we pass the value explicitly, but asynchronous commits are the default if the parameter is omitted.

import sys

from confluent_kafka import KafkaError, KafkaException

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    # Returns immediately; the commit completes in the background.
                    consumer.commit(asynchronous=True)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

In Go, simply execute the commit in a goroutine to commit asynchronously:

msgCount := 0
for run {
    ev := consumer.Poll(100)
    if ev == nil {
        continue // no event within the timeout
    }
    switch e := ev.(type) {
    case *kafka.Message:
        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))
        msgCount++
        if msgCount%MIN_COMMIT_COUNT == 0 {
            // Commit in a separate goroutine so the poll loop does not block.
            go func() {
                if _, err := consumer.Commit(); err != nil {
                    fmt.Fprintf(os.Stderr, "%% Commit failed: %v\n", err)
                }
            }()
        }

    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

So if asynchronous commits help performance, why not always use them? The main reason is that the consumer does not retry the request if an asynchronous commit fails. This is something that commitSync gives you for free; it will retry indefinitely until the commit succeeds or an unrecoverable error is encountered. Retrying asynchronous commits is hard because of commit ordering. By the time the consumer finds out that a commit has failed, it may already have processed the next batch of messages and even sent the next commit. If the old commit were retried at that point and succeeded, it would overwrite the newer offset, and after a rebalance or restart the consumer would resume from the older position and process the later batch a second time.

Instead of complicating the consumer internals to try and handle this problem in a sane way, the API gives you a callback which is invoked when the commit either succeeds or fails. If you like, you can use this callback to retry the commit, but you will have to deal with the same reordering problem.

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
          if (exception != null)
            log.debug("Commit failed for offsets {}", offsets, exception);
        }
      });
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

A similar feature is available in C/C++ (librdkafka), but we have to configure it on initialization:

static void on_commit(rd_kafka_t *rk,
                      rd_kafka_resp_err_t err,
                      rd_kafka_topic_partition_list_t *offsets,
                      void *opaque) {
  if (err)
    fprintf(stderr, "%% Failed to commit offsets: %s\n", rd_kafka_err2str(err));
}

void init_rd_kafka() {
  rd_kafka_conf_t *conf = rd_kafka_conf_new();
  rd_kafka_conf_set_offset_commit_cb(conf, on_commit);

  // initialization omitted
}

Similarly, in Python the commit callback can be any callable and can be passed as a configuration parameter to the consumer constructor.

from confluent_kafka import Consumer

def commit_completed(err, partitions):
    if err:
        print(str(err))
    else:
        print("Committed partition offsets: " + str(partitions))

conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'group.id': "foo",
        'auto.offset.reset': 'earliest',
        'on_commit': commit_completed}

consumer = Consumer(conf)
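The reordering problem, and one common way around it, can be modeled without a broker. This is a simulation rather than client code: `CommittedOffset` and `RetryingCommitter` are hypothetical, a retried commit is assumed to succeed, and the sequence-number check is a widely used technique for guarding retries, not something the consumer API provides. Commit A (offset 100) fails, but its callback only fires after commit B (offset 200) has already succeeded.

```python
from typing import Dict


class CommittedOffset:
    """Stand-in for one partition's committed offset on the broker.

    Whatever commit arrives last wins; an older offset is not rejected.
    """

    def __init__(self) -> None:
        self.value = 0

    def write(self, offset: int) -> None:
        self.value = offset


class RetryingCommitter:
    """Hypothetical async committer that retries failures from its callback."""

    def __init__(self, store: CommittedOffset, guarded: bool) -> None:
        self.store = store
        self.guarded = guarded
        self.latest_seq = 0                # sequence number of the newest commit
        self.pending: Dict[int, int] = {}  # seq -> offset still in flight

    def send(self, offset: int) -> int:
        """Issue an async commit; return the sequence number assigned to it."""
        self.latest_seq += 1
        self.pending[self.latest_seq] = offset
        return self.latest_seq

    def on_complete(self, seq: int, error: bool) -> None:
        """Commit callback for the request with sequence number seq."""
        offset = self.pending.pop(seq)
        if error and self.guarded and seq != self.latest_seq:
            return  # a newer commit was already sent; retrying would rewind
        # Either the commit succeeded, or we retry it (modeled as succeeding).
        self.store.write(offset)


def scenario(guarded: bool) -> int:
    store = CommittedOffset()
    committer = RetryingCommitter(store, guarded)
    seq_a = committer.send(100)                # commit after the first batch
    seq_b = committer.send(200)                # commit after the second batch
    committer.on_complete(seq_b, error=False)  # B succeeds first...
    committer.on_complete(seq_a, error=True)   # ...then A's failure arrives
    return store.value


print(scenario(guarded=False))  # 100: the stale retry rewound the offset
print(scenario(guarded=True))   # 200: the stale retry was skipped
```

Without the guard, the retry moves the committed position back to 100, so a restarted consumer would process the second batch again.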

In Go, rebalance events are exposed as events returned by the Poll() method. To see these events, you must create the consumer with the go.application.rebalance.enable configuration set to true, and then handle AssignedPartitions events by calling Assign() and RevokedPartitions events by calling Unassign():

consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":               "host1:9092,host2:9092",
    "group.id":                        "foo",
    "go.application.rebalance.enable": true})
if err != nil {
    fmt.Fprintf(os.Stderr, "%% Failed to create consumer: %v\n", err)
    os.Exit(1)
}

// Subscription omitted.

msgCount := 0
for run {
    ev := consumer.Poll(100)
    if ev == nil {
        continue // no event within the timeout
    }
    switch e := ev.(type) {
    case kafka.AssignedPartitions:
        fmt.Fprintf(os.Stderr, "%% %v\n", e)
        consumer.Assign(e.Partitions)
    case kafka.RevokedPartitions:
        fmt.Fprintf(os.Stderr, "%% %v\n", e)
        consumer.Unassign()
    case *kafka.Message:
        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))
        msgCount++
        if msgCount%MIN_COMMIT_COUNT == 0 {
            consumer.Commit()
        }

    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

Offset commit failures are merely annoying if the following commits succeed, since they won’t actually result in duplicate reads. However, if the last commit fails before a rebalance occurs or before the consumer is shut down, then consumption of those partitions will resume from the last successfully committed offset and you will likely see duplicates. A common pattern is therefore to combine async commits in the poll loop with sync commits on rebalances or shutdown. Committing on close is straightforward, but we need a way to hook into rebalances. For this, the subscribe() method introduced earlier has a variant which accepts a ConsumerRebalanceListener, which has two methods to hook into rebalance behavior.

In the example below, we incorporate synchronous commits on rebalances and on close.

private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}

public void run() {
  try {
    consumer.subscribe(topics, new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        doCommitSync();
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
    });

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    try {
      doCommitSync();
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }
}

Each rebalance has two phases: partition revocation and partition assignment. The revocation method is always called before a rebalance and is our last chance to commit offsets before the partitions are reassigned. The assignment method is always called after the rebalance and can be used to set the initial position of the assigned partitions. In this case, we’ve used the revocation hook to commit the current offsets synchronously.
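To see why the revocation hook matters, here is a broker-free simulation of that pattern. `SimulatedConsumer` and `records_replayed` are hypothetical names; a failed async commit is simply dropped (mirroring the fact that commitAsync does not retry), and `commit_sync` is assumed to succeed. In both runs, the last async commit before the rebalance fails.

```python
from typing import List


class SimulatedConsumer:
    """Broker-free stand-in for a consumer that owns a single partition.

    Async commit outcomes are scripted by async_failures (True = fails).
    A failed async commit is simply dropped; commit_sync() always succeeds.
    """

    def __init__(self, async_failures: List[bool]) -> None:
        self.position = 0   # next offset to read
        self.committed = 0  # offset stored for the group
        self._failures = list(async_failures)

    def poll(self, n: int) -> List[int]:
        batch = list(range(self.position, self.position + n))
        self.position += n
        return batch

    def commit_async(self) -> None:
        failed = self._failures.pop(0) if self._failures else False
        if not failed:
            self.committed = self.position

    def commit_sync(self) -> None:
        self.committed = self.position


def records_replayed(async_failures: List[bool], batches: int, sync_on_revoke: bool) -> int:
    """Poll and commit asynchronously, then lose the partition in a rebalance.

    Returns how many records the partition's new owner will process again:
    the gap between the position and the committed offset.
    """
    consumer = SimulatedConsumer(async_failures)
    for _ in range(batches):
        consumer.poll(10)  # processing of the returned batch omitted
        consumer.commit_async()
    if sync_on_revoke:
        consumer.commit_sync()  # what the onPartitionsRevoked hook does
    return consumer.position - consumer.committed


print(records_replayed([False, False, True], 3, sync_on_revoke=False))  # 10
print(records_replayed([False, False, True], 3, sync_on_revoke=True))   # 0
```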

In general, asynchronous commits should be considered less safe than synchronous commits. Consecutive commit failures before a crash will result in increased duplicate processing. You can mitigate this danger by adding logic to handle commit failures in the callback or by mixing calls to commitSync() occasionally, but we wouldn’t recommend too much complexity unless testing shows it is necessary. If you need more reliability, synchronous commits are there for you, and you can still scale up by increasing the number of topic partitions and the number of consumers in the group. But if you just want to maximize throughput and you’re willing to accept some increase in the number of duplicates, then asynchronous commits may be a good option.
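The cost of consecutive failures, and the effect of occasionally mixing in a synchronous commit, can be quantified with a small model. `replay_after_crash` is a hypothetical helper; it assumes fixed-size batches, a crash right after the last commit attempt, and synchronous commits that always succeed.

```python
from typing import List


def replay_after_crash(outcomes: List[bool], batch_size: int, sync_every: int = 0) -> int:
    """Records processed again after a crash that follows the last commit.

    outcomes[i] says whether the async commit after batch i + 1 succeeded.
    If sync_every > 0, every sync_every-th commit is made synchronously
    instead and is assumed to succeed.
    """
    uncommitted_batches = 0
    for i, ok in enumerate(outcomes, start=1):
        committed = ok or (sync_every > 0 and i % sync_every == 0)
        uncommitted_batches = 0 if committed else uncommitted_batches + 1
    return uncommitted_batches * batch_size


# One success followed by four consecutive async failures, 100-record batches.
outcomes = [True, False, False, False, False]
print(replay_after_crash(outcomes, 100))                # 400
print(replay_after_crash(outcomes, 100, sync_every=2))  # 100
```

Each consecutive failure adds a full batch to the replay; a periodic synchronous commit caps the run of uncommitted batches at fewer than sync_every.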

A somewhat obvious point, but one worth making, is that asynchronous commits only make sense for “at least once” message delivery. To get “at most once,” you need to know whether the commit succeeded before consuming the message. This implies a synchronous commit, unless you have the ability to “unread” a message after you find that the commit failed.

Administration

Since 0.9, the Kafka release includes an admin utility for viewing the status of consumer groups.

List Groups

To get a list of the active groups in the cluster, you can use the kafka-consumer-groups utility included in the Kafka distribution. On a large cluster, this may take a little time since we need to collect the list by inspecting each broker in the cluster.

$ bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --list

Describe Group

The utility kafka-consumer-groups can also be used to collect information on a current group. For example, to see the current assignments for the foo group, use the following command:

$ bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --describe --group foo

If you happen to invoke this while a rebalance is in progress, the command will report an error. Run it again and you should see the assignments for all the members in the current generation.