Kafka Consumers¶
The 0.9.0.0 release of Kafka introduces the new Kafka consumer, which is a complete rewrite in Java of the older Scala consumer. It combines the functionality of the 0.8 simple and high-level consumers into a single clean API. In addition to the consolidated API, it brings several important advantages to the table:
- It supports Kafka 0.9.0.0 security extensions (SSL/SASL).
- It uses a redesigned group management protocol on top of Kafka, which allows the number of consumer groups in the cluster to scale with the number of brokers.
- Its library has a much smaller footprint with no unneeded dependencies. In particular, it does not depend on Kafka core.
The Confluent Platform also ships with librdkafka, which is a high-performance C/C++ client library with support for the new consumer group management protocol.
Concepts¶
A consumer group is a set of consumers which cooperate to consume data from some topics. The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as rebalancing the group.
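To make the idea of a proportional share concrete, here is a minimal sketch that deals partitions out to group members in round-robin order. It is an illustration only: the class and method names are hypothetical, and it is not the assignment logic Kafka itself runs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of dividing partitions among group members.
// Assumes member ids are unique; this is not Kafka's own assignor.
class RoundRobinSketch {
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return assignment; // nobody to assign to
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            // Deal partitions out one at a time, wrapping around the member list.
            String owner = members.get(partition % members.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }
}
```

Calling assign() again with a longer or shorter member list plays the role of a rebalance: every member ends up with a share that differs from the others by at most one partition.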
The main difference between the older “high-level” consumer and the new consumer is that the former depended on Zookeeper for group management, while the latter uses Kafka itself. For group management with Kafka, one of the brokers serves as the group’s coordinator. The coordinator maintains the current members of the group as well as their partition assignments. It is responsible for detecting when new members have joined and old members have left.
All of the groups managed by Kafka are divided roughly equally across all the brokers in the cluster. In other words, each broker may be the coordinator for some subset of the groups. This allows the number of groups to scale by increasing the number of brokers.
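One way to picture how groups spread across brokers is to hash the group id onto the set of brokers. The sketch below does exactly that and nothing more; the class name and the modulo scheme are assumptions made for illustration, and the mapping the brokers actually use is an internal detail that may differ.

```java
// Hypothetical illustration: spread groups over brokers by hashing the group id.
// The real broker-side mapping is internal to Kafka and may differ.
class CoordinatorSketch {
    static int coordinatorFor(String groupId, int brokerCount) {
        if (brokerCount <= 0) {
            throw new IllegalArgumentException("brokerCount must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(groupId.hashCode(), brokerCount);
    }
}
```

Because the result depends only on the group id and the broker count, every client computes the same coordinator for a given group, and adding brokers gives the groups more places to land.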
When the consumer starts up, it finds the coordinator for its group and sends a request to join the group. The coordinator then rebalances the partitions between all current members of the group and the new member. Every rebalance results in a new generation of the group.
Each member in the group must send heartbeats to the coordinator in order to remain a member of the group. If no heartbeat is received before expiration of the configured session timeout, then the coordinator will kick the member out of the group and reassign its partitions to another member.
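The heartbeat rule can be modelled in a few lines. The following is a simplified sketch of the bookkeeping a coordinator would need, not broker code; SessionTracker and its methods are hypothetical names, and time is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of session-timeout bookkeeping; not broker code.
class SessionTracker {
    private final long sessionTimeoutMs;
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    SessionTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Record a heartbeat (or the initial join) from a member.
    void heartbeat(String memberId, long nowMs) {
        lastHeartbeatMs.put(memberId, nowMs);
    }

    // Remove and return the members whose session has expired at nowMs.
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeatMs.entrySet()) {
            if (nowMs - entry.getValue() > sessionTimeoutMs) {
                expired.add(entry.getKey());
            }
        }
        lastHeartbeatMs.keySet().removeAll(expired);
        return expired;
    }
}
```

Any member returned by expire() would have its partitions handed to the remaining members in the next rebalance.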
Offset Management: After the consumer receives its assignment from the coordinator, it must determine the initial position for each assigned partition. Initially, before any messages have been read in the group, the position is set according to a configurable offset reset policy. Typically, consumption starts either at the earliest offset or the latest offset.
As a consumer in the group reads messages from the partitions assigned by the coordinator, it must commit the offsets corresponding to the messages it has read. If the consumer crashes or is shut down, its partitions will be re-assigned to another member, which will begin consumption from the last committed offset of each partition. If the consumer crashes before any offset has been committed, then the consumer which takes over its partitions will use the reset policy.
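The rule for choosing a starting position can be summarized as: use the committed offset if there is one, otherwise fall back to the reset policy. The helper below is a hypothetical sketch of that decision (the names StartPosition and ResetPolicy are not part of any Kafka API).

```java
import java.util.OptionalLong;

// Hypothetical helper mirroring the rule described above.
class StartPosition {
    enum ResetPolicy { EARLIEST, LATEST }

    static long resolve(OptionalLong committed, ResetPolicy policy,
                        long earliestOffset, long latestOffset) {
        if (committed.isPresent()) {
            // A previous member (or a previous run) committed a position: resume there.
            return committed.getAsLong();
        }
        // Nothing committed yet: the reset policy decides where to start.
        return policy == ResetPolicy.EARLIEST ? earliestOffset : latestOffset;
    }
}
```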
The offset commit policy is crucial to providing the message delivery guarantees needed by your application. By default, the consumer is configured to use an automatic commit policy, which triggers a commit on a periodic interval. The consumer also supports a commit API which can be used for manual offset management. The examples below show the commit API in detail and discuss the tradeoffs in terms of performance and reliability.
Configuration¶
The full list of configuration settings is available in the Kafka documentation. Below we highlight several of the key configuration settings and how they affect the consumer’s behavior.
Core Configuration: The only required setting is bootstrap.servers, but we recommend always setting a client.id since this allows you to easily correlate requests on the broker with the client instance which made them. It also helps when using the consumer group utility shown below.
Group Configuration: You should always configure group.id unless you are using the simple assignment API and you don’t need to store offsets in Kafka.
You can control the session timeout by overriding the session.timeout.ms value. The default is 30 seconds, but you can safely increase it to avoid excessive rebalances if you find that your application needs more time to process messages. This concern is mainly relevant if you are using the Java consumer and handling messages in the same thread. The main drawback to using a larger session timeout is that it will take longer for the coordinator to detect when a consumer instance has crashed, which means it will also take longer for another consumer in the group to take over its partitions. For normal shutdowns, however, the consumer sends an explicit request to the coordinator to leave the group, which triggers an immediate rebalance.
The other setting which affects rebalance behavior is heartbeat.interval.ms. This controls how often the consumer will send heartbeats to the coordinator. It is also the way that the consumer detects when a rebalance is needed, so a lower heartbeat interval will generally mean faster rebalances. The default setting is three seconds. For larger groups, it may be wise to increase this setting.
Offset Management: The two main settings affecting offset management are whether auto-commit is enabled and the offset reset policy. First, if you set enable.auto.commit to true (which is the default), then the consumer will automatically commit offsets periodically at the interval set by auto.commit.interval.ms. The default is 5 seconds.
Second, use auto.offset.reset to define the behavior of the consumer when there is no committed position (which would be the case when the group is first initialized) or when an offset is out of range. You can choose either to reset the position to the “earliest” offset or the “latest” offset (the default). You can also select “none” if you would rather set the initial offset yourself.
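Putting the settings from this section together, a configuration that spells out the defaults described above might look like the following sketch. The host names, the client.id value, and the ConsumerSettings class are placeholders chosen for illustration; the values simply restate the defaults quoted in the text.

```java
import java.util.Properties;

// Illustrative only: restates the settings discussed in this section.
class ConsumerSettings {
    static Properties build() {
        Properties config = new Properties();
        config.put("bootstrap.servers", "host1:9092,host2:9092"); // placeholder hosts
        config.put("client.id", "consumer-1");                    // placeholder id
        config.put("group.id", "foo");
        config.put("session.timeout.ms", "30000");      // 30 seconds
        config.put("heartbeat.interval.ms", "3000");    // three seconds
        config.put("enable.auto.commit", "true");
        config.put("auto.commit.interval.ms", "5000");  // 5 seconds
        config.put("auto.offset.reset", "latest");      // or "earliest" / "none"
        return config;
    }
}
```

Writing the defaults out explicitly is optional, but it makes the relationship between the heartbeat interval and the session timeout easy to see when one of them is tuned.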
Administration¶
The 0.9.0 version of Kafka includes an admin utility for viewing the status of consumer groups.
List Groups¶
To get a list of the active groups in the cluster, you can use the kafka-consumer-groups utility included in the Kafka distribution. On a large cluster, this may take a little time since we need to collect the list by inspecting each broker in the cluster.
bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --list
Describe Group¶
The utility kafka-consumer-groups can also be used to collect information on a current group. For example, to see the current assignments for the foo group, use the following command:
bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --describe --group foo
If you happen to invoke this while a rebalance is in progress, the command will report an error. Retry the command and you should see the assignments for all the members in the current generation.
Examples¶
Below we provide detailed examples of the consumer API with special attention paid to offset management and delivery semantics. These examples are intended to be usable.
Initial Setup¶
The Java consumer is constructed with a standard Properties object.
Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("group.id", "foo");
config.put("bootstrap.servers", "host1:9092,host2:9092");
new KafkaConsumer<K, V>(config);
Configuration errors will result in a KafkaException raised from the constructor of KafkaConsumer. The librdkafka configuration is similar, but we need to handle configuration errors directly when setting properties:
char hostname[128];
char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (gethostname(hostname, sizeof(hostname))) {
fprintf(stderr, "%% Failed to lookup hostname\n");
exit(1);
}
if (rd_kafka_conf_set(conf, "client.id", hostname,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
if (rd_kafka_conf_set(conf, "group.id", "foo",
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
if (rd_kafka_conf_set(conf, "bootstrap.servers", "host1:9092,host2:9092",
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
/* Create Kafka consumer handle */
rd_kafka_t *rk;
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
errstr, sizeof(errstr)))) {
fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
exit(1);
}
Basic Poll Loop¶
The consumer API is centered around the poll() method, which is used to retrieve records from the brokers. The subscribe() method controls which topics will be fetched in poll(). Typically, consumer usage involves an initial call to subscribe() to set up the topics of interest and then a loop which calls poll() until the application is shut down.
The consumer intentionally avoids a specific threading model. It is not safe for multi-threaded access and it has no background threads of its own. In particular, this means that all IO occurs in the thread calling poll(). In the example below, we wrap the poll loop in a Runnable, which makes it easy to use with an ExecutorService.
public abstract class BasicConsumeLoop<K, V> implements Runnable {
private final KafkaConsumer<K, V> consumer;
private final List<String> topics;
private final AtomicBoolean shutdown;
private final CountDownLatch shutdownLatch;
public BasicConsumeLoop(Properties config, List<String> topics) {
this.consumer = new KafkaConsumer<>(config);
this.topics = topics;
this.shutdown = new AtomicBoolean(false);
this.shutdownLatch = new CountDownLatch(1);
}
public abstract void process(ConsumerRecord<K, V> record);
public void run() {
try {
consumer.subscribe(topics);
while (!shutdown.get()) {
ConsumerRecords<K, V> records = consumer.poll(500);
records.forEach(record -> process(record));
}
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
public void shutdown() throws InterruptedException {
shutdown.set(true);
shutdownLatch.await();
}
}
We’ve hard-coded the poll timeout to 500 milliseconds. If no records are received before this timeout expires, then poll() will return an empty record set. It’s not a bad idea to add a shortcut check for this case if your message processing involves any setup overhead.
To shut down the consumer, we’ve added a flag which is checked on each loop iteration. After shutdown is triggered, the consumer will wait at most 500 milliseconds (plus the message processing time) before shutting down, since shutdown might be triggered while we are in poll(). A better approach is provided in the next example.
Note that you should always call close() after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately, which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If the consumer is not closed properly, the broker will trigger the rebalance only after the session timeout has expired. We’ve added a latch to this example to ensure that the consumer has time to finish closing before finishing shutdown.
The same example looks similar in librdkafka:
static int shutdown = 0;
static void msg_process(rd_kafka_message_t *message);
void basic_consume_loop(rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *topics) {
rd_kafka_resp_err_t err;
if ((err = rd_kafka_subscribe(rk, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
exit(1);
}
while (!shutdown) {
rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
if (rkmessage) {
msg_process(rkmessage);
rd_kafka_message_destroy(rkmessage);
}
}
err = rd_kafka_consumer_close(rk);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
/* Named request_shutdown so it does not clash with the shutdown flag above */
void request_shutdown() {
shutdown = 1;
}
Although the APIs are similar, librdkafka uses a different approach beneath the surface. While the Java consumer does all IO and processing in the foreground thread, librdkafka uses a background thread. The main consequence of this is that calling rd_kafka_consumer_poll is totally safe when used from multiple threads. You can use this to parallelize message handling in multiple threads. From a high level, poll is taking messages off of a queue which is filled in the background.
Another consequence of using a background thread is that all heartbeats and rebalances are executed in the background. The benefit of this is that you don’t need to worry about message handling causing the consumer to “miss” a rebalance. The drawback, however, is that the background thread will continue heartbeating even if your message processor dies. If this happens, then the consumer will continue to hold onto its partitions and the read lag will continue to build until the process is shut down.
Although the two clients have taken a different approach internally, they are not as far apart as they seem. To provide the same abstraction in the Java client, you could place a queue in between the poll loop and the message processors. The poll loop would fill the queue and the processors would pull messages off of it.
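The queue-based handoff described above can be sketched without any Kafka classes. In the example below, a list of batches stands in for the results of successive poll() calls and a single worker thread stands in for the message processors; the class name, the end-of-stream marker, and the use of plain strings as messages are all assumptions made to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueHandoffSketch {
    private static final String POISON = "__stop__"; // hypothetical end-of-stream marker

    // 'batches' stands in for the results of successive poll() calls.
    static List<String> run(List<List<String>> batches, int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<String> processed = Collections.synchronizedList(new ArrayList<>());

        Thread processor = new Thread(() -> {
            try {
                while (true) {
                    String message = queue.take();
                    if (message.equals(POISON)) {
                        return;
                    }
                    processed.add(message); // stand-in for real message handling
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        processor.start();

        try {
            // The "poll loop": it only moves messages onto the queue.
            for (List<String> batch : batches) {
                for (String message : batch) {
                    queue.put(message); // blocks when the processor falls behind
                }
            }
            queue.put(POISON);
            processor.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while handing off messages", e);
        }
        return processed;
    }
}
```

The bounded queue provides back-pressure, but note the cost: while put() is blocked, the loop is not calling poll(), so with the Java consumer a slow processor still delays everything that happens inside poll().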
Shutdown with Wakeup¶
An alternative pattern for the poll loop in the Java consumer is to use Long.MAX_VALUE for the timeout. To break from the loop, we can use the consumer’s wakeup() method from a separate thread. This will raise a WakeupException from the thread blocking in poll(). If the thread is not currently blocking, then this will wake up the next poll invocation.
public abstract class ConsumeLoop<K, V> implements Runnable {
private final KafkaConsumer<K, V> consumer;
private final List<String> topics;
private final CountDownLatch shutdownLatch;
public ConsumeLoop(KafkaConsumer<K, V> consumer, List<String> topics) {
this.consumer = consumer;
this.topics = topics;
this.shutdownLatch = new CountDownLatch(1);
}
public abstract void process(ConsumerRecord<K, V> record);
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
records.forEach(record -> process(record));
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
public void shutdown() throws InterruptedException {
consumer.wakeup();
shutdownLatch.await();
}
}
Synchronous Commits¶
In the examples above, we have assumed that the consumer is configured to auto-commit offsets (this is the default). Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset. When this happens, the last committed position may be as old as the auto-commit interval itself. Any messages which have arrived since the last commit will have to be read again.
Clearly, if you want to reduce the window for duplicates, you can reduce the auto-commit interval, but some users may want even finer control over offsets. The consumer therefore supports a commit API which gives you full control over offsets. The simplest and most reliable way to manually commit offsets is using a synchronous commit with commitSync(). As its name suggests, this method blocks until the commit has completed successfully.
Note that when you use the commit API directly, you should first disable auto-commit in the configuration by setting the enable.auto.commit property to false.
private void doCommitSync() {
try {
consumer.commitSync();
} catch (WakeupException e) {
// we're shutting down, but finish the commit first and then
// rethrow the exception so that the main loop can exit
doCommitSync();
throw e;
} catch (CommitFailedException e) {
// the commit failed with an unrecoverable error. if there is any
// internal state which depended on the commit, you can clean it
// up here. otherwise it's reasonable to ignore the error and go on
log.debug("Commit failed", e);
}
}
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
records.forEach(record -> process(record));
doCommitSync();
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
In this example, we’ve added a try/catch block around the call to commitSync. The CommitFailedException is thrown when the commit cannot be completed because the group has been rebalanced. This is the main thing we have to be careful of when using the Java client. Since all network IO (including heartbeating) and message processing is done in the foreground, it is possible for the session timeout to expire while a batch of messages is being processed. To handle this, you have two choices.
First, you can adjust the session.timeout.ms setting to ensure that the handler has enough time to finish processing messages. You can then tune max.partition.fetch.bytes to limit the amount of data returned in a single batch, though you will have to consider how many partitions are in the subscribed topics.
The second option is to do message processing in a separate thread, but you will have to manage flow control to ensure that the threads can keep up. For example, just pushing messages into a blocking queue would probably not be sufficient unless the rate of processing can keep up with the rate of delivery (in which case you might not need a separate thread anyway). It may even exacerbate the problem if the poll loop is stuck in a blocking call such as put() while the background thread is handling an even larger batch of messages. The Java API offers a pause() method to help in these situations, which we will cover in another example.
For now, we recommend setting session.timeout.ms large enough that commit failures from rebalances are rare. As mentioned above, the only drawback to this is a longer delay before partitions can be re-assigned in the event of a hard failure (where the consumer cannot be cleanly shut down with close()). This should be rare in practice.
Note also that we have to be a little careful in this example since the wakeup() might be triggered while the commit is pending. The recursive call is safe since the wakeup will only be triggered once.
In librdkafka, we can get similar behavior with rd_kafka_commit, which is used for both synchronous and asynchronous commits. The approach is slightly different, however, since rd_kafka_consumer_poll returns single messages instead of batches as the Java consumer does.
void consume_loop(rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *topics) {
static int MIN_COMMIT_COUNT = 1000;
int msg_count = 0;
rd_kafka_resp_err_t err;
if ((err = rd_kafka_subscribe(rk, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
exit(1);
}
while (!shutdown) {
rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
if (rkmessage) {
msg_process(rkmessage);
rd_kafka_message_destroy(rkmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0)
rd_kafka_commit(rk, NULL, 0);
}
}
err = rd_kafka_consumer_close(rk);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
In this example, we trigger a synchronous commit every 1000 messages. The second argument to rd_kafka_commit is the list of offsets to be committed; if set to NULL, librdkafka will commit the latest offsets for the assigned partitions. The third argument in rd_kafka_commit is a flag which controls whether this call is asynchronous. We could also trigger the commit on expiration of a timeout to ensure that the committed position is updated regularly.
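The idea of committing on a message count or on a timeout, whichever comes first, can be captured in a small policy object. The sketch below is written in Java, though the same logic ports directly to the C loop above; CommitPolicy and its method names are hypothetical, and the current time is passed in so the behavior is easy to follow.

```java
// Hypothetical helper: decides when a commit is due, by count or by elapsed time.
class CommitPolicy {
    private final int maxMessages;
    private final long maxIntervalMs;
    private int uncommitted = 0;
    private long lastCommitMs;

    CommitPolicy(int maxMessages, long maxIntervalMs, long nowMs) {
        this.maxMessages = maxMessages;
        this.maxIntervalMs = maxIntervalMs;
        this.lastCommitMs = nowMs;
    }

    // Call after each processed message; returns true if a commit should be issued now.
    boolean recordAndCheck(long nowMs) {
        uncommitted++;
        return isDue(nowMs);
    }

    // Call when poll returns nothing, so that idle periods still commit on time.
    boolean isDue(long nowMs) {
        return uncommitted > 0
                && (uncommitted >= maxMessages || nowMs - lastCommitMs >= maxIntervalMs);
    }

    // Call once the commit has been issued.
    void committed(long nowMs) {
        uncommitted = 0;
        lastCommitMs = nowMs;
    }
}
```

In the consume loop, the modulo test on the message count would be replaced by a call to recordAndCheck(), with isDue() consulted whenever a poll times out without returning a message.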
Delivery Guarantees: This is as good a time as any to talk about delivery semantics. Using auto-commit gives you “at least once” delivery: Kafka guarantees that no messages will be missed, but duplicates are possible. In the above example, we get the same since the commit follows the message processing. By changing the order, however, we can get “at most once” delivery. But we have to be a little careful with the commit failure, so we change doCommitSync to return whether or not the commit succeeded. There’s also no longer any need to catch the WakeupException in the synchronous commit.
private boolean doCommitSync() {
try {
consumer.commitSync();
return true;
} catch (CommitFailedException e) {
// the commit failed with an unrecoverable error. if there is any
// internal state which depended on the commit, you can clean it
// up here. otherwise it's reasonable to ignore the error and go on
log.debug("Commit failed", e);
return false;
}
}
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
if (doCommitSync())
records.forEach(record -> process(record));
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
And in librdkafka:
void consume_loop(rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *topics) {
rd_kafka_resp_err_t err;
if ((err = rd_kafka_subscribe(rk, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
exit(1);
}
while (!shutdown) {
rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
if (rkmessage) {
/* process only if the commit succeeded, but always free the message */
if (!rd_kafka_commit_message(rk, rkmessage, 0))
msg_process(rkmessage);
rd_kafka_message_destroy(rkmessage);
}
}
err = rd_kafka_consumer_close(rk);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
For simplicity in this example, we’ve used rd_kafka_commit_message prior to processing the message. Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.
Correct offset management is crucial because it affects delivery semantics. As of version 0.9.0.0, the best Kafka can give you is at-least-once or at-most-once. If you are not careful, however, it might not give you either. Exactly-once delivery is under active investigation, but it is not currently possible without depending on another system (e.g. a transactional RDBMS).
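The difference between the two guarantees is easiest to see in a small simulation. The sketch below uses no Kafka classes: offsets are plain integers, the "consumer" crashes exactly once at a chosen offset, and it then restarts from the last committed position. All names are hypothetical, and the model deliberately ignores commit failures.

```java
import java.util.ArrayList;
import java.util.List;

class DeliverySketch {
    // Consumes offsets [0, logSize) in batches, crashing once just before
    // processing crashAtOffset, then restarting from the committed position.
    // Returns every offset that was processed, in order, across both runs.
    static List<Integer> run(int logSize, int batchSize, int crashAtOffset, boolean commitFirst) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0;
        int position = 0;
        boolean crashed = false;
        while (position < logSize) {
            int end = Math.min(position + batchSize, logSize);
            if (commitFirst) {
                committed = end; // at most once: commit before processing
            }
            boolean failed = false;
            for (int offset = position; offset < end; offset++) {
                if (!crashed && offset == crashAtOffset) {
                    crashed = true;
                    failed = true;
                    break;
                }
                processed.add(offset);
            }
            if (failed) {
                position = committed; // restart from the last committed offset
                continue;
            }
            if (!commitFirst) {
                committed = end; // at least once: commit after processing
            }
            position = end;
        }
        return processed;
    }
}
```

With six messages, a batch size of three, and a crash at offset 4, committing after processing yields 0, 1, 2, 3, 3, 4, 5 (offset 3 is processed twice, nothing is lost), while committing first yields 0, 1, 2, 3 (no duplicates, but offsets 4 and 5 are never processed).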
Asynchronous Commits¶
Each call to the commit API results in an offset commit request being sent to the broker. Using the synchronous API, the consumer is blocked until that request returns successfully. This may reduce overall throughput since the consumer might otherwise be able to process records while that commit is pending. One way to deal with this is to increase the amount of data that is returned in each poll(). The consumer has a configuration setting fetch.min.bytes which controls how much data is returned in each fetch. The broker will hold onto the fetch until enough data is available (or fetch.max.wait.ms expires). The tradeoff, however, is that this also increases the number of duplicates that have to be dealt with in a worst-case failure.
A second option is to use asynchronous commits. Instead of waiting for the request to complete, the consumer can send the request and return immediately.
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
records.forEach(record -> process(record));
consumer.commitAsync();
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
And in librdkafka:
void consume_loop(rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *topics) {
static int MIN_COMMIT_COUNT = 1000;
int msg_count = 0;
rd_kafka_resp_err_t err;
if ((err = rd_kafka_subscribe(rk, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
exit(1);
}
while (!shutdown) {
rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
if (rkmessage) {
msg_process(rkmessage);
rd_kafka_message_destroy(rkmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0)
rd_kafka_commit(rk, NULL, 1);
}
}
err = rd_kafka_consumer_close(rk);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
The only difference between this example and the previous one is that we have enabled asynchronous commit in the call to rd_kafka_commit.
So if it helps performance, why not always use async commits? The main reason is that the consumer does not retry the request if the commit fails. This is something that commitSync gives you for free; it will retry indefinitely until the commit succeeds or an unrecoverable error is encountered. The problem with asynchronous commits is dealing with commit ordering. By the time the consumer finds out that a commit has failed, we may already have processed the next batch of messages and even sent the next commit. In this case, a retry of the old commit could cause duplicate consumption.
Instead of complicating the consumer internals to try and handle this problem in a sane way, the API gives you a callback which is invoked when the commit either succeeds or fails. If you like, you can use this callback to retry the commit, but you will have to deal with the same reordering problem.
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
records.forEach(record -> process(record));
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null)
log.debug("Commit failed for offsets {}", offsets, exception);
}
});
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
A similar feature is available in librdkafka, but we have to configure it on initialization:
static void on_commit(rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets,
void *opaque) {
if (err)
fprintf(stderr, "%% Failed to commit offsets: %s\n", rd_kafka_err2str(err));
}
void init_rd_kafka() {
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set_offset_commit_cb(conf, on_commit);
// initialization omitted
}
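If you do decide to retry from the commit callback, one common way to sidestep the reordering problem is to tag each asynchronous commit with an increasing sequence number and retry a failed commit only when no newer commit has been sent in the meantime. The guard below is a hypothetical sketch of that bookkeeping; it is not part of the consumer API, and wiring it into the callback is left to the application.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical guard: only the most recently sent commit is ever retried.
class CommitRetryGuard {
    private final AtomicLong latestSequence = new AtomicLong();

    // Call just before sending a commit and remember the result for its callback.
    long nextSequence() {
        return latestSequence.incrementAndGet();
    }

    // Call from the commit callback when the commit failed.
    // Returns false if a newer commit has been sent since, in which case
    // retrying would risk moving the committed position backwards.
    boolean shouldRetry(long failedSequence) {
        return failedSequence == latestSequence.get();
    }
}
```

A failed commit that is no longer the latest can simply be dropped, since the newer commit already covers a later position.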
Offset commit failures are merely annoying if the following commits succeed since they won’t actually result in duplicate reads. However, if the last commit fails before a rebalance occurs or before the consumer is shut down, then offsets will be reset to the last commit and you will likely see duplicates. A common pattern is therefore to combine async commits in the poll loop with sync commits on rebalances or shutdown. Committing on close is straightforward, but we need a way to hook into rebalances. For this, the subscribe() method introduced earlier has a variant which accepts a ConsumerRebalanceListener, which has two methods to hook into rebalance behavior. In the example below, we incorporate synchronous commits on rebalances and on close.
private void doCommitSync() {
try {
consumer.commitSync();
} catch (WakeupException e) {
// we're shutting down, but finish the commit first and then
// rethrow the exception so that the main loop can exit
doCommitSync();
throw e;
} catch (CommitFailedException e) {
// the commit failed with an unrecoverable error. if there is any
// internal state which depended on the commit, you can clean it
// up here. otherwise it's reasonable to ignore the error and go on
log.debug("Commit failed", e);
}
}
public void run() {
try {
consumer.subscribe(topics, new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
doCommitSync();
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
});
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
records.forEach(record -> process(record));
consumer.commitAsync();
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
doCommitSync();
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
}
Each rebalance has two phases: partition revocation and partition assignment. The revocation method is always called before a rebalance and is our last chance to commit offsets before the partitions are re-assigned. The assignment method is always called after the rebalance and can be used to set the initial position of the assigned partitions. In this case, we’ve used the revocation hook to commit the current offsets synchronously.
In general, asynchronous commits should be considered less safe than synchronous commits. Consecutive commit failures before a crash will result in increased duplicate processing. You can mitigate this danger by adding logic to handle commit failures in the callback or by mixing in occasional calls to commitSync(), but we wouldn’t recommend too much complexity unless testing shows it is necessary. If you need more reliability, synchronous commits are there for you, and you can still scale up by increasing the number of topic partitions and the number of consumers in the group. But if you just want to maximize throughput and you’re willing to accept some increase in the number of duplicates, then asynchronous commits may be a good option.
A somewhat obvious point, but one that’s worth making is that asynchronous commits only make sense for “at least once” message delivery. To get “at most once,” you need to know if the commit succeeded before consuming the message. This implies a synchronous commit unless you have the ability to “unread” a message after you find that the commit failed.