librdkafka
The Apache Kafka C/C++ client library
RdKafka::KafkaConsumer Class Referenceabstract

High-level KafkaConsumer (for brokers 0.9 and later) More...

#include <rdkafkacpp.h>

Inheritance diagram for RdKafka::KafkaConsumer:
Collaboration diagram for RdKafka::KafkaConsumer:

Public Member Functions

virtual ErrorCode assignment (std::vector< RdKafka::TopicPartition *> &partitions)=0
 Returns the current partition assignment as set by RdKafka::KafkaConsumer::assign()
 
virtual ErrorCode subscription (std::vector< std::string > &topics)=0
 Returns the current subscription as set by RdKafka::KafkaConsumer::subscribe()
 
virtual ErrorCode subscribe (const std::vector< std::string > &topics)=0
 Update the subscription set to topics. More...
 
virtual ErrorCode unsubscribe ()=0
 Unsubscribe from the current subscription set.
 
virtual ErrorCode assign (const std::vector< TopicPartition *> &partitions)=0
 Update the assignment set to partitions. More...
 
virtual ErrorCode unassign ()=0
 Stop consumption and remove the current assignment.
 
virtual Messageconsume (int timeout_ms)=0
 Consume message or get error event, triggers callbacks. More...
 
virtual ErrorCode commitSync ()=0
 Commit offsets for the current assignment. More...
 
virtual ErrorCode commitAsync ()=0
 Asynchronous version of RdKafka::KafkaConsumer::CommitSync() More...
 
virtual ErrorCode commitSync (Message *message)=0
 Commit offset for a single topic+partition based on message. More...
 
virtual ErrorCode commitAsync (Message *message)=0
 Commit offset for a single topic+partition based on message. More...
 
virtual ErrorCode commitSync (std::vector< TopicPartition *> &offsets)=0
 Commit offsets for the provided list of partitions. More...
 
virtual ErrorCode commitAsync (const std::vector< TopicPartition *> &offsets)=0
 Commit offset for the provided list of partitions. More...
 
virtual ErrorCode committed (std::vector< TopicPartition *> &partitions, int timeout_ms)=0
 Retrieve committed offsets for topics+partitions. More...
 
virtual ErrorCode position (std::vector< TopicPartition *> &partitions)=0
 Retrieve current positions (offsets) for topics+partitions. More...
 
virtual ErrorCode close ()=0
 Close and shut down the proper. More...
 
- Public Member Functions inherited from RdKafka::Handle
virtual const std::string name () const =0
 
virtual const std::string memberid () const =0
 Returns the client's broker-assigned group member id. More...
 
virtual int poll (int timeout_ms)=0
 Polls the provided kafka handle for events. More...
 
virtual int outq_len ()=0
 Returns the current out queue length. More...
 
virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0
 Request Metadata from broker. More...
 
virtual ErrorCode pause (std::vector< TopicPartition *> &partitions)=0
 Pause producing or consumption for the provided list of partitions. More...
 
virtual ErrorCode resume (std::vector< TopicPartition *> &partitions)=0
 Resume producing or consumption for the provided list of partitions. More...
 
virtual ErrorCode query_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)=0
 Query broker for low (oldest/beginning) and high (newest/end) offsets for partition. More...
 
virtual ErrorCode get_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high)=0
 Get last known low (oldest/beginning) and high (newest/end) offsets for partition. More...
 

Static Public Member Functions

static KafkaConsumercreate (Conf *conf, std::string &errstr)
 Creates a KafkaConsumer. More...
 

Detailed Description

High-level KafkaConsumer (for brokers 0.9 and later)

Remarks
Requires Apache Kafka >= 0.9.0 brokers

Currently supports the range and roundrobin partition assignment strategies (see partition.assignment.strategy)

Member Function Documentation

◆ create()

static KafkaConsumer* RdKafka::KafkaConsumer::create ( Conf conf,
std::string &  errstr 
)
static

Creates a KafkaConsumer.

The conf object must have group.id set to the consumer group to join.

Use RdKafka::KafkaConsumer::close() to shut down the consumer.

See also
RdKafka::RebalanceCb
CONFIGURATION.md for group.id, session.timeout.ms, partition.assignment.strategy, etc.

◆ subscribe()

virtual ErrorCode RdKafka::KafkaConsumer::subscribe ( const std::vector< std::string > &  topics)
pure virtual

Update the subscription set to topics.

Any previous subscription will be unassigned and unsubscribed first.

The subscription set denotes the desired topics to consume and this set is provided to the partition assignor (one of the elected group members) for all clients which then uses the configured partition.assignment.strategy to assign the subscription sets's topics's partitions to the consumers, depending on their subscription.

The result of such an assignment is a rebalancing which is either handled automatically in librdkafka or can be overriden by the application by providing a RdKafka::RebalanceCb.

The rebalancing passes the assigned partition set to RdKafka::KafkaConsumer::assign() to update what partitions are actually being fetched by the KafkaConsumer.

Regex pattern matching automatically performed for topics prefixed with "^" (e.g. "^myPfx[0-9]_.*"

◆ assign()

virtual ErrorCode RdKafka::KafkaConsumer::assign ( const std::vector< TopicPartition *> &  partitions)
pure virtual

Update the assignment set to partitions.

The assignment set is the set of partitions actually being consumed by the KafkaConsumer.

◆ consume()

virtual Message* RdKafka::KafkaConsumer::consume ( int  timeout_ms)
pure virtual

Consume message or get error event, triggers callbacks.

Will automatically call registered callbacks for any such queued events, including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb, etc.

Remarks
Use delete to free the message.
An application should make sure to call consume() at regular intervals, even if no messages are expected, to serve any queued callbacks waiting to be called. This is especially important when a RebalanceCb has been registered as it needs to be called and handled properly to synchronize internal consumer state.
Application MUST NOT call poll() on KafkaConsumer objects.
Returns
One of:

◆ commitSync() [1/3]

virtual ErrorCode RdKafka::KafkaConsumer::commitSync ( )
pure virtual

Commit offsets for the current assignment.

Remarks
This is the synchronous variant that blocks until offsets are committed or the commit fails (see return value).
If a RdKafka::OffsetCommitCb callback is registered it will be called with commit details on a future call to RdKafka::KafkaConsumer::consume()
Returns
ERR_NO_ERROR or error code.

◆ commitAsync() [1/3]

virtual ErrorCode RdKafka::KafkaConsumer::commitAsync ( )
pure virtual

Asynchronous version of RdKafka::KafkaConsumer::CommitSync()

See also
RdKafka::KafkaConsummer::commitSync()

◆ commitSync() [2/3]

virtual ErrorCode RdKafka::KafkaConsumer::commitSync ( Message message)
pure virtual

Commit offset for a single topic+partition based on message.

Remarks
This is the synchronous variant.
See also
RdKafka::KafkaConsummer::commitSync()

◆ commitAsync() [2/3]

virtual ErrorCode RdKafka::KafkaConsumer::commitAsync ( Message message)
pure virtual

Commit offset for a single topic+partition based on message.

Remarks
This is the asynchronous variant.
See also
RdKafka::KafkaConsummer::commitSync()

◆ commitSync() [3/3]

virtual ErrorCode RdKafka::KafkaConsumer::commitSync ( std::vector< TopicPartition *> &  offsets)
pure virtual

Commit offsets for the provided list of partitions.

Remarks
This is the synchronous variant.

◆ commitAsync() [3/3]

virtual ErrorCode RdKafka::KafkaConsumer::commitAsync ( const std::vector< TopicPartition *> &  offsets)
pure virtual

Commit offset for the provided list of partitions.

Remarks
This is the asynchronous variant.

◆ committed()

virtual ErrorCode RdKafka::KafkaConsumer::committed ( std::vector< TopicPartition *> &  partitions,
int  timeout_ms 
)
pure virtual

Retrieve committed offsets for topics+partitions.

Returns
RD_KAFKA_RESP_ERR_NO_ERROR on success in which case the offset or err field of each partitions' element is filled in with the stored offset, or a partition specific error. Else returns an error code.

◆ position()

virtual ErrorCode RdKafka::KafkaConsumer::position ( std::vector< TopicPartition *> &  partitions)
pure virtual

Retrieve current positions (offsets) for topics+partitions.

Returns
RD_KAFKA_RESP_ERR_NO_ERROR on success in which case the offset or err field of each partitions' element is filled in with the stored offset, or a partition specific error. Else returns an error code.

◆ close()

virtual ErrorCode RdKafka::KafkaConsumer::close ( )
pure virtual

Close and shut down the proper.

For pausing and resuming consumption, see

See also
RdKafka::Handle::pause() and RdKafka::Handle::resume() This call will block until the following operations are finished:
  • Trigger a local rebalance to void the current assignment
  • Stop consumption for current assignment
  • Commit offsets
  • Leave group

The maximum blocking time is roughly limited to session.timeout.ms.

Remarks
Callbacks, such as RdKafka::RebalanceCb and RdKafka::OffsetCommitCb, etc, may be called.
The consumer object must later be freed with delete

The documentation for this class was generated from the following file: