librdkafka
The Apache Kafka C/C++ client library
RdKafka::Producer Class Reference
abstract

Producer.

#include <rdkafkacpp.h>

Public Member Functions

virtual ErrorCode produce (Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const std::string *key, void *msg_opaque)=0
 Produce and send a single message to broker.
 
virtual ErrorCode produce (Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, void *msg_opaque)=0
 Variant produce() that passes the key as a pointer and length instead of as a const std::string *.
 
virtual ErrorCode produce (Topic *topic, int32_t partition, const std::vector< char > *payload, const std::vector< char > *key, void *msg_opaque)=0
 Variant produce() that accepts vectors for key and payload. The vector data will be copied.
 
virtual ErrorCode flush (int timeout_ms)=0
 Wait until all outstanding produce requests, et al., are completed. This should typically be done prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating.
 
Public Member Functions inherited from RdKafka::Handle
virtual const std::string name () const =0
 
virtual const std::string memberid () const =0
 Returns the client's broker-assigned group member id.
 
virtual int poll (int timeout_ms)=0
 Polls the provided kafka handle for events.
 
virtual int outq_len ()=0
 Returns the current out queue length.
 
virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0
 Request Metadata from broker.
 
virtual ErrorCode pause (std::vector< TopicPartition *> &partitions)=0
 Pause producing or consumption for the provided list of partitions.
 
virtual ErrorCode resume (std::vector< TopicPartition *> &partitions)=0
 Resume producing or consumption for the provided list of partitions.
 
virtual ErrorCode query_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)=0
 Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.
 
virtual ErrorCode get_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high)=0
 Get last known low (oldest/beginning) and high (newest/end) offsets for partition.
 

Static Public Member Functions

static Producer * create (Conf *conf, std::string &errstr)
 Creates a new Kafka producer handle.
 

Static Public Attributes

static const int RK_MSG_FREE = 0x1
 RdKafka::Producer::produce() msgflags.
 
static const int RK_MSG_COPY = 0x2
 
static const int RK_MSG_BLOCK = 0x4
 

Detailed Description

Member Function Documentation

◆ create()

static Producer* RdKafka::Producer::create ( Conf *  conf,
std::string &  errstr 
)
static

Creates a new Kafka producer handle.

conf is an optional object that will be used instead of the default configuration. The conf object is reusable after this call.

Returns
the new handle on success, or NULL on error, in which case errstr is set to a human-readable error message.

◆ produce()

virtual ErrorCode RdKafka::Producer::produce ( Topic *  topic,
int32_t  partition,
int  msgflags,
void *  payload,
size_t  len,
const std::string *  key,
void *  msg_opaque 
)
pure virtual

Produce and send a single message to broker.

This is an asynchronous, non-blocking API.

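partition is the target partition, either:
  • RdKafka::Topic::PARTITION_UA (unassigned) for automatic partitioning using the topic's partitioner function, or
  • a fixed partition (0..N).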

msgflags is zero or more of the following flags OR:ed together:
  • RK_MSG_BLOCK - block the produce*() call if queue.buffering.max.messages or queue.buffering.max.kbytes is exceeded. Messages are considered in-queue from the point they are accepted by produce() until their corresponding delivery report callback/event returns. It is thus a requirement to call poll() (or equivalent) from a separate thread when RK_MSG_BLOCK is used. See the WARNING in the RK_MSG_BLOCK field documentation.
  • RK_MSG_FREE - rdkafka will free(3) the payload when it is done with it.
  • RK_MSG_COPY - the payload data will be copied and the payload pointer will not be used by rdkafka after the call returns.

NOTE: RK_MSG_FREE and RK_MSG_COPY are mutually exclusive.
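The flag rules above can be checked before calling produce(). The following is a minimal sketch, not part of librdkafka: the constants are local mirrors of the documented values (real code should use RdKafka::Producer::RK_MSG_FREE and friends from rdkafkacpp.h), msgflags_valid is a hypothetical helper name, and rejecting unknown bits is an assumption of this sketch rather than documented library behaviour.

```cpp
// Local mirrors of the documented flag values (assumption: kept in sync
// with RdKafka::Producer::RK_MSG_FREE / RK_MSG_COPY / RK_MSG_BLOCK).
constexpr int kMsgFree  = 0x1;
constexpr int kMsgCopy  = 0x2;
constexpr int kMsgBlock = 0x4;

// Hypothetical helper: returns true if msgflags is a combination that the
// documentation allows, i.e. only known bits and not FREE together with COPY.
bool msgflags_valid(int msgflags) {
  const int known = kMsgFree | kMsgCopy | kMsgBlock;
  if (msgflags & ~known) {
    return false;  // unknown bits set (treated as invalid in this sketch)
  }
  const bool free_and_copy =
      (msgflags & kMsgFree) != 0 && (msgflags & kMsgCopy) != 0;
  return !free_and_copy;  // RK_MSG_FREE and RK_MSG_COPY are mutually exclusive
}
```

RK_MSG_BLOCK may be combined with either of the other two flags, so msgflags_valid(kMsgCopy | kMsgBlock) is accepted while msgflags_valid(kMsgFree | kMsgCopy) is not.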

If the function returns an error code and RK_MSG_FREE was specified, then the memory associated with the payload is still the caller's responsibility.
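The ownership rule for RK_MSG_FREE can be sketched as follows. This is an illustration under stated assumptions, not library code: produce_malloced_copy is a hypothetical helper, and try_produce is a caller-supplied callable that stands in for a produce() call made with RK_MSG_FREE and returns true when the message was accepted.

```cpp
#include <cstddef>
#include <cstdlib>
#include <cstring>

// Hypothetical helper: copies data into a malloc(3) buffer and hands it to
// try_produce(payload, len). Returns true if the message was accepted.
template <typename TryProduce>
bool produce_malloced_copy(const char* data, std::size_t len,
                           TryProduce try_produce) {
  // Allocate at least one byte so a zero-length payload still gets a buffer.
  void* payload = std::malloc(len > 0 ? len : 1);
  if (payload == nullptr) {
    return false;  // out of memory
  }
  if (len > 0) {
    std::memcpy(payload, data, len);
  }
  if (try_produce(payload, len)) {
    return true;  // accepted: with RK_MSG_FREE, rdkafka frees the payload later
  }
  std::free(payload);  // rejected: the payload is still the caller's to free
  return false;
}
```

With the real API, try_produce would typically wrap a call such as producer->produce(topic, partition, RdKafka::Producer::RK_MSG_FREE, payload, len, NULL, NULL) and compare the result against RdKafka::ERR_NO_ERROR.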

payload is the message payload of size len bytes.

key is an optional message key; if non-NULL, it will be passed to the topic partitioner, sent with the message to the broker, and passed on to the consumer.

msg_opaque is an optional application-provided per-message opaque pointer that will be provided in the delivery report callback (dr_cb) for referencing this message.

Returns
an ErrorCode to indicate success or failure:
  • ERR__QUEUE_FULL - maximum number of outstanding messages has been reached: queue.buffering.max.messages
  • ERR_MSG_SIZE_TOO_LARGE - message is larger than configured max size: message.max.bytes
  • ERR__UNKNOWN_PARTITION - requested partition is unknown in the Kafka cluster.
  • ERR__UNKNOWN_TOPIC - topic is unknown in the Kafka cluster.
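ERR__QUEUE_FULL is usually a transient condition: serving delivery reports with poll() frees queue space, after which produce() can be retried. The sketch below shows one possible retry loop. It is an assumption-laden illustration, not a librdkafka API: produce_with_retry is a hypothetical helper, and the produce and poll calls are passed in as callables so the loop compiles without the library headers. Err would be RdKafka::ErrorCode in real use.

```cpp
// Hypothetical helper: calls try_produce() up to max_attempts times,
// calling serve_callbacks() after every attempt that reports a full queue.
// Returns the last result of try_produce(), or queue_full if it never ran.
template <typename Err, typename TryProduce, typename ServeCallbacks>
Err produce_with_retry(TryProduce try_produce, ServeCallbacks serve_callbacks,
                       Err queue_full, int max_attempts) {
  Err result = queue_full;  // reported as-is when max_attempts <= 0
  for (int attempt = 0; attempt < max_attempts; ++attempt) {
    result = try_produce();
    if (result != queue_full) {
      return result;  // accepted, or an error that retrying will not fix
    }
    // Queue is full: give delivery reports a chance to drain it,
    // e.g. producer->poll(100) in real code.
    serve_callbacks();
  }
  return result;  // still queue_full after max_attempts tries
}
```

In real code, try_produce would wrap the produce() call, serve_callbacks would call poll() with a short timeout, and queue_full would be RdKafka::ERR__QUEUE_FULL. The other listed errors are returned to the caller unchanged, since retrying does not resolve an oversized message or an unknown topic or partition.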

◆ flush()

virtual ErrorCode RdKafka::Producer::flush ( int  timeout_ms)
pure virtual

Wait until all outstanding produce requests, et al., are completed. This should typically be done prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating.

Remarks
This function will call poll() and thus trigger callbacks.
Returns
ERR__TIMED_OUT if timeout_ms was reached before all outstanding requests were completed, else ERR_NO_ERROR
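A typical shutdown sequence calls flush() a bounded number of times and then checks outq_len() before destroying the producer. The sketch below is one way to write that loop, under stated assumptions: drain_before_destroy is a hypothetical helper, and it is written as a template over any handle exposing flush(int) and outq_len() (as RdKafka::Producer does) so that it can be exercised with a stub.

```cpp
// Hypothetical helper: flushes the handle up to `rounds` times, stopping
// early once the out queue is empty. Returns the remaining queue length;
// 0 means nothing is left queued or in flight.
template <typename Handle>
int drain_before_destroy(Handle& producer, int rounds, int timeout_ms) {
  for (int i = 0; i < rounds && producer.outq_len() > 0; ++i) {
    // flush() calls poll() internally, so delivery report callbacks
    // may run on this thread during the call.
    producer.flush(timeout_ms);
  }
  return producer.outq_len();
}
```

A non-zero return value means some messages were still outstanding when the rounds ran out; the application can then decide whether to keep waiting or to destroy the producer and accept that those messages may be lost.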

Field Documentation

◆ RK_MSG_FREE

const int RdKafka::Producer::RK_MSG_FREE = 0x1
static

RdKafka::Producer::produce() msgflags.

These flags are optional. RK_MSG_FREE and RK_MSG_COPY are mutually exclusive. With RK_MSG_FREE, rdkafka will free(3) the payload when it is done with it.

◆ RK_MSG_COPY

const int RdKafka::Producer::RK_MSG_COPY = 0x2
static

The payload data will be copied and the payload pointer will not be used by rdkafka after the call returns.

◆ RK_MSG_BLOCK

const int RdKafka::Producer::RK_MSG_BLOCK = 0x4
static

Block produce*() on message queue full. WARNING: If a delivery report callback is used, the application MUST call rd_kafka_poll() (or equivalent) to make sure delivered messages are drained from the internal delivery report queue. Failure to do so will result in blocking indefinitely on the produce() call when the message queue is full.


The documentation for this class was generated from the following file:
  • rdkafkacpp.h