Welcome to Confluent’s Apache Kafka Python client documentation
confluent_kafka: Confluent’s Apache Kafka Python client
Consumer
class confluent_kafka.Consumer
High-level Kafka Consumer
Consumer(**kwargs)
Create new Consumer instance using provided configuration dict.
Special configuration properties:
on_commit: Optional callback that will be called when a commit request has succeeded or failed.
on_commit(err, partitions)
Parameters:
- err (KafkaError) – Commit error object, or None on success.
- partitions (list(TopicPartition)) – List of partitions with their committed offsets or per-partition errors.
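A minimal sketch of an on_commit callback with the (err, partitions) signature described above. The names format_commit_result, on_commit and the logger name are illustrative and not part of the library; the code only reads the topic, partition, offset and error attributes documented for TopicPartition, and the demo at the end uses stand-in objects rather than a live consumer.

```python
import logging
from types import SimpleNamespace

log = logging.getLogger("commit-example")  # hypothetical logger name


def format_commit_result(err, partitions):
    """Turn the on_commit arguments into human-readable lines."""
    if err is not None:
        # The commit request as a whole failed.
        return ["commit failed: {}".format(err)]
    lines = []
    for tp in partitions:
        if tp.error is not None:
            lines.append("{}[{}]: error {}".format(tp.topic, tp.partition, tp.error))
        else:
            lines.append("{}[{}]: committed offset {}".format(tp.topic, tp.partition, tp.offset))
    return lines


def on_commit(err, partitions):
    """Callback with the (err, partitions) signature; logs one line per partition."""
    for line in format_commit_result(err, partitions):
        log.info(line)


# Stand-in objects (not real TopicPartition instances) so the sketch runs without a broker.
ok = SimpleNamespace(topic="orders", partition=0, offset=42, error=None)
bad = SimpleNamespace(topic="orders", partition=1, offset=-1, error="stand-in error")
demo_lines = format_commit_result(None, [ok, bad])
```

With a real consumer, the callback would be registered by adding 'on_commit': on_commit to the configuration dict.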
assign(partitions)
Sets the consumer partition assignment to the provided list of TopicPartition and starts consuming.
Parameters: partitions (list(TopicPartition)) – List of topic+partitions and optionally initial offsets to start consuming.
assignment()
Returns the current partition assignment.
Returns: List of assigned topic+partitions.
Return type: list(TopicPartition)
Raises: KafkaException
close()
Close down and terminate the Kafka Consumer.
Actions performed:
- Stops consuming
- Commits offsets, unless the consumer property ‘enable.auto.commit’ is set to False
- Leaves the consumer group
Return type: None
commit([message=None][, offsets=None][, async=True])
Commit a message or a list of offsets.
message and offsets are mutually exclusive. If neither is set, the current partition assignment’s offsets are used instead. The consumer relies on your use of this method if you have set ‘enable.auto.commit’ to False.
Parameters:
- message (confluent_kafka.Message) – Commit message’s offset+1.
- offsets (list(TopicPartition)) – List of topic+partitions+offsets to commit.
- async (bool) – Asynchronous commit, return immediately.
Return type: None
Raises: KafkaException
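The following sketch shows one way to commit per message when ‘enable.auto.commit’ is False: each message is committed only after the application has handled it. The helper name process_and_commit, its parameters and the stub classes are hypothetical. The async argument is deliberately left at its default, because async is a reserved word in Python 3.7 and later and cannot be written as a keyword argument there.

```python
from types import SimpleNamespace


def process_and_commit(consumer, handle, max_polls=10, timeout=1.0):
    """Poll up to max_polls times and commit each message after handle() returns."""
    committed = 0
    for _ in range(max_polls):
        msg = consumer.poll(timeout)
        if msg is None:
            break  # nothing arrived within the timeout
        if msg.error() is not None:
            continue  # event or error: there is no offset to commit
        handle(msg)
        # Commits this message's offset+1, as described for the message parameter.
        consumer.commit(message=msg)
        committed += 1
    return committed


class StubConsumer:
    """Stand-in for a Consumer so the sketch runs without a broker."""

    def __init__(self, messages):
        self.pending = list(messages)
        self.committed_values = []

    def poll(self, timeout=None):
        return self.pending.pop(0) if self.pending else None

    def commit(self, message=None):
        self.committed_values.append(message.value())


def stub_message(value, error=None):
    """Stand-in for a Message exposing only value() and error()."""
    return SimpleNamespace(value=lambda: value, error=lambda: error)


stub = StubConsumer([stub_message(b"a"),
                     stub_message(None, error="stand-in error"),
                     stub_message(b"b")])
seen = []
demo_count = process_and_commit(stub, lambda m: seen.append(m.value()))
```

Committing after handling gives at-least-once behaviour: a crash between handle() and commit() means the message is delivered again on restart.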
committed(partitions[, timeout=None])
Retrieve committed offsets for the list of partitions.
Parameters:
- partitions (list(TopicPartition)) – List of topic+partitions to query for stored offsets.
- timeout (float) – Request timeout
Returns: List of topic+partitions with offset and possibly error set.
Return type: list(TopicPartition)
Raises: KafkaException
get_watermark_offsets(partition[, timeout=None][, cached=False])
Retrieve low and high offsets for partition.
Parameters:
- partition (TopicPartition) – Topic+partition to return offsets for.
- timeout (float) – Request timeout (when cached=False).
- cached (bool) – Instead of querying the broker, use cached information. Cached values: the low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each message fetched from the broker for this partition.
Returns: Tuple of (low,high) on success or None on timeout.
Return type: tuple(int,int)
Raises: KafkaException
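As an illustration of combining get_watermark_offsets() with position(), the sketch below estimates how far a consumer is behind the high watermark of one partition. The helper partition_lag and the stub class are hypothetical. Treating a negative (logical) position as "nothing consumed yet, count from the low watermark" is an assumption of this sketch, not documented behaviour.

```python
from types import SimpleNamespace


def partition_lag(consumer, tp, timeout=5.0):
    """Return high watermark minus current position, or None if the query timed out."""
    watermarks = consumer.get_watermark_offsets(tp, timeout=timeout)
    if watermarks is None:
        return None  # timeout, per the documented return value
    low, high = watermarks
    position = consumer.position([tp])[0].offset
    if position < 0:
        # Assumption: a logical (negative) offset means no message was consumed yet.
        position = low
    return high - position


class StubConsumer:
    """Stand-in for a Consumer so the sketch runs without a broker."""

    def __init__(self, watermarks, position):
        self._watermarks = watermarks
        self._position = position

    def get_watermark_offsets(self, partition, timeout=None, cached=False):
        return self._watermarks

    def position(self, partitions):
        return [SimpleNamespace(offset=self._position)]


demo_lag = partition_lag(StubConsumer((0, 100), 40), object())
demo_fresh = partition_lag(StubConsumer((10, 100), -1001), object())
demo_timeout = partition_lag(StubConsumer(None, 0), object())
```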
poll([timeout=None])
Consumes messages, calls callbacks and returns events.
The application must check the returned Message object’s Message.error() method to distinguish between a proper message (error() returns None) and an event or error (see error().code() for specifics).
Parameters: timeout (float) – Maximum time to block waiting for message, event or callback.
Returns: A Message object or None on timeout
Return type: Message or None
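The check on Message.error() described above can be sketched as a small loop that separates proper messages from errors and events. The helper name drain and the stub objects are hypothetical; a real application would usually keep polling rather than stop at the first timeout.

```python
from types import SimpleNamespace


def drain(consumer, timeout=1.0):
    """Poll until the first timeout; return (payloads, error_codes)."""
    payloads, error_codes = [], []
    while True:
        msg = consumer.poll(timeout)
        if msg is None:
            break  # timeout: no message, event or callback
        err = msg.error()
        if err is None:
            payloads.append(msg.value())  # proper message
        else:
            error_codes.append(err.code())  # event or error
    return payloads, error_codes


class StubConsumer:
    """Stand-in for a Consumer so the sketch runs without a broker."""

    def __init__(self, messages):
        self.pending = list(messages)

    def poll(self, timeout=None):
        return self.pending.pop(0) if self.pending else None


def stub_message(value, error=None):
    """Stand-in for a Message exposing only value() and error()."""
    return SimpleNamespace(value=lambda: value, error=lambda: error)


stand_in_error = SimpleNamespace(code=lambda: 3)  # 3 is an arbitrary placeholder code
demo_payloads, demo_codes = drain(StubConsumer([
    stub_message(b"x"),
    stub_message(None, error=stand_in_error),
    stub_message(b"y"),
]))
```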
position(partitions[, timeout=None])
Retrieve current positions (offsets) for the list of partitions.
Parameters: partitions (list(TopicPartition)) – List of topic+partitions to return current offsets for. The current offset is the offset of the last consumed message + 1.
Returns: List of topic+partitions with offset and possibly error set.
Return type: list(TopicPartition)
Raises: KafkaException
subscribe(topics[, on_assign=None][, on_revoke=None])
Sets the subscription to the supplied list of topics. This replaces a previous subscription.
Regexp pattern subscriptions are supported by prefixing the topic string with "^", e.g.:
consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
Parameters:
- topics (list(str)) – List of topics (strings) to subscribe to.
- on_assign (callable) – Callback to provide handling of customized offsets on completion of a successful partition re-assignment.
- on_revoke (callable) – Callback to provide handling of offset commits to a customized store on the start of a rebalance operation.
Raises: KafkaException
on_assign(consumer, partitions)
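A possible on_assign callback, sketched under the assumption that the application wants every newly assigned partition to start from one chosen offset. The factory make_rewind_on_assign and the stub objects are hypothetical; the callback itself only sets the offset attribute of each TopicPartition and calls Consumer.assign(), both documented in this reference.

```python
from types import SimpleNamespace


def make_rewind_on_assign(start_offset):
    """Build an on_assign callback that starts every assigned partition at start_offset."""
    def on_assign(consumer, partitions):
        for tp in partitions:
            tp.offset = start_offset
        # Apply the modified assignment so consumption starts at the chosen offsets.
        consumer.assign(partitions)
    return on_assign


class StubConsumer:
    """Stand-in for a Consumer so the sketch runs without a broker."""

    def __init__(self):
        self.assigned = None

    def assign(self, partitions):
        self.assigned = [(p.topic, p.partition, p.offset) for p in partitions]


stub = StubConsumer()
stand_in_partitions = [
    SimpleNamespace(topic="my_topic", partition=0, offset=-1001),
    SimpleNamespace(topic="my_topic", partition=1, offset=-1001),
]
make_rewind_on_assign(0)(stub, stand_in_partitions)
demo_assigned = stub.assigned
```

With a real consumer this could be used as consumer.subscribe(["my_topic"], on_assign=make_rewind_on_assign(confluent_kafka.OFFSET_BEGINNING)).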
unassign()
Removes the current partition assignment and stops consuming.
Raises: KafkaException
unsubscribe()
Removes the current subscription.
Raises: KafkaException
Producer
class confluent_kafka.Producer
Asynchronous Kafka Producer
Producer(**kwargs)
Create new Producer instance using provided configuration dict.
len()
Returns: Number of messages and Kafka protocol requests waiting to be delivered to broker.
Return type: int
flush([timeout])
Parameters: timeout (float) – Maximum time to block (requires librdkafka >= v0.9.4).
Returns: Number of messages still in queue.
Note: See poll() for a description of which callbacks may be triggered.
poll([timeout])
Polls the producer for events and calls the corresponding callbacks (if registered).
Callbacks:
- on_delivery callbacks from produce()
- ...
Parameters: timeout (float) – Maximum time to block waiting for events.
Returns: Number of events processed (callbacks served)
Return type: int
produce(topic[, value][, key][, partition][, on_delivery][, timestamp])
Produce message to topic. This is an asynchronous operation; an application may use the callback (alias on_delivery) argument to pass a function (or lambda) that will be called from poll() when the message has been successfully delivered or permanently fails delivery.
Parameters:
- topic (str) – Topic to produce message to
- value (str|bytes) – Message payload
- key (str|bytes) – Message key
- partition (int) – Partition to produce to, else uses the configured partitioner.
- on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery
- timestamp (int) – Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.
Return type: None
Raises:
- BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)
- KafkaException – for other errors, see exception code
- NotImplementedError – if timestamp is specified without underlying library support.
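Because produce() raises BufferError when the local queue is full and delivery callbacks are served from poll(), one common way to apply backpressure is to poll and retry. The sketch below illustrates that idea; produce_with_retry, delivery_report, the retry count and the stub producer are all hypothetical choices, not library API.

```python
def delivery_report(err, msg):
    """on_delivery(err, msg) callback: report the final delivery result."""
    if err is not None:
        print("delivery failed: {}".format(err))
    else:
        print("delivered to {} [{}] at offset {}".format(
            msg.topic(), msg.partition(), msg.offset()))


def produce_with_retry(producer, topic, value, on_delivery, max_attempts=3):
    """Call produce(); on BufferError serve callbacks with poll() and try again.

    Returns the number of attempts used.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            producer.produce(topic, value=value, on_delivery=on_delivery)
            return attempt
        except BufferError:
            # Local queue is full: serving delivery callbacks frees queue space.
            producer.poll(1.0)
    raise BufferError("queue still full after {} attempts".format(max_attempts))


class StubProducer:
    """Stand-in for a Producer whose queue is full for the first `full_for` calls."""

    def __init__(self, full_for):
        self.full_for = full_for
        self.polls = 0
        self.sent = []

    def produce(self, topic, value=None, on_delivery=None):
        if self.full_for > 0:
            self.full_for -= 1
            raise BufferError("stand-in: queue full")
        self.sent.append((topic, value))

    def poll(self, timeout=None):
        self.polls += 1
        return 0


stub = StubProducer(full_for=1)
demo_attempts = produce_with_retry(stub, "my_topic", b"payload", delivery_report)
```

With a real producer, a final flush() before exit gives outstanding delivery callbacks a chance to run.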
Message
class confluent_kafka.Message
The Message object represents either a single consumed or produced message, or an event (error() is not None).
An application must check with error() to see if the object is a proper message (error() returns None) or an error/event.
This class is not user-instantiable.
len()
Returns: Message value (payload) size in bytes
Return type: int
error()
The message object is also used to propagate errors and events. An application must check error() to determine if the Message is a proper message (error() returns None) or an error or event (error() returns a KafkaError object).
Return type: None or KafkaError
key()
Returns: message key or None if not available.
Return type: str|bytes or None
offset()
Returns: message offset or None if not available.
Return type: int or None
partition()
Returns: partition number or None if not available.
Return type: int or None
set_key(value)
Set the field ‘Message.key’ with new value.
Parameters: value (object) – Message.key.
Return type: None
set_value(value)
Set the field ‘Message.value’ with new value.
Parameters: value (object) – Message.value.
Return type: None
timestamp()
Retrieve timestamp type and timestamp from message. The timestamp type is one of:
- TIMESTAMP_NOT_AVAILABLE - Timestamps not supported by broker
- TIMESTAMP_CREATE_TIME - Message creation time (or source / producer time)
- TIMESTAMP_LOG_APPEND_TIME - Broker receive time
The returned timestamp should be ignored if the timestamp type is TIMESTAMP_NOT_AVAILABLE.
The timestamp is the number of milliseconds since the epoch (UTC).
Timestamps require broker version 0.10.0.0 or later and {'api.version.request': True} configured on the client.
Returns: tuple of message timestamp type, and timestamp.
Return type: (int, int)
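Since the timestamp is expressed in milliseconds since the epoch (UTC) and must be ignored when the type is TIMESTAMP_NOT_AVAILABLE, a conversion helper might look like the sketch below. The function name timestamp_to_datetime is hypothetical, and the numeric type codes in the demo (1 and 0) are placeholders; real code would pass the library's TIMESTAMP_NOT_AVAILABLE constant.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def timestamp_to_datetime(ts_type, ts_ms, not_available_type):
    """Convert a (type, milliseconds) pair to an aware UTC datetime.

    Returns None when the type says no timestamp is available.
    """
    if ts_type == not_available_type:
        return None
    # timedelta keeps millisecond precision without float rounding.
    return EPOCH + timedelta(milliseconds=ts_ms)


# Placeholder type codes for the demo only.
demo_dt = timestamp_to_datetime(1, 1500000000000, 0)
demo_none = timestamp_to_datetime(0, 1500000000000, 0)
```

With a real message this could be called as ts_type, ts_ms = msg.timestamp() followed by timestamp_to_datetime(ts_type, ts_ms, confluent_kafka.TIMESTAMP_NOT_AVAILABLE).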
topic()
Returns: topic name or None if not available.
Return type: str or None
value()
Returns: message value (payload) or None if not available.
Return type: str|bytes or None
TopicPartition
class confluent_kafka.TopicPartition
TopicPartition is a generic type to hold a single partition and various information about it.
It is typically used to provide a list of topics or partitions for various operations, such as Consumer.assign().
TopicPartition(topic[, partition][, offset])
Instantiate a TopicPartition object.
Parameters:
- topic (string) – Topic name
- partition (int) – Partition id
- offset (int) – Initial partition offset
Return type: TopicPartition
error
Attribute: Indicates an error (with KafkaError) unless None.
offset
Attribute: Offset (long). Either an absolute offset (>=0) or a logical offset: OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED, OFFSET_INVALID
partition
Attribute: Partition number (int)
topic
Attribute: Topic name (string)
KafkaError
class confluent_kafka.KafkaError
Kafka error and event object
The KafkaError class serves multiple purposes:
- Propagation of errors
- Propagation of events
- Exceptions
This class is not user-instantiable.
Error and event constants:
Constant - Description
_BAD_MSG - Local: Bad message format
_BAD_COMPRESSION - Local: Invalid compressed data
_DESTROY - Local: Broker handle destroyed
_FAIL - Local: Communication failure with broker
_TRANSPORT - Local: Broker transport failure
_CRIT_SYS_RESOURCE - Local: Critical system resource failure
_RESOLVE - Local: Host resolution failure
_MSG_TIMED_OUT - Local: Message timed out
_PARTITION_EOF - Broker: No more messages
_UNKNOWN_PARTITION - Local: Unknown partition
_FS - Local: File or filesystem error
_UNKNOWN_TOPIC - Local: Unknown topic
_ALL_BROKERS_DOWN - Local: All broker connections are down
_INVALID_ARG - Local: Invalid argument or configuration
_TIMED_OUT - Local: Timed out
_QUEUE_FULL - Local: Queue full
_ISR_INSUFF - Local: ISR count insufficient
_NODE_UPDATE - Local: Broker node update
_SSL - Local: SSL error
_WAIT_COORD - Local: Waiting for coordinator
_UNKNOWN_GROUP - Local: Unknown group
_IN_PROGRESS - Local: Operation in progress
_PREV_IN_PROGRESS - Local: Previous operation in progress
_EXISTING_SUBSCRIPTION - Local: Existing subscription
_ASSIGN_PARTITIONS - Local: Assign partitions
_REVOKE_PARTITIONS - Local: Revoke partitions
_CONFLICT - Local: Conflicting use
_STATE - Local: Erroneous state
_UNKNOWN_PROTOCOL - Local: Unknown protocol
_NOT_IMPLEMENTED - Local: Not implemented
_AUTHENTICATION - Local: Authentication failure
_NO_OFFSET - Local: No offset stored
_OUTDATED - Local: Outdated
UNKNOWN - Unknown broker error
NO_ERROR - Success
OFFSET_OUT_OF_RANGE - Broker: Offset out of range
INVALID_MSG - Broker: Invalid message
UNKNOWN_TOPIC_OR_PART - Broker: Unknown topic or partition
INVALID_MSG_SIZE - Broker: Invalid message size
LEADER_NOT_AVAILABLE - Broker: Leader not available
NOT_LEADER_FOR_PARTITION - Broker: Not leader for partition
REQUEST_TIMED_OUT - Broker: Request timed out
BROKER_NOT_AVAILABLE - Broker: Broker not available
REPLICA_NOT_AVAILABLE - Broker: Replica not available
MSG_SIZE_TOO_LARGE - Broker: Message size too large
STALE_CTRL_EPOCH - Broker: StaleControllerEpochCode
OFFSET_METADATA_TOO_LARGE - Broker: Offset metadata string too large
NETWORK_EXCEPTION - Broker: Broker disconnected before response received
GROUP_LOAD_IN_PROGRESS - Broker: Group coordinator load in progress
GROUP_COORDINATOR_NOT_AVAILABLE - Broker: Group coordinator not available
NOT_COORDINATOR_FOR_GROUP - Broker: Not coordinator for group
TOPIC_EXCEPTION - Broker: Invalid topic
RECORD_LIST_TOO_LARGE - Broker: Message batch larger than configured server segment size
NOT_ENOUGH_REPLICAS - Broker: Not enough in-sync replicas
NOT_ENOUGH_REPLICAS_AFTER_APPEND - Broker: Message(s) written to insufficient number of in-sync replicas
INVALID_REQUIRED_ACKS - Broker: Invalid required acks value
ILLEGAL_GENERATION - Broker: Specified group generation id is not valid
INCONSISTENT_GROUP_PROTOCOL - Broker: Inconsistent group protocol
INVALID_GROUP_ID - Broker: Invalid group.id
UNKNOWN_MEMBER_ID - Broker: Unknown member
INVALID_SESSION_TIMEOUT - Broker: Invalid session timeout
REBALANCE_IN_PROGRESS - Broker: Group rebalance in progress
INVALID_COMMIT_OFFSET_SIZE - Broker: Commit offset data size is not valid
TOPIC_AUTHORIZATION_FAILED - Broker: Topic authorization failed
GROUP_AUTHORIZATION_FAILED - Broker: Group authorization failed
CLUSTER_AUTHORIZATION_FAILED - Broker: Cluster authorization failed
INVALID_TIMESTAMP - Broker: Invalid timestamp
UNSUPPORTED_SASL_MECHANISM - Broker: Unsupported SASL mechanism
ILLEGAL_SASL_STATE - Broker: Request not valid in current SASL state
UNSUPPORTED_VERSION - Broker: API version not supported
code()
Returns the error/event code for comparison to KafkaError.<ERR_CONSTANTS>.
Returns: error/event code
Return type: int
KafkaException
class confluent_kafka.KafkaException
Kafka exception that wraps the KafkaError class.
Use exception.args[0] to extract the KafkaError object
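A short sketch of the exception.args[0] access described above. The helper name kafka_error_code is hypothetical, and the demo wraps a stand-in object in a plain Exception instead of raising a real KafkaException; the placeholder code 7 has no particular meaning.

```python
from types import SimpleNamespace


def kafka_error_code(exc):
    """Return the code of the KafkaError carried in exc.args[0]."""
    kafka_error = exc.args[0]
    return kafka_error.code()


# Stand-in for a KafkaError; 7 is an arbitrary placeholder code.
stand_in_error = SimpleNamespace(code=lambda: 7)
try:
    raise Exception(stand_in_error)
except Exception as exc:
    demo_code = kafka_error_code(exc)
```

In application code the except clause would catch confluent_kafka.KafkaException and compare the result with one of the KafkaError constants.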
Offset
Logical offset constants:
- OFFSET_BEGINNING - Beginning of partition (oldest offset)
- OFFSET_END - End of partition (next offset)
- OFFSET_STORED - Use stored/committed offset
- OFFSET_INVALID - Invalid/Default offset
Configuration
Configuration of producer and consumer instances is performed by providing a dict of configuration properties to the instance constructor, e.g.:
import confluent_kafka

# my_commit_callback is an application-defined function taking (err, partitions).
conf = {'bootstrap.servers': 'mybroker.com',
        'group.id': 'mygroup', 'session.timeout.ms': 6000,
        'on_commit': my_commit_callback,
        'default.topic.config': {'auto.offset.reset': 'smallest'}}
consumer = confluent_kafka.Consumer(**conf)
The supported configuration values are dictated by the underlying librdkafka C library. For the full range of configuration properties please consult librdkafka’s documentation: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
The Python bindings also provide some additional configuration properties:
- default.topic.config: value is a dict of topic-level configuration properties that are applied to all used topics for the instance.
- error_cb(kafka.KafkaError): Callback for generic/global error events. This callback is served by poll().
- stats_cb(json_str): Callback for statistics data. This callback is triggered by poll() every statistics.interval.ms (needs to be configured separately). Function argument json_str is a str instance of a JSON document containing statistics data.
- on_delivery(kafka.KafkaError, kafka.Message) (Producer): value is a Python function reference that is called once for each produced message to indicate the final delivery result (success or failure). This property may also be set per-message by passing callback=callable (or on_delivery=callable) to the confluent_kafka.Producer.produce() function.
- on_commit(kafka.KafkaError, list(kafka.TopicPartition)) (Consumer): Callback used to indicate success or failure of commit requests.
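As a rough illustration of the stats_cb property, the sketch below builds a callback that decodes the JSON document and stores it for later inspection. The factory make_stats_cb and the list used as a sink are hypothetical, and the JSON in the demo is a made-up placeholder rather than real librdkafka statistics output.

```python
import json


def make_stats_cb(sink):
    """Build a stats_cb(json_str) callback that appends decoded statistics to sink."""
    def stats_cb(json_str):
        sink.append(json.loads(json_str))
    return stats_cb


snapshots = []
stats_cb = make_stats_cb(snapshots)

# Placeholder document; real statistics are emitted by librdkafka.
stats_cb('{"example_field": 1}')
```

With a real client, the callback would be passed as 'stats_cb': stats_cb in the configuration dict, together with a statistics.interval.ms value, and it would be invoked from poll().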