librdkafka
The Apache Kafka C/C++ client library
RdKafka::RebalanceCb Class Referenceabstract

KafkaConsunmer: Rebalance callback class More...

#include <rdkafkacpp.h>

Public Member Functions

virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition *> &partitions)=0
 Group rebalance callback for use with RdKafka::KafkaConsunmer. More...
 

Detailed Description

KafkaConsunmer: Rebalance callback class

Member Function Documentation

◆ rebalance_cb()

virtual void RdKafka::RebalanceCb::rebalance_cb ( RdKafka::KafkaConsumer consumer,
RdKafka::ErrorCode  err,
std::vector< TopicPartition *> &  partitions 
)
pure virtual

Group rebalance callback for use with RdKafka::KafkaConsunmer.

Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that responsibility to the application's rebalance_cb.

The rebalance callback is responsible for updating librdkafka's assignment set based on the two events: RdKafka::ERR__ASSIGN_PARTITIONS and RdKafka::ERR__REVOKE_PARTITIONS but should also be able to handle arbitrary rebalancing failures where err is neither of those.

Remarks
In this latter case (arbitrary error), the application must call unassign() to synchronize state.

Without a rebalance callback this is done automatically by librdkafka but registering a rebalance callback gives the application flexibility in performing other operations along with the assinging/revocation, such as fetching offsets from an alternate location (on assign) or manually committing offsets (on revoke).

The following example show's the application's responsibilities:

class MyRebalanceCb : public RdKafka::RebalanceCb {
public:
std::vector<RdKafka::TopicPartition*> &partitions) {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
// application may load offets from arbitrary external
// storage here and update \p partitions
consumer->assign(partitions);
} else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
// Application may commit offsets manually here
// if auto.commit.enable=false
consumer->unassign();
} else {
std::cerr << "Rebalancing error: <<
RdKafka::err2str(err) << std::endl;
consumer->unassign();
}
}
}

The documentation for this class was generated from the following file: