Kafka Consumers
The Confluent Platform ships with the standard Java consumer first released in Kafka 0.9.0.0, the high-performance C/C++ client librdkafka, and clients for Python, Go and .NET.
Note
The older Scala consumers are still supported for now, but they are not covered in this documentation, and we encourage users to migrate away from them. In particular, Kafka security extensions and the new broker-based balanced consumer groups are not supported with the old consumers.
Concepts
A consumer group is a set of consumers which cooperate to consume data from some topics. The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as rebalancing the group.
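The following Python sketch makes "proportional share" concrete. It divides one topic's partitions among the members of a group in contiguous ranges, loosely modelled on the range-style assignment Kafka uses by default. It is an illustration only: the function `range_assign` and the member names are hypothetical, and real assignment is performed by the group protocol, not by application code.

```python
def range_assign(partitions, consumers):
    """Split one topic's partitions into contiguous blocks, one block per consumer.

    A simplified model of range-style assignment, not the Kafka implementation.
    """
    if not consumers:
        return {}
    partitions = sorted(partitions)
    consumers = sorted(consumers)
    per_consumer, extra = divmod(len(partitions), len(consumers))
    assignment = {}
    start = 0
    for i, member in enumerate(consumers):
        # The first `extra` members (in sorted order) take one additional partition.
        count = per_consumer + (1 if i < extra else 0)
        assignment[member] = partitions[start:start + count]
        start += count
    return assignment


# A rebalance: the same six partitions before and after a third member joins.
print(range_assign(range(6), ["c1", "c2"]))        # {'c1': [0, 1, 2], 'c2': [3, 4, 5]}
print(range_assign(range(6), ["c1", "c2", "c3"]))  # {'c1': [0, 1], 'c2': [2, 3], 'c3': [4, 5]}
```

When a member joins or leaves, recomputing the assignment over the new member list is what redistributes the partitions.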
The main difference between the older “high-level” consumer and the new consumer is that the former depended on ZooKeeper for group management, while the latter uses a group protocol built into Kafka itself. In this protocol, one of the brokers is designated as the group’s coordinator and is responsible for managing the members of the group as well as their partition assignments.
The coordinator of each group is chosen from the leaders of the internal offsets topic __consumer_offsets, which is used to store committed offsets. The group’s ID is hashed to one of the partitions of this topic, and the leader of that partition is selected as the coordinator. In this way, management of consumer groups is divided roughly equally across all the brokers in the cluster, which allows the number of groups to scale by increasing the number of brokers.
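The hashing step can be sketched in Python as follows. As we understand the broker's behavior, it takes the group ID's Java `String.hashCode()` (made non-negative) modulo the number of partitions of `__consumer_offsets`. That partition count is the broker setting `offsets.topic.num.partitions`, which defaults to 50. The helper names are hypothetical.

```python
def java_string_hash(s):
    """Reproduce Java's String.hashCode() (a signed 32-bit int).

    Exact for characters in the Basic Multilingual Plane, where Python code
    points and Java UTF-16 code units coincide.
    """
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    # Reinterpret the unsigned 32-bit value as a signed Java int.
    return h - (1 << 32) if h >= (1 << 31) else h


def coordinator_partition(group_id, num_offsets_partitions=50):
    """Index of the __consumer_offsets partition whose leader coordinates group_id."""
    h = java_string_hash(group_id)
    # Map Integer.MIN_VALUE to 0 (Math.abs would overflow); otherwise take abs().
    h = 0 if h == -(1 << 31) else abs(h)
    return h % num_offsets_partitions


print(coordinator_partition("foo"))  # 24
```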
When the consumer starts up, it finds the coordinator for its group and sends a request to join the group. The coordinator then begins a group rebalance so that the new member is assigned its fair share of the group’s partitions. Every rebalance results in a new generation of the group.
Each member in the group must send heartbeats to the coordinator in order to remain a member of the group. If no heartbeat is received before the configured session timeout expires, the coordinator removes the member from the group and reassigns its partitions to other members.
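The liveness rule can be modelled with a small Python class. This is a toy model with hypothetical names (`GroupMembership`, `heartbeat`, `expire`), not the coordinator's actual implementation. Times are passed in explicitly so the behavior is deterministic.

```python
class GroupMembership:
    """Toy model of a coordinator tracking member liveness via heartbeats."""

    def __init__(self, session_timeout_ms):
        self.session_timeout_ms = session_timeout_ms
        self.last_heartbeat = {}  # member id -> time of its most recent heartbeat

    def heartbeat(self, member, now_ms):
        self.last_heartbeat[member] = now_ms

    def expire(self, now_ms):
        """Drop members whose session timed out and return their ids, sorted."""
        expired = sorted(m for m, t in self.last_heartbeat.items()
                         if now_ms - t > self.session_timeout_ms)
        for m in expired:
            del self.last_heartbeat[m]
        # A non-empty result is what would trigger a rebalance of their partitions.
        return expired


group = GroupMembership(session_timeout_ms=30_000)
group.heartbeat("c1", now_ms=0)
group.heartbeat("c2", now_ms=0)
group.heartbeat("c1", now_ms=27_000)  # c1 keeps heartbeating, c2 goes silent
print(group.expire(now_ms=31_000))    # ['c2']
```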
Offset Management: After the consumer receives its assignment from the coordinator, it must determine the initial position for each assigned partition. When the group is first created, before any messages have been consumed, the position is set according to a configurable offset reset policy (auto.offset.reset). Typically, consumption starts either at the earliest offset or the latest offset.
As a consumer in the group reads messages from the partitions assigned by the coordinator, it must commit the offsets corresponding to the messages it has read. If the consumer crashes or is shut down, its partitions will be reassigned to another member, which will begin consumption from the last committed offset of each partition. If the consumer crashes before any offset has been committed, the consumer that takes over its partitions will use the reset policy.
The offset commit policy is crucial to providing the message delivery guarantees needed by your application. By default, the consumer is configured to use an automatic commit policy, which triggers a commit on a periodic interval. The consumer also supports a commit API which can be used for manual offset management. In the detailed examples below, we show several ways to use the commit API and discuss the tradeoffs in terms of performance and reliability.
Configuration
The full list of configuration settings is available in the Kafka documentation. Below we highlight several of the key configuration settings and how they affect the consumer’s behavior.
Core Configuration: The only required setting is bootstrap.servers, but we recommend always setting a client.id, since this allows you to easily correlate requests on the broker with the client instance which made them. Typically, all consumers within the same group will share the same client ID in order to enforce client quotas.
Group Configuration: You should always configure group.id unless you are using the simple assignment API and you don’t need to store offsets in Kafka.
You can control the session timeout by overriding the session.timeout.ms value. The default is 30 seconds, but you can safely increase it to avoid excessive rebalances if you find that your application needs more time to process messages. This concern is mainly relevant if you are using the Java consumer and handling messages in the same thread. In that case, you may also want to adjust max.poll.records to tune the number of records that must be handled on every loop iteration. See Basic Usage below for more detail on this issue.
The main drawback to using a larger session timeout is that it will take longer for the coordinator to detect when a consumer instance has crashed, which means it will also take longer for another consumer in the group to take over its partitions. For normal shutdowns, however, the consumer sends an explicit request to the coordinator to leave the group which triggers an immediate rebalance.
The other setting which affects rebalance behavior is heartbeat.interval.ms. This controls how often the consumer will send heartbeats to the coordinator. It is also the way that the consumer detects when a rebalance is needed, so a lower heartbeat interval will generally mean faster rebalances. The default setting is three seconds. For larger groups, it may be wise to increase this setting.
Offset Management: The two main settings affecting offset management are whether auto-commit is enabled and the offset reset policy. First, if you set enable.auto.commit to true (which is the default), then the consumer will automatically commit offsets periodically at the interval set by auto.commit.interval.ms. The default is 5 seconds.
Second, use auto.offset.reset to define the behavior of the consumer when there is no committed position (which would be the case when the group is first initialized) or when an offset is out of range. You can choose either to reset the position to the “earliest” offset or the “latest” offset (the default). You can also select “none” if you would rather set the initial offset yourself and you are willing to handle out-of-range errors manually.
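The decision the consumer makes for each newly assigned partition can be sketched as a small Python function. The names `starting_offset` and `NoOffsetForPartition` are hypothetical, because the real clients apply this logic internally. In this sketch, an offset counts as out of range when it falls outside the span of offsets the partition still retains.

```python
class NoOffsetForPartition(Exception):
    """Hypothetical error for auto.offset.reset=none with no usable offset."""


def starting_offset(committed, log_start, log_end, reset_policy="latest"):
    """Return the offset a newly assigned partition should start consuming from.

    committed: the group's last committed offset, or None if nothing was committed.
    log_start: earliest offset still retained in the partition.
    log_end:   offset that the next appended message will receive.
    """
    if committed is not None and log_start <= committed <= log_end:
        return committed  # resume where the previous owner left off
    # No committed position, or it is out of range: fall back to the reset policy.
    if reset_policy == "earliest":
        return log_start
    if reset_policy == "latest":
        return log_end
    raise NoOffsetForPartition(
        f"no valid committed offset (committed={committed}) and reset policy is {reset_policy!r}")


print(starting_offset(None, 100, 500))              # 500: new group, "latest" default
print(starting_offset(None, 100, 500, "earliest"))  # 100
print(starting_offset(250, 100, 500, "earliest"))   # 250: committed offset wins
print(starting_offset(40, 100, 500, "earliest"))    # 100: committed offset no longer retained
```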
Initialization
The Java consumer is constructed with a standard Properties object.
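import java.net.InetAddress;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("group.id", "foo");
config.put("bootstrap.servers", "host1:9092,host2:9092");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer());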
Configuration errors will result in a KafkaException raised from the constructor of KafkaConsumer.
The C/C++ (librdkafka) configuration is similar, but we need to handle configuration errors directly when setting properties:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <librdkafka/rdkafka.h>

char hostname[128];
char errstr[512];

rd_kafka_conf_t *conf = rd_kafka_conf_new();

if (gethostname(hostname, sizeof(hostname))) {
    fprintf(stderr, "%% Failed to lookup hostname\n");
    exit(1);
}

if (rd_kafka_conf_set(conf, "client.id", hostname,
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%% %s\n", errstr);
    exit(1);
}

if (rd_kafka_conf_set(conf, "group.id", "foo",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%% %s\n", errstr);
    exit(1);
}

if (rd_kafka_conf_set(conf, "bootstrap.servers", "host1:9092,host2:9092",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%% %s\n", errstr);
    exit(1);
}

/* Create Kafka consumer handle (on success, rd_kafka_new takes ownership of conf) */
rd_kafka_t *rk;
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                        errstr, sizeof(errstr)))) {
    fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
    exit(1);
}
The Python client can be configured via a dictionary as follows:
from confluent_kafka import Consumer
conf = {'bootstrap.servers': "host1:9092,host2:9092",
'group.id': "foo",
'default.topic.config': {'auto.offset.reset': 'smallest'}}
consumer = Consumer(conf)
The Go client uses a ConfigMap object to pass configuration to the consumer:
import (
    "fmt"
    "os"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":    "host1:9092,host2:9092",
    "group.id":             "foo",
    "default.topic.config": kafka.ConfigMap{"auto.offset.reset": "smallest"}})
if err != nil {
    fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
    os.Exit(1)
}
In C#, use a Dictionary<string, object>:
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

...

var config = new Dictionary<string, object>
{
    { "bootstrap.servers", "host1:9092,host2:9092" },
    { "group.id", "foo" },
    { "default.topic.config", new Dictionary<string, object>
        {
            { "auto.offset.reset", "smallest" }
        }
    }
};

using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
{
    ...
}
Basic Usage
Although the Java client and librdkafka share many of the same configuration options and underlying features, they take fairly different approaches when it comes to their threading model and how they handle consumer liveness. Before diving into the examples, it’s helpful to understand the API designs of each client.
Java Client
The Java client is designed around an event loop which is driven by the poll() API. This design is motivated by the UNIX select and poll system calls. A basic consumption loop with the Java API usually takes the following form:
while (running) {
    ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
    process(records); // application-specific processing
    consumer.commitSync();
}
There is no background thread in the Java consumer. The API depends on calls to poll() to drive all of its IO, including:
- Joining the consumer group and handling partition rebalances.
- Sending periodic heartbeats if part of an active generation.
- Sending periodic offset commits (if auto-commit is enabled).
- Sending and receiving fetch requests for assigned partitions.
Due to this single-threaded model, no heartbeats can be sent while the application is handling the records returned from a call to poll(). This means that the consumer will fall out of the consumer group if either the event loop terminates or a delay in record processing causes the session timeout to expire before the next iteration of the loop. This is actually by design. One of the problems that the Java client attempts to solve is ensuring the liveness of consumers in the group. As long as the consumer is assigned partitions, no other members in the group can consume from the same partitions, so it is important to ensure that it is actually making progress and has not become a zombie.
This feature protects your application from a large class of failures, but the downside is that it puts the burden on you to tune the session timeout so that the consumer does not exceed it in its normal record processing. In the 0.9 release of Kafka, this was difficult because there was no direct way to limit the amount of data returned from poll(), but 0.10 added a new configuration option, max.poll.records, which places an upper bound on the number of records returned from each call. We recommend both using a fairly high session timeout (e.g. 30 to 60 seconds) and keeping the number of records processed on each iteration bounded so that worst-case behavior is predictable.
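One rough way to bound max.poll.records is to divide a share of the session timeout by the worst-case time to process one record. The Python helper below does that arithmetic. The 50% safety factor is our own assumption to leave headroom for fetches and commits, not a Kafka recommendation, and the function name is hypothetical.

```python
import math


def max_safe_poll_records(session_timeout_ms, per_record_ms, safety_factor=0.5):
    """Largest max.poll.records whose worst-case processing fits in the timeout.

    safety_factor reserves part of the session timeout for fetches, commits and
    other overhead; 0.5 is an illustrative assumption, not a Kafka default.
    """
    budget_ms = session_timeout_ms * safety_factor
    return max(1, math.floor(budget_ms / per_record_ms))


# 30 s session timeout, 20 ms worst-case processing per record:
print(max_safe_poll_records(30_000, 20))  # 750
```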
If you fail to tune these settings appropriately, the consequence is typically a CommitFailedException raised from the call to commit offsets for the processed records. If you are using the automatic commit policy, then you might not even notice when this happens, since the consumer silently ignores commit failures internally (unless they occur often enough to impact lag metrics). You can catch this exception and either ignore it or perform any needed rollback logic.
while (running) {
    ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
    process(records); // application-specific processing
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        // application-specific rollback of processed records
    }
}
C/C++ Client (librdkafka)
Librdkafka uses a multi-threaded approach to Kafka consumption. From a user’s perspective, interaction with the API is not too different from the example used by the Java client, with the user calling rd_kafka_consumer_poll in a loop, though this API returns only a single message or event at a time:
while (running) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
    if (rkmessage) {
        msg_process(rkmessage);
        rd_kafka_message_destroy(rkmessage);
        if ((++msg_count % MIN_COMMIT_COUNT) == 0)
            rd_kafka_commit(rk, NULL, 0);
    }
}
Unlike the Java client, librdkafka does all fetching and coordinator communication in background threads. This frees you from the complication of tuning the session timeout according to the expected processing time. However, since the background thread will keep the consumer alive until the client has been closed, it is up to you to ensure that your process does not become a zombie since it will continue to hold onto assigned partitions in that case.
Note that partition rebalances also take place in a background thread, which means you still have to handle the potential for commit failures, since the consumer may no longer have the same partition assignment when the commit begins. If you enable auto-commit, you don’t need to handle these failures because they are ignored silently, but this also means you have no way to roll back processing.
while (running) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 1000);
    if (!rkmessage)
        continue; // timeout: no message

    msg_process(rkmessage); // application-specific processing
    rd_kafka_message_destroy(rkmessage);

    if ((++msg_count % MIN_COMMIT_COUNT) == 0) {
        rd_kafka_resp_err_t err = rd_kafka_commit(rk, NULL, 0);
        if (err) {
            // application-specific rollback of processed records
        }
    }
}
Python, Go and .NET Clients
The Python, Go and .NET clients use librdkafka internally, so they also use a multi-threaded approach to Kafka consumption. From a user’s perspective, interaction with the API is not too different from the example used by the Java client, with the user calling the poll() method in a loop, though this API returns only a single message at a time.
In Python:
try:
    msg_count = 0
    while running:
        msg = consumer.poll(timeout=1.0)
        if msg is None: continue

        msg_process(msg) # application-specific processing
        msg_count += 1
        if msg_count % MIN_COMMIT_COUNT == 0:
            # 'async' is a reserved word since Python 3.7; the client names it 'asynchronous'
            consumer.commit(asynchronous=False)
finally:
    # Shut down consumer
    consumer.close()
In Go:
for run {
    ev := consumer.Poll(100)
    if ev == nil {
        continue // timeout: no event
    }
    switch e := ev.(type) {
    case *kafka.Message:
        // application-specific processing
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}
The C# consumer behaves similarly, except that before entering the Poll loop you set up handlers for the various event types, and the switch on event type effectively happens inside the Poll method. Note that all code in the example below executes on the same thread:
var cancelled = false;

consumer.OnMessage += (_, msg) =>
{
    // handle message.
};

consumer.OnPartitionEOF += (_, end)
    => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}.");

consumer.OnError += (_, error) =>
{
    Console.WriteLine($"Error: {error}");
    cancelled = true;
};

while (!cancelled)
{
    consumer.Poll(TimeSpan.FromSeconds(1));
}
Detailed Examples
Below we provide detailed examples using the consumer API, with special attention paid to offset management and delivery semantics. These examples are intended to give you a starting point for building your consumer application.
Basic Poll Loop
The consumer API is centered around the poll() method, which is used to retrieve records from the brokers. The subscribe() method controls which topics will be fetched in poll. Typically, consumer usage involves an initial call to subscribe() to set up the topics of interest and then a loop which calls poll() until the application is shut down.
The Java consumer intentionally avoids a specific threading model. It is not safe for multi-threaded access and it has no background threads of its own. In particular, this means that all IO occurs in the thread calling poll(). In the example below, we wrap the poll loop in a Runnable, which makes it easy to use with an ExecutorService.
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public abstract class BasicConsumeLoop<K, V> implements Runnable {
    private final KafkaConsumer<K, V> consumer;
    private final List<String> topics;
    private final AtomicBoolean shutdown;
    private final CountDownLatch shutdownLatch;

    public BasicConsumeLoop(Properties config, List<String> topics) {
        this.consumer = new KafkaConsumer<>(config);
        this.topics = topics;
        this.shutdown = new AtomicBoolean(false);
        this.shutdownLatch = new CountDownLatch(1);
    }

    public abstract void process(ConsumerRecord<K, V> record);

    public void run() {
        try {
            consumer.subscribe(topics);

            while (!shutdown.get()) {
                ConsumerRecords<K, V> records = consumer.poll(500);
                records.forEach(record -> process(record));
            }
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }

    public void shutdown() throws InterruptedException {
        shutdown.set(true);
        shutdownLatch.await();
    }
}
We’ve hard-coded the poll timeout to 500 milliseconds. If no records are received before this timeout expires, then poll() will return an empty record set. It’s not a bad idea to add a shortcut check for this case if your message processing involves any setup overhead.
To shut down the consumer, we’ve added a flag which is checked on each loop iteration. After shutdown is triggered, the consumer will wait at most 500 milliseconds (plus the message processing time) before shutting down, since shutdown might be triggered while we are in poll(). A better approach is provided in the next example.
Note that you should always call close() after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately, which ensures that any partitions owned by the consumer are reassigned to another member in the group. If the consumer is not closed properly, the broker will trigger the rebalance only after the session timeout has expired. We’ve added a latch to this example to ensure that the consumer has time to finish closing before finishing shutdown.
The same example looks similar in librdkafka:
#include <stdio.h>
#include <stdlib.h>
#include <librdkafka/rdkafka.h>

/* Cleared (e.g. from a signal handler) to stop the loop. */
static volatile int running = 1;

static void msg_process(rd_kafka_message_t *message);

void basic_consume_loop(rd_kafka_t *rk,
                        rd_kafka_topic_partition_list_t *topics) {
    rd_kafka_resp_err_t err;

    if ((err = rd_kafka_subscribe(rk, topics))) {
        fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
        exit(1);
    }

    while (running) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
        if (rkmessage) {
            msg_process(rkmessage);
            rd_kafka_message_destroy(rkmessage);
        }
    }

    err = rd_kafka_consumer_close(rk);
    if (err)
        fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
    else
        fprintf(stderr, "%% Consumer closed\n");
    /* The handle itself must still be released with rd_kafka_destroy(rk). */
}
In Python:
import sys
from confluent_kafka import KafkaError, KafkaException

running = True

def basic_consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

def shutdown():
    global running  # without this, the assignment would only create a local variable
    running = False
In Go:
err = consumer.SubscribeTopics(topics, nil)
if err != nil {
    fmt.Fprintf(os.Stderr, "%% Failed to subscribe: %v\n", err)
    os.Exit(1)
}

for run {
    ev := consumer.Poll(100)
    if ev == nil {
        continue // timeout: no event
    }
    switch e := ev.(type) {
    case *kafka.Message:
        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))
    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

consumer.Close()
And in C#:
using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
{
    var cancelled = false;

    consumer.OnMessage += (_, msg)
        => Console.WriteLine($"Message value: {msg.Value}");

    consumer.OnPartitionEOF += (_, end)
        => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}.");

    consumer.OnError += (_, error) =>
    {
        Console.WriteLine($"Error: {error}");
        cancelled = true;
    };

    consumer.Subscribe(topics);

    while (!cancelled)
    {
        consumer.Poll(TimeSpan.FromSeconds(1));
    }
}
Although the APIs are similar, the C/C++, Python, Go and C# clients use a different approach than the Java client beneath the surface. While the Java consumer does all IO and processing in the foreground thread, these clients use a background thread. The main consequence of this is that calling rd_kafka_consumer_poll or Consumer.poll() is safe when used from multiple threads. You can use this to parallelize message handling in multiple threads. From a high level, poll is taking messages off of a queue which is filled in the background.
Another consequence of using a background thread is that all heartbeats and rebalances are executed in the background. The benefit of this is that you don’t need to worry about message handling causing the consumer to “miss” a rebalance. The drawback, however, is that the background thread will continue heartbeating even if your message processor dies. If this happens, then the consumer will continue to hold onto its partitions and the read lag will continue to build until the process is shut down.
Although the clients have taken different approaches internally, they are not as far apart as they seem. To provide the same abstraction in the Java client, you could place a queue in between the poll loop and the message processors. The poll loop would fill the queue and the processors would pull messages off of it.
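Here is a minimal Python sketch of that arrangement. The `batches` argument stands in for the results of successive poll() calls, so no Kafka client is involved. The function name and the bounded queue size are illustrative assumptions. Because the queue is bounded, the "poll loop" blocks when the workers fall behind. This is the flow-control issue you must account for in the Java client, where a blocked poll loop also stops heartbeating.

```python
import queue
import threading


def run_pipeline(batches, num_workers=3, process=lambda r: r * 2):
    """Feed records from a simulated poll loop to worker threads through a queue.

    Returns the processed values in sorted order (workers finish in any order).
    """
    work = queue.Queue(maxsize=10)  # bounded: put() blocks when workers fall behind
    results = []
    lock = threading.Lock()
    stop = object()  # sentinel telling a worker to exit

    def worker():
        while True:
            record = work.get()
            if record is stop:
                return
            value = process(record)
            with lock:
                results.append(value)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for batch in batches:      # the "poll loop": each batch is one poll() result
        for record in batch:
            work.put(record)
    for _ in threads:          # one sentinel per worker, queued after all records
        work.put(stop)
    for t in threads:
        t.join()
    return sorted(results)


print(run_pipeline([[1, 2, 3], [4, 5]]))
```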
Shutdown with Wakeup
An alternative pattern for the poll loop in the Java consumer is to use Long.MAX_VALUE for the timeout. To break from the loop, we can use the consumer’s wakeup() method from a separate thread. This will raise a WakeupException from the thread blocking in poll(). If the thread is not currently blocking, then this will wake up the next poll invocation.
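The control flow of this pattern can be illustrated without a broker. In the Python sketch below, `FakeConsumer` and `WakeupException` are hypothetical stand-ins for the Java classes. `poll()` blocks until a record arrives. A `wakeup()` call from another thread makes the current or next `poll()` raise, which ends the loop.

```python
import queue
import threading


class WakeupException(Exception):
    """Hypothetical stand-in for the Java client's WakeupException."""


class FakeConsumer:
    """Toy consumer: poll() blocks until a record arrives or wakeup() is called."""

    def __init__(self):
        self._inbox = queue.Queue()
        self._woken = threading.Event()

    def deliver(self, record):
        self._inbox.put(record)

    def wakeup(self):
        # Thread-safe; interrupts a blocked poll() or, if none is blocked, the next one.
        self._woken.set()

    def poll(self):
        while True:  # block "forever", like poll(Long.MAX_VALUE)
            if self._woken.is_set():
                self._woken.clear()
                raise WakeupException()
            try:
                return self._inbox.get(timeout=0.05)
            except queue.Empty:
                continue


def consume_loop(consumer, handled, closed):
    try:
        while True:
            handled.append(consumer.poll())
    except WakeupException:
        pass  # ignore, we're closing
    finally:
        closed.set()  # plays the role of close() plus the shutdown latch
```

A shutdown hook would simply call `consumer.wakeup()` and then wait on `closed`, mirroring `shutdown()` in the Java example.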
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ConsumeLoop<K, V> implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(ConsumeLoop.class);

    private final KafkaConsumer<K, V> consumer;
    private final List<String> topics;
    private final CountDownLatch shutdownLatch;

    public ConsumeLoop(KafkaConsumer<K, V> consumer, List<String> topics) {
        this.consumer = consumer;
        this.topics = topics;
        this.shutdownLatch = new CountDownLatch(1);
    }

    public abstract void process(ConsumerRecord<K, V> record);

    public void run() {
        try {
            consumer.subscribe(topics);

            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
                records.forEach(record -> process(record));
            }
        } catch (WakeupException e) {
            // ignore, we're closing
        } catch (Exception e) {
            log.error("Unexpected error", e);
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }

    public void shutdown() throws InterruptedException {
        consumer.wakeup();
        shutdownLatch.await();
    }
}
Synchronous Commits
In the examples above, we have assumed that the consumer is configured to auto-commit offsets (this is the default). Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset. When this happens, the last committed position may be as old as the auto-commit interval itself. Any messages which have arrived since the last commit will have to be read again.
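A small simulation shows the size of the duplicate window. The Python function below is a hypothetical model under these assumptions:
- messages finish processing at the given (sorted) times;
- auto-commit runs at t = 0, interval, 2 × interval, and so on;
- each commit covers everything processed by then.

```python
def redelivered_after_crash(finish_times, commit_interval, crash_time):
    """Offsets a replacement consumer reads again after a crash under auto-commit.

    finish_times[i] is when the message at offset i finished processing (sorted
    ascending). Commits are assumed to run at t = 0, commit_interval,
    2 * commit_interval, ... and to cover every message finished by then.
    """
    last_commit = (crash_time // commit_interval) * commit_interval
    committed = sum(1 for t in finish_times if t <= last_commit)  # next offset to read
    processed = sum(1 for t in finish_times if t <= crash_time)
    return list(range(committed, processed))


# One message per second, auto-commit every 5 s, crash at t = 8 s:
print(redelivered_after_crash(list(range(10)), 5, 8))  # [6, 7, 8]
```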
If you want to reduce the window for duplicates, you can reduce the auto-commit interval, but some users may want even finer control over offsets. The consumer therefore supports a commit API which gives you full control over offsets. The simplest and most reliable way to manually commit offsets is using a synchronous commit with commitSync(). As its name suggests, this method blocks until the commit either completes successfully or fails with an unrecoverable error.
Note that when you use the commit API directly, you should first disable auto-commit in the configuration by setting the enable.auto.commit property to false.
private void doCommitSync() {
    try {
        consumer.commitSync();
    } catch (WakeupException e) {
        // we're shutting down, but finish the commit first and then
        // rethrow the exception so that the main loop can exit
        doCommitSync();
        throw e;
    } catch (CommitFailedException e) {
        // the commit failed with an unrecoverable error. if there is any
        // internal state which depended on the commit, you can clean it
        // up here. otherwise it's reasonable to ignore the error and go on
        log.debug("Commit failed", e);
    }
}

public void run() {
    try {
        consumer.subscribe(topics);

        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            records.forEach(record -> process(record));
            doCommitSync();
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        consumer.close();
        shutdownLatch.countDown();
    }
}
In this example, we’ve added a try/catch block around the call to commitSync. The CommitFailedException is thrown when the commit cannot be completed because the group has been rebalanced. This is the main thing we have to be careful of when using the Java client. Since all network IO (including heartbeating) and message processing is done in the foreground, it is possible for the session timeout to expire while a batch of messages is being processed. To handle this, you have two choices.
First, you can adjust the session.timeout.ms setting to ensure that the handler has enough time to finish processing messages. You can then tune max.partition.fetch.bytes to limit the amount of data returned in a single batch, though you will have to consider how many partitions are in the subscribed topics.
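As a rough worked estimate, the worst case for a single poll() is every assigned partition returning a full max.partition.fetch.bytes. In this simplified model the batch size is therefore the number of assigned partitions times that setting. The Python helper below computes the bound and the matching processing time. The 1 MiB default is the Java client's. The 5 MB/s processing throughput is a placeholder assumption you would replace with a measured value, and the function name is hypothetical.

```python
def worst_case_batch(partitions, max_partition_fetch_bytes=1_048_576,
                     processing_bytes_per_sec=5_000_000):
    """Rough upper bound on one poll()'s data and the time needed to process it.

    Assumes every assigned partition returns a full max.partition.fetch.bytes;
    processing_bytes_per_sec is a placeholder for your measured throughput.
    """
    batch_bytes = partitions * max_partition_fetch_bytes
    return batch_bytes, batch_bytes / processing_bytes_per_sec


size, seconds = worst_case_batch(partitions=50)
print(f"{size} bytes, about {seconds:.1f} s to process")  # 52428800 bytes, about 10.5 s to process
```

If the resulting time approaches the session timeout, either lower max.partition.fetch.bytes or raise session.timeout.ms.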
The second option is to do message processing in a separate thread, but you will have to manage flow control to ensure that the threads can keep up. For example, just pushing messages into a blocking queue would probably not be sufficient unless the rate of processing can keep up with the rate of delivery (in which case you might not need a separate thread anyway). It may even exacerbate the problem if the poll loop is stuck blocking on a call to offer() while the background thread is handling an even larger batch of messages. The Java API offers a pause() method to help in these situations, which we will cover in another example.
For now, we recommend setting session.timeout.ms large enough that commit failures from rebalances are rare. As mentioned above, the only drawback to this is a longer delay before partitions can be reassigned in the event of a hard failure (where the consumer cannot be cleanly shut down with close()). This should be rare in practice.
Note also that we have to be a little careful in this example, since the wakeup() might be triggered while the commit is pending. The recursive call is safe since the wakeup will only be triggered once.
In C/C++ (librdkafka), we can get similar behavior with rd_kafka_commit, which is used for both synchronous and asynchronous commits. The approach is slightly different, however, since rd_kafka_consumer_poll returns single messages instead of batches as the Java consumer does.
void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
    static const int MIN_COMMIT_COUNT = 1000;

    int msg_count = 0;
    rd_kafka_resp_err_t err;

    if ((err = rd_kafka_subscribe(rk, topics))) {
        fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
        exit(1);
    }

    while (running) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
        if (rkmessage) {
            msg_process(rkmessage);
            rd_kafka_message_destroy(rkmessage);

            if ((++msg_count % MIN_COMMIT_COUNT) == 0) {
                /* NULL: commit current offsets of the assignment; 0: synchronous */
                err = rd_kafka_commit(rk, NULL, 0);
                if (err)
                    fprintf(stderr, "%% Commit failed: %s\n", rd_kafka_err2str(err));
            }
        }
    }

    err = rd_kafka_consumer_close(rk);
    if (err)
        fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
    else
        fprintf(stderr, "%% Consumer closed\n");
}
In this example, we trigger a synchronous commit every 1000 messages. The second argument to rd_kafka_commit is the list of offsets to be committed; if set to NULL, librdkafka will commit the latest offsets for the assigned partitions. The third argument in rd_kafka_commit is a flag which controls whether this call is asynchronous. We could also trigger the commit on expiration of a timeout to ensure that the committed position is updated regularly.
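One way to combine the count-based trigger with a timeout is sketched below in Python. `CommitTrigger` is a hypothetical helper, not part of any client. The loop would call `on_message()` after processing each message and `on_idle()` when poll times out, and would issue the commit whenever either returns True. The clock is passed in so the logic can be tested deterministically.

```python
class CommitTrigger:
    """Commit every `min_count` messages or every `max_interval` seconds.

    Only fires when at least one message has been processed since the last commit.
    """

    def __init__(self, min_count, max_interval, now):
        self.min_count = min_count
        self.max_interval = max_interval
        self.pending = 0
        self.last_commit = now

    def on_message(self, now):
        """Record one processed message; return True if a commit is due."""
        self.pending += 1
        return self._due(now)

    def on_idle(self, now):
        """Call when poll() times out so quiet periods still flush offsets."""
        return self._due(now)

    def _due(self, now):
        if self.pending == 0:
            return False
        if self.pending >= self.min_count or now - self.last_commit >= self.max_interval:
            self.pending = 0
            self.last_commit = now
            return True
        return False


t = CommitTrigger(min_count=3, max_interval=10.0, now=0.0)
print([t.on_message(now=s) for s in (1.0, 2.0, 3.0)])  # [False, False, True]
print(t.on_message(now=4.0), t.on_idle(now=14.0))     # False True
```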
Since the Python client uses librdkafka internally, it uses a similar pattern by setting the asynchronous parameter (named async in older releases) of the Consumer.commit() method call. This method can also accept the mutually exclusive keyword parameters offsets, to explicitly list the offsets for each assigned topic partition, and message, which will commit offsets relative to a Message object returned by poll().
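When you build an explicit offsets list, keep in mind that a committed offset in Kafka denotes the next message to consume, one past the last processed message. The Python helper below computes those values from a set of processed messages. It is hypothetical and uses plain tuples instead of the client's TopicPartition objects so that it runs standalone.

```python
def offsets_to_commit(consumed):
    """Map (topic, partition) -> offset to commit, from (topic, partition, offset) triples.

    The committed offset is the position of the *next* message to read, so it
    is one past the highest offset processed in each partition.
    """
    result = {}
    for topic, partition, offset in consumed:
        key = (topic, partition)
        result[key] = max(result.get(key, 0), offset + 1)
    return result


print(offsets_to_commit([("orders", 0, 41), ("orders", 1, 7), ("orders", 0, 42)]))
# {('orders', 0): 43, ('orders', 1): 8}
```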
def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=False)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
The Go client also uses librdkafka internally, so it uses a similar pattern, but it only provides a synchronous Commit() method call. Other variants of the commit method also accept a list of offsets to commit or a Message in order to commit offsets relative to a consumed message. When using manual offset commit, be sure to disable the enable.auto.commit configuration.
msg_count := 0
for run {
    ev := consumer.Poll(100)
    if ev == nil {
        continue // timeout: no event
    }
    switch e := ev.(type) {
    case *kafka.Message:
        // process first, then commit, so a crash cannot skip this message
        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))
        msg_count += 1
        if msg_count % MIN_COMMIT_COUNT == 0 {
            consumer.Commit()
        }
    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}
The C# client provides an awaitable CommitAsync method with a number of overloads. It can be used in a synchronous manner by calling Result or Wait() on the returned Task. There are variants that commit all offsets in the current assignment, a specific list of offsets, or an offset based on a Message.
var msgCount = 0;
var cancelled = false;

consumer.OnMessage += (_, msg) =>
{
    // process first, then commit
    Console.WriteLine($"Message value: {msg.Value}");
    msgCount += 1;
    if (msgCount % MIN_COMMIT_COUNT == 0)
    {
        consumer.CommitAsync().Wait();
    }
};

consumer.OnPartitionEOF += (_, end)
    => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}.");

consumer.OnError += (_, error) =>
{
    Console.WriteLine($"Error: {error}");
    cancelled = true;
};

while (!cancelled)
{
    consumer.Poll(TimeSpan.FromSeconds(1));
}
Delivery Guarantees: This is as good a time as any to talk about delivery semantics. Using auto-commit gives you “at least once” delivery: Kafka guarantees that no messages will be missed, but duplicates are possible. The synchronous commit example above provides the same guarantee, since the commit follows the message processing. By changing the order, however, we can get “at most once” delivery. But we have to be a little careful with the commit failure, so we change doCommitSync to return whether or not the commit succeeded. There’s also no longer any need to catch the WakeupException in the synchronous commit.
private boolean doCommitSync() {
    try {
        consumer.commitSync();
        return true;
    } catch (CommitFailedException e) {
        // the commit failed with an unrecoverable error. if there is any
        // internal state which depended on the commit, you can clean it
        // up here. otherwise it's reasonable to ignore the error and go on
        log.debug("Commit failed", e);
        return false;
    }
}

public void run() {
    try {
        consumer.subscribe(topics);

        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            if (doCommitSync())
                records.forEach(record -> process(record));
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        consumer.close();
        shutdownLatch.countDown();
    }
}
And in C/C++ (librdkafka):
void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
    rd_kafka_resp_err_t err;
    if ((err = rd_kafka_subscribe(rk, topics))) {
        fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
        exit(1);
    }
    while (running) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
        if (!rkmessage)
            continue;
        /* Commit synchronously (async = 0) before processing: at most once.
         * Error events are skipped rather than committed. */
        if (!rkmessage->err && !rd_kafka_commit_message(rk, rkmessage, 0))
            msg_process(rkmessage);
        /* Always release the message, even if the commit failed. */
        rd_kafka_message_destroy(rkmessage);
    }
    err = rd_kafka_consumer_close(rk);
    if (err)
        fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
    else
        fprintf(stderr, "%% Consumer closed\n");
}
And in Python:
import sys
from confluent_kafka import KafkaError, KafkaException
def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                # Commit synchronously *before* processing (at most once).
                # 'asynchronous' replaces the old 'async' keyword argument,
                # which is a reserved word since Python 3.7.
                consumer.commit(asynchronous=False)
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
And in Go:
for run == true {
	ev := consumer.Poll(0)
	switch e := ev.(type) {
	case *kafka.Message:
		// Commit this message's offset synchronously before processing it.
		_, err := consumer.CommitMessage(e)
		if err == nil {
			msg_process(e)
		}
	case kafka.PartitionEOF:
		fmt.Printf("%% Reached %v\n", e)
	case kafka.Error:
		fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
		run = false
	default:
		fmt.Printf("Ignored %v\n", e)
	}
}
And in C#:
consumer.OnMessage += (_, msg) =>
{
    // Commit synchronously before processing (at most once).
    var err = consumer.CommitAsync().Result.Error;
    if (!err.HasError)
    {
        processMessage(msg);
    }
};
consumer.OnPartitionEOF += (_, end)
    => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}.");
consumer.OnError += (_, error) =>
{
    Console.WriteLine($"Error: {error}");
    cancelled = true;
};
while (!cancelled)
{
    consumer.Poll(TimeSpan.FromSeconds(1));
}
For simplicity in these examples, we’ve committed each message (in C/C++, with rd_kafka_commit_message) prior to processing it. Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.
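The batched at-most-once approach just described might look roughly like the following Python sketch. It assumes a confluent_kafka-style consumer exposing `consume(num_messages, timeout)` and `commit(asynchronous=False)`; the helper name `consume_batch_at_most_once` and the `process` callback are hypothetical, and error events inside a batch are simply skipped.

```python
try:
    from confluent_kafka import KafkaException
except ImportError:  # lets the sketch run without the client installed
    KafkaException = Exception


def consume_batch_at_most_once(consumer, process, batch_size=500, timeout=1.0):
    """Fetch a batch, commit it synchronously, then process it.

    Hypothetical helper: if the commit fails, the batch is skipped rather
    than processed, so no message is processed twice (at most once).
    Returns the number of messages handed to `process`.
    """
    batch = consumer.consume(num_messages=batch_size, timeout=timeout)
    # Drop error events (e.g. partition EOF) in this simplified sketch.
    batch = [msg for msg in batch if not msg.error()]
    if not batch:
        return 0
    try:
        # Blocks until the broker acknowledges the consumed offsets.
        consumer.commit(asynchronous=False)
    except KafkaException:
        return 0  # commit failed: skip the batch to preserve at-most-once
    for msg in batch:
        process(msg)
    return len(batch)
```

Because the commit happens before any processing, a crash partway through a batch loses the unprocessed remainder instead of replaying it, which is exactly the at-most-once tradeoff.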
Correct offset management is crucial because it affects delivery semantics. As of version 0.9.0.0, the best Kafka can give you is at-least-once or at-most-once. If you are not careful, however, it might not give you either. Exactly-once delivery is under active investigation, but it is not currently possible without depending on another system (e.g. a transactional RDBMS).
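To make the difference between the two orderings concrete, here is a small, self-contained Python model of a single partition (not Kafka client code). It assumes the consumer commits after every message and crashes exactly once, between the two steps for the message at `crash_at`, then restarts from the last committed offset; `run_with_crash` is a hypothetical name.

```python
def run_with_crash(log, commit_first, crash_at):
    """Simulate one consumer crash and restart on a single partition.

    commit_first=False models commit-after-processing (at least once);
    commit_first=True models commit-before-processing (at most once).
    The crash happens between the two steps for the message at `crash_at`.
    Returns the list of messages that were actually processed.
    """
    processed = []
    committed = 0  # offset of the next message to read after a restart
    crashed = False
    offset = committed
    while offset < len(log):
        if commit_first:
            committed = offset + 1          # commit first ...
            if offset == crash_at and not crashed:
                crashed = True              # ... then crash before processing
                offset = committed          # restart from the committed offset
                continue
            processed.append(log[offset])
        else:
            processed.append(log[offset])   # process first ...
            if offset == crash_at and not crashed:
                crashed = True              # ... then crash before committing
                offset = committed          # restart from the committed offset
                continue
            committed = offset + 1
        offset += 1
    return processed
```

With a four-message log and `crash_at=2`, commit-after-processing handles `m2` twice, while commit-before-processing never handles it at all.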
Asynchronous Commits¶
Each call to the commit API results in an offset commit request being sent to the broker. Using the synchronous API, the consumer is blocked until that request returns successfully. This may reduce overall throughput since the consumer might otherwise be able to process records while that commit is pending. One way to deal with this is to increase the amount of data that is returned in each poll(). The consumer has a configuration setting fetch.min.bytes which controls the minimum amount of data the broker returns for each fetch. The broker will hold onto the fetch until enough data is available (or fetch.max.wait.ms expires). The tradeoff, however, is that this also increases the number of duplicates that have to be dealt with in a worst-case failure.
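A back-of-the-envelope Python model of this tradeoff follows. It assumes one synchronous commit per poll() batch, made after the batch is processed, and measures batch size in messages rather than bytes (a simplification, since fetch.min.bytes is expressed in bytes); `commit_tradeoff` is a hypothetical name.

```python
import math


def commit_tradeoff(total_messages, batch_size):
    """Return (blocking_commits, worst_case_redelivered) for a simple model.

    Assumes every poll returns `batch_size` messages (the last batch may be
    shorter) and that the consumer commits synchronously once per batch,
    after processing it.
    """
    blocking_commits = math.ceil(total_messages / batch_size)
    # A crash between processing a batch and committing it replays that batch.
    worst_case_redelivered = min(batch_size, total_messages)
    return blocking_commits, worst_case_redelivered


for size in (10, 100, 1000):
    print(size, commit_tradeoff(10_000, size))
```

Growing the batch a hundredfold cuts the number of blocking commit round trips by the same factor, but the worst-case replay after a crash grows just as much.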
A second option is to use asynchronous commits. Instead of waiting for the request to complete, the consumer can send the request and return immediately.
public void run() {
    try {
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            records.forEach(record -> process(record));
            consumer.commitAsync();
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        consumer.close();
        shutdownLatch.countDown();
    }
}
And in C/C++ (librdkafka):
void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
    static const int MIN_COMMIT_COUNT = 1000;
    int msg_count = 0;
    rd_kafka_resp_err_t err;
    if ((err = rd_kafka_subscribe(rk, topics))) {
        fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
        exit(1);
    }
    while (running) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
        if (rkmessage) {
            msg_process(rkmessage);
            rd_kafka_message_destroy(rkmessage);
            /* NULL offsets = current assignment; last argument 1 = async */
            if ((++msg_count % MIN_COMMIT_COUNT) == 0)
                rd_kafka_commit(rk, NULL, 1);
        }
    }
    err = rd_kafka_consumer_close(rk);
    if (err)
        fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
    else
        fprintf(stderr, "%% Consumer closed\n");
}
The only difference between this example and the earlier synchronous-commit example is that we have enabled asynchronous commit by passing 1 as the last (async) argument to rd_kafka_commit.
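The change in Python is very similar: the asynchronous parameter to commit() is changed to True. Here we pass the value in explicitly, but asynchronous commits are the default if the parameter is not included. (Older versions of the client named this parameter async, which became a reserved keyword in Python 3.7.)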
import sys
from confluent_kafka import KafkaError, KafkaException
def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)
        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    # Returns immediately; failures go to the on_commit callback, if set.
                    consumer.commit(asynchronous=True)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
In Go, simply execute the commit in a goroutine to commit asynchronously:
msg_count := 0
for run == true {
	ev := consumer.Poll(0)
	switch e := ev.(type) {
	case *kafka.Message:
		msg_count += 1
		if msg_count % MIN_COMMIT_COUNT == 0 {
			// Commit in a separate goroutine so the poll loop is not blocked.
			go func() {
				if _, err := consumer.Commit(); err != nil {
					fmt.Fprintf(os.Stderr, "%% Commit failed: %v\n", err)
				}
			}()
		}
		fmt.Printf("%% Message on %s:\n%s\n",
			e.TopicPartition, string(e.Value))
	case kafka.PartitionEOF:
		fmt.Printf("%% Reached %v\n", e)
	case kafka.Error:
		fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
		run = false
	default:
		fmt.Printf("Ignored %v\n", e)
	}
}
In C#, simply call the CommitAsync method:
var msgCount = 0;
consumer.OnMessage += (_, msg) =>
{
    processMessage(msg);
    msgCount += 1;
    if (msgCount % MIN_COMMIT_COUNT == 0)
    {
        // Fire and forget: the returned Task is not awaited.
        consumer.CommitAsync();
    }
};
consumer.OnPartitionEOF += (_, end)
    => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}.");
consumer.OnError += (_, error) =>
{
    Console.WriteLine($"Error: {error}");
    cancelled = true;
};
while (!cancelled)
{
    consumer.Poll(TimeSpan.FromSeconds(1));
}
So if it helps performance, why not always use async commits? The main reason is that the consumer does not retry the request if the commit fails. This is something that commitSync gives you for free; it will retry indefinitely until the commit succeeds or an unrecoverable error is encountered. The problem with asynchronous commits is dealing with commit ordering. By the time the consumer finds out that a commit has failed, we may already have processed the next batch of messages and even sent the next commit. In this case, a retry of the old commit could cause duplicate consumption.
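The ordering hazard can be reproduced with a toy Python model that involves no Kafka client at all. A hypothetical `CommitStore` stands in for the committed offset of one partition and, like a real offset commit, simply overwrites the previous value. The commit for offset 100 fails, the commit for offset 200 succeeds, and a naive retry of the first commit then moves the committed position backwards.

```python
class CommitStore:
    """Toy stand-in for the committed offset of a single partition."""

    def __init__(self):
        self.committed = 0

    def commit(self, offset, fail=False):
        if fail:
            return False  # e.g. a transient error; nothing is stored
        self.committed = offset  # last write wins, even if it is older
        return True


store = CommitStore()
first_ok = store.commit(100, fail=True)   # async commit #1 fails
second_ok = store.commit(200)             # async commit #2 succeeds
if not first_ok:
    store.commit(100)                     # naive retry of commit #1
# A restart would now resume from offset 100, so messages 100-199
# are consumed a second time.
print(store.committed)  # 100
```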
Instead of complicating the consumer internals to try and handle this problem in a sane way, the API gives you a callback which is invoked when the commit either succeeds or fails. If you like, you can use this callback to retry the commit, but you will have to deal with the same reordering problem.
public void run() {
    try {
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            records.forEach(record -> process(record));
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null)
                        log.debug("Commit failed for offsets {}", offsets, exception);
                }
            });
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        consumer.close();
        shutdownLatch.countDown();
    }
}
A similar feature is available in C/C++ (librdkafka), but we have to configure it on initialization:
static void on_commit(rd_kafka_t *rk,
                      rd_kafka_resp_err_t err,
                      rd_kafka_topic_partition_list_t *offsets,
                      void *opaque) {
    if (err)
        fprintf(stderr, "%% Failed to commit offsets: %s\n", rd_kafka_err2str(err));
}
void init_rd_kafka() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set_offset_commit_cb(conf, on_commit);
    // initialization omitted
}
Similarly, in Python the commit callback can be any callable and can be passed as a configuration parameter to the consumer constructor.
from confluent_kafka import Consumer
def commit_completed(err, partitions):
    if err:
        print(str(err))
    else:
        print("Committed partition offsets: " + str(partitions))
conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'group.id': "foo",
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'on_commit': commit_completed}
consumer = Consumer(conf)
In C#, you can use a Task continuation:
var msgCount = 0;
consumer.OnMessage += (_, msg) =>
{
    processMessage(msg);
    msgCount += 1;
    if (msgCount % MIN_COMMIT_COUNT == 0)
    {
        consumer.CommitAsync().ContinueWith(
            commitTask =>
            {
                // ContinueWith receives the Task; its Result holds the outcome.
                var committed = commitTask.Result;
                if (committed.Error.HasError)
                {
                    Console.Error.WriteLine(committed.Error);
                }
                else
                {
                    Console.WriteLine(
                        $"Committed Offsets [{string.Join(", ", committed.Offsets)}]");
                }
            });
    }
};
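The callback examples above only log failures. If you do want to retry from the callback, one common way to sidestep the reordering problem is to tag each asynchronous commit with a monotonically increasing sequence number and retry a failed commit only when no newer commit has been issued since. The Python sketch below models that idea without a real client; the `GuardedCommitter` class, its methods, and the `send_commit` callable are hypothetical, and in practice `on_complete` would be driven by the client's commit callback.

```python
class GuardedCommitter:
    """Retry a failed async commit only if no newer commit was issued."""

    def __init__(self, send_commit):
        self._send_commit = send_commit  # callable(offset); async in practice
        self._latest_seq = 0
        self.retries = []

    def commit_async(self, offset):
        self._latest_seq += 1
        seq = self._latest_seq
        self._send_commit(offset)
        return seq  # passed back to on_complete when the result arrives

    def on_complete(self, seq, offset, error):
        """Hypothetical commit-callback body."""
        if error is None:
            return
        if seq == self._latest_seq:
            # Nothing newer has been sent, so retrying this commit
            # cannot move the committed offset backwards.
            self.retries.append(offset)
            self._send_commit(offset)
        # Otherwise a newer commit supersedes this one; skip the retry.
```

A stale failure is dropped because the newer commit already covers its offsets; only the most recent failed commit is worth resending.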
In Go, rebalance events are exposed as events returned by the Poll() method. To see these events, you must create the consumer with the go.application.rebalance.enable configuration and handle AssignedPartitions and RevokedPartitions events by explicitly calling Assign() and Unassign(), respectively:
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers":               "host1:9092,host2:9092",
	"group.id":                        "foo",
	"go.application.rebalance.enable": true})
if err != nil {
	fmt.Fprintf(os.Stderr, "%% Failed to create consumer: %v\n", err)
	os.Exit(1)
}
msg_count := 0
for run == true {
	ev := consumer.Poll(0)
	switch e := ev.(type) {
	case kafka.AssignedPartitions:
		fmt.Fprintf(os.Stderr, "%% %v\n", e)
		consumer.Assign(e.Partitions)
	case kafka.RevokedPartitions:
		fmt.Fprintf(os.Stderr, "%% %v\n", e)
		consumer.Unassign()
	case *kafka.Message:
		msg_count += 1
		if msg_count % MIN_COMMIT_COUNT == 0 {
			consumer.Commit()
		}
		fmt.Printf("%% Message on %s:\n%s\n",
			e.TopicPartition, string(e.Value))
	case kafka.PartitionEOF:
		fmt.Printf("%% Reached %v\n", e)
	case kafka.Error:
		fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
		run = false
	default:
		fmt.Printf("Ignored %v\n", e)
	}
}
Offset commit failures are merely annoying if the following commits succeed, since they won’t actually result in duplicate reads. However, if the last commit fails before a rebalance occurs or before the consumer is shut down, then offsets will be reset to the last commit and you will likely see duplicates. A common pattern is therefore to combine async commits in the poll loop with sync commits on rebalances or shutdown. Committing on close is straightforward, but we need a way to hook into rebalances. For this, the subscribe() method introduced earlier has a variant which accepts a ConsumerRebalanceListener, which has two methods to hook into rebalance behavior.
In the example below, we incorporate synchronous commits on rebalances and on close.
private void doCommitSync() {
    try {
        consumer.commitSync();
    } catch (WakeupException e) {
        // we're shutting down, but finish the commit first and then
        // rethrow the exception so that the main loop can exit
        doCommitSync();
        throw e;
    } catch (CommitFailedException e) {
        // the commit failed with an unrecoverable error. if there is any
        // internal state which depended on the commit, you can clean it
        // up here. otherwise it's reasonable to ignore the error and go on
        log.debug("Commit failed", e);
    }
}
public void run() {
    try {
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                doCommitSync();
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
        });
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            records.forEach(record -> process(record));
            consumer.commitAsync();
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        try {
            doCommitSync();
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }
}
Each rebalance has two phases: partition revocation and partition assignment. The revocation method is always called before a rebalance and is our last chance to commit offsets before the partitions are reassigned. The assignment method is always called after the rebalance and can be used to set the initial position of the assigned partitions. In this case, we’ve used the revocation hook to commit the current offsets synchronously.
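For comparison, the same pattern (asynchronous commits in the loop, a synchronous commit when partitions are revoked and again on close) might be sketched with the Python client, whose subscribe() accepts an `on_revoke` callback invoked as `on_revoke(consumer, partitions)`. The function names, the `process` callback, and the `running` callable are hypothetical, and error events are simply skipped.

```python
try:
    from confluent_kafka import KafkaException
except ImportError:  # lets the sketch run without the client installed
    KafkaException = Exception


def commit_sync_quietly(consumer):
    """Synchronous commit that tolerates failures (e.g. nothing to commit)."""
    try:
        consumer.commit(asynchronous=False)
    except KafkaException:
        pass  # clean up any state that depended on the commit here


def on_revoke(consumer, partitions):
    # Last chance to commit before these partitions are reassigned.
    commit_sync_quietly(consumer)


def consume_loop(consumer, topics, process, running, batch_size=500):
    consumer.subscribe(topics, on_revoke=on_revoke)
    try:
        while running():
            msgs = consumer.consume(num_messages=batch_size, timeout=1.0)
            good = [m for m in msgs if not m.error()]  # skip error events
            for msg in good:
                process(msg)
            if good:
                consumer.commit(asynchronous=True)  # fire and forget
    finally:
        try:
            commit_sync_quietly(consumer)  # final synchronous commit
        finally:
            consumer.close()
```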
In general, asynchronous commits should be considered less safe than synchronous commits. Consecutive commit failures before a crash will result in increased duplicate processing. You can mitigate this danger by adding logic to handle commit failures in the callback or by mixing calls to commitSync() occasionally, but we wouldn’t recommend too much complexity unless testing shows it is necessary. If you need more reliability, synchronous commits are there for you, and you can still scale up by increasing the number of topic partitions and the number of consumers in the group. But if you just want to maximize throughput and you’re willing to accept some increase in the number of duplicates, then asynchronous commits may be a good option.
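If testing does show that extra safety is needed, one modest way to mix in occasional synchronous commits is to make every Nth commit blocking. The Python sketch below assumes a confluent_kafka-style `commit(asynchronous=...)` method; the `HybridCommitter` class and its `sync_every` parameter are hypothetical.

```python
class HybridCommitter:
    """Commit asynchronously, but make every `sync_every`-th commit blocking.

    A successful synchronous commit limits how far the committed offset can
    lag after a run of silently failed asynchronous commits. Note that the
    synchronous commit may raise, so callers should handle that case.
    """

    def __init__(self, consumer, sync_every=10):
        self.consumer = consumer
        self.sync_every = sync_every
        self.count = 0

    def commit(self):
        self.count += 1
        synchronous = self.count % self.sync_every == 0
        self.consumer.commit(asynchronous=not synchronous)
        return synchronous
```

A larger `sync_every` keeps more of the throughput benefit of asynchronous commits; a smaller one narrows the window of possible duplicates.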
A somewhat obvious point, but one that’s worth making, is that asynchronous commits only make sense for “at least once” message delivery. To get “at most once,” you need to know if the commit succeeded before consuming the message. This implies a synchronous commit unless you have the ability to “unread” a message after you find that the commit failed.
Administration¶
Since 0.9, the Kafka release includes an admin utility for viewing the status of consumer groups.
List Groups¶
To get a list of the active groups in the cluster, you can use the kafka-consumer-groups utility included in the Kafka distribution. On a large cluster, this may take a little time since we need to collect the list by inspecting each broker in the cluster.
$ bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --list
Describe Group¶
The utility kafka-consumer-groups can also be used to collect information on a current group. For example, to see the current assignments for the foo group, use the following command:
$ bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --describe --group foo
If you happen to invoke this while a rebalance is in progress, the command will report an error. Retry, and you should see the assignments for all the members in the current generation.