Welcome to Confluent’s Apache Kafka Python client documentation

confluent_kafka — Confluent’s Apache Kafka Python client

Consumer

class confluent_kafka.Consumer

High-level Kafka Consumer

Consumer(**kwargs)
Create a new Consumer instance using the provided configuration dict.
Special configuration properties:
on_commit: Optional callback that will be called when a commit request has succeeded or failed.
on_commit(err, partitions)
Parameters:
  • err (KafkaError) – Commit error object, or None on success.
  • partitions (list(TopicPartition)) – List of partitions with their committed offsets or per-partition errors.
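
For example, a commit callback can be installed like this (a minimal sketch; the broker address, group id and handling logic are placeholder assumptions):

import confluent_kafka

def my_commit_cb(err, partitions):
    # err is None on success, otherwise a KafkaError
    if err is not None:
        print('Commit failed: %s' % err)
    else:
        print('Committed: %s' % partitions)

conf = {'bootstrap.servers': 'mybroker.com',  # placeholder broker
        'group.id': 'mygroup',
        'on_commit': my_commit_cb}
consumer = confluent_kafka.Consumer(**conf)
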
assign(partitions)

Set the consumer's partition assignment to the provided list of TopicPartition and start consuming.

Parameters:partitions (list(TopicPartition)) – List of topic+partitions and optionally initial offsets to start consuming.
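
For example, manual assignment might look like this (a minimal sketch; broker, topic, partition and offset are placeholders):

from confluent_kafka import Consumer, TopicPartition

c = Consumer(**{'bootstrap.servers': 'mybroker.com', 'group.id': 'mygroup'})
# Consume only partition 0 of 'my_topic', starting at offset 5
c.assign([TopicPartition('my_topic', 0, 5)])
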
assignment()

Returns the current partition assignment.

Returns:List of assigned topic+partitions.
Return type:list(TopicPartition)
Raises:KafkaException
close()

Close down and terminate the Kafka Consumer.

Actions performed:

  • Stops consuming
  • Commits offsets, unless the consumer property ‘enable.auto.commit’ is set to False
  • Leaves the consumer group
Return type:None
commit([message=None][, offsets=None][, async=True])

Commit a message or a list of offsets.

message and offsets are mutually exclusive; if neither is set, the current partition assignment’s offsets are used instead. If ‘enable.auto.commit’ is set to False, the application is responsible for calling this method to commit offsets.

Parameters:
  • message (confluent_kafka.Message) – Commit message’s offset+1.
  • offsets (list(TopicPartition)) – List of topic+partitions+offsets to commit.
  • async (bool) – Asynchronous commit, return immediately.
Return type:

None

Raises:

KafkaException
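
A minimal sketch of manual commits with auto-commit disabled (broker, group and topic are placeholder assumptions):

from confluent_kafka import Consumer

c = Consumer(**{'bootstrap.servers': 'mybroker.com',
                'group.id': 'mygroup',
                'enable.auto.commit': False})
c.subscribe(['my_topic'])
msg = c.poll(timeout=5.0)
if msg is not None and msg.error() is None:
    # ... process the message, then commit its offset + 1 ...
    c.commit(message=msg)  # asynchronous by default (async=True)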

committed(partitions[, timeout=None])

Retrieve committed offsets for the list of partitions.

Parameters:
  • partitions (list(TopicPartition)) – List of topic+partitions to query for stored offsets.
  • timeout (float) – Request timeout
Returns:

List of topic+partitions with offset and possibly error set.

Return type:

list(TopicPartition)

Raises:

KafkaException

get_watermark_offsets(partition[, timeout=None][, cached=False])

Retrieve low and high offsets for partition.

Parameters:
  • partition (TopicPartition) – Topic+partition to return offsets for.
  • timeout (float) – Request timeout (when cached=False).
  • cached (bool) – Instead of querying the broker, use cached information. The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each message fetched from the broker for this partition.
Returns:

Tuple of (low,high) on success or None on timeout.
Return type:

tuple(int,int)

Raises:

KafkaException
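
A minimal usage sketch (broker, topic and partition are placeholders):

from confluent_kafka import Consumer, TopicPartition

c = Consumer(**{'bootstrap.servers': 'mybroker.com', 'group.id': 'mygroup'})
offsets = c.get_watermark_offsets(TopicPartition('my_topic', 0), timeout=10.0)
if offsets is not None:  # None indicates a timeout
    low, high = offsets
    print('Partition contains offsets %d..%d' % (low, high - 1))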

poll([timeout=None])

Consumes messages, calls callbacks and returns events.

The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None) and events or errors (see error().code() for specifics).

Parameters:timeout (float) – Maximum time to block waiting for message, event or callback.
Returns:A Message object or None on timeout
Return type:Message or None
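
A typical consume loop built on poll() looks like this (a sketch; broker, group and topic are placeholders):

from confluent_kafka import Consumer, KafkaError, KafkaException

c = Consumer(**{'bootstrap.servers': 'mybroker.com', 'group.id': 'mygroup'})
c.subscribe(['my_topic'])
while True:
    msg = c.poll(timeout=1.0)
    if msg is None:
        continue  # no message within the timeout
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue  # end of partition event, not an error
        raise KafkaException(msg.error())
    print(msg.value())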
position(partitions[, timeout=None])

Retrieve current positions (offsets) for the list of partitions.

Parameters:partitions (list(TopicPartition)) – List of topic+partitions to return current offsets for. The current offset is the offset of the last consumed message + 1.
Returns:List of topic+partitions with offset and possibly error set.
Return type:list(TopicPartition)
Raises:KafkaException
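
A sketch comparing committed offsets with current positions (broker, topic and partition count are placeholders):

from confluent_kafka import Consumer, TopicPartition

c = Consumer(**{'bootstrap.servers': 'mybroker.com', 'group.id': 'mygroup'})
parts = [TopicPartition('my_topic', p) for p in range(3)]
for committed, pos in zip(c.committed(parts, timeout=10.0), c.position(parts)):
    print('partition %d: committed=%d position=%d'
          % (pos.partition, committed.offset, pos.offset))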
subscribe(topics[, on_assign=None][, on_revoke=None])

Set subscription to the supplied list of topics. This replaces a previous subscription.

Regexp pattern subscriptions are supported by prefixing the topic string with "^", e.g.:

consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
Parameters:
  • topics (list(str)) – List of topics (strings) to subscribe to.
  • on_assign (callable) – callback to provide handling of customized offsets on completion of a successful partition re-assignment.
  • on_revoke (callable) – callback to provide handling of offset commits to a customized store on the start of a rebalance operation.
Raises:

KafkaException

on_assign(consumer, partitions)
on_revoke(consumer, partitions)
Parameters:
  • consumer (Consumer) – Consumer instance.
  • partitions (list(TopicPartition)) – Absolute list of partitions being assigned or revoked.
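
A sketch of subscribing with rebalance callbacks (broker, group and topic are placeholders):

from confluent_kafka import Consumer

def my_on_assign(consumer, partitions):
    # Adjust initial offsets here, e.g. set TopicPartition.offset, if needed
    print('Assigned:', partitions)

def my_on_revoke(consumer, partitions):
    # Commit offsets to a custom store here before the partitions are lost
    print('Revoked:', partitions)

c = Consumer(**{'bootstrap.servers': 'mybroker.com', 'group.id': 'mygroup'})
c.subscribe(['my_topic'], on_assign=my_on_assign, on_revoke=my_on_revoke)
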
unassign()

Removes the current partition assignment and stops consuming.
Raises:KafkaException

unsubscribe()

Remove the current subscription.
Raises:KafkaException

Producer

class confluent_kafka.Producer

Asynchronous Kafka Producer

Producer(**kwargs)

Create a new Producer instance using the provided configuration dict.

len()
Returns:Number of messages and Kafka protocol requests waiting to be delivered to broker.
Return type:int
flush([timeout])
Wait for all messages in the Producer queue to be delivered. This is a convenience method that calls poll() until len() is zero or the optional timeout elapses.
Parameters:timeout (float) – Maximum time to block (requires librdkafka >= v0.9.4).
Returns:Number of messages still in queue.

Note

See poll() for a description of what callbacks may be triggered.

poll([timeout])

Polls the producer for events and calls the corresponding callbacks (if registered).

Callbacks served include the per-message delivery report callbacks (on_delivery) registered via produce(), as well as the configured error_cb and stats_cb callbacks.

Parameters:timeout (float) – Maximum time to block waiting for events.
Returns:Number of events processed (callbacks served)
Return type:int
produce(topic[, value][, key][, partition][, on_delivery][, timestamp])

Produce a message to topic. This is an asynchronous operation; an application may use the callback (alias on_delivery) argument to pass a function (or lambda) that will be called from poll() when the message has been successfully delivered or has permanently failed delivery.

Parameters:
  • topic (str) – Topic to produce message to
  • value (str|bytes) – Message payload
  • key (str|bytes) – Message key
  • partition (int) – Partition to produce to, else uses the configured partitioner.
  • on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery
  • timestamp (int) – Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.
Return type:

None

Raises:
  • BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)
  • KafkaException – for other errors, see exception code
  • NotImplementedError – if timestamp is specified without underlying library support.
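
A minimal produce-and-wait sketch (broker, topic, key and payload are placeholders):

from confluent_kafka import Producer

def delivery_report(err, msg):
    if err is not None:
        print('Delivery failed: %s' % err)
    else:
        print('Delivered to %s [%d] @ %d'
              % (msg.topic(), msg.partition(), msg.offset()))

p = Producer(**{'bootstrap.servers': 'mybroker.com'})
p.produce('my_topic', value='hello', key='k1', on_delivery=delivery_report)
p.poll(0)    # serve delivery callbacks that are already due
p.flush(10)  # wait up to 10s for outstanding messages (librdkafka >= v0.9.4)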

Message

class confluent_kafka.Message

The Message object represents either a single consumed or produced message, or an event (error() is not None).

An application must check with error() to see if the object is a proper message (error() returns None) or an error/event.

This class is not user-instantiable.

len()
Returns:Message value (payload) size in bytes
Return type:int
error()

The message object is also used to propagate errors and events; an application must check error() to determine whether the Message is a proper message (error() returns None) or an error or event (error() returns a KafkaError object).

Return type:None or KafkaError
key()
Returns:message key or None if not available.
Return type:str|bytes or None
offset()
Returns:message offset or None if not available.
Return type:int or None
partition()
Returns:partition number or None if not available.
Return type:int or None
set_key(value)

Set the field ‘Message.key’ with new value.
Parameters:value (object) – Message.key
Return type:None

set_value(value)

Set the field ‘Message.value’ with new value.
Parameters:value (object) – Message.value
Return type:None

timestamp()

Retrieve timestamp type and timestamp from message. The timestamp type is one of:

  • TIMESTAMP_NOT_AVAILABLE - Timestamps not supported by broker
  • TIMESTAMP_CREATE_TIME - Message creation time (or source / producer time)
  • TIMESTAMP_LOG_APPEND_TIME - Broker receive time

The returned timestamp should be ignored if the timestamp type is TIMESTAMP_NOT_AVAILABLE.

The timestamp is the number of milliseconds since the epoch (UTC).

Timestamps require broker version 0.10.0.0 or later and {'api.version.request': True} configured on the client.

Returns:tuple of message timestamp type, and timestamp.
Return type:(int, int)
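
A sketch of reading a consumed message’s timestamp (broker, group and topic are placeholders):

from confluent_kafka import Consumer, TIMESTAMP_NOT_AVAILABLE

c = Consumer(**{'bootstrap.servers': 'mybroker.com', 'group.id': 'mygroup'})
c.subscribe(['my_topic'])
msg = c.poll(timeout=5.0)
if msg is not None and msg.error() is None:
    ts_type, ts = msg.timestamp()
    if ts_type != TIMESTAMP_NOT_AVAILABLE:
        print('Timestamp: %d ms since epoch (type %d)' % (ts, ts_type))
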
topic()
Returns:topic name or None if not available.
Return type:str or None
value()
Returns:message value (payload) or None if not available.
Return type:str|bytes or None

TopicPartition

class confluent_kafka.TopicPartition

TopicPartition is a generic type to hold a single partition and various information about it.

It is typically used to provide a list of topics or partitions for various operations, such as Consumer.assign().

TopicPartition(topic[, partition][, offset])

Instantiate a TopicPartition object.

Parameters:
  • topic (string) – Topic name
  • partition (int) – Partition id
  • offset (int) – Initial partition offset
Return type:

TopicPartition

error

Indicates an error (with KafkaError) unless None.

offset

Offset (long). Either an absolute offset (>=0) or a logical offset: OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED, OFFSET_INVALID.

partition

Partition number (int).

topic

Topic name (string).

KafkaError

class confluent_kafka.KafkaError

Kafka error and event object

The KafkaError class serves multiple purposes:

  • Propagation of errors
  • Propagation of events
  • Exceptions

This class is not user-instantiable.

Error and event constants:

Constant Description
_BAD_MSG Local: Bad message format
_BAD_COMPRESSION Local: Invalid compressed data
_DESTROY Local: Broker handle destroyed
_FAIL Local: Communication failure with broker
_TRANSPORT Local: Broker transport failure
_CRIT_SYS_RESOURCE Local: Critical system resource failure
_RESOLVE Local: Host resolution failure
_MSG_TIMED_OUT Local: Message timed out
_PARTITION_EOF Broker: No more messages
_UNKNOWN_PARTITION Local: Unknown partition
_FS Local: File or filesystem error
_UNKNOWN_TOPIC Local: Unknown topic
_ALL_BROKERS_DOWN Local: All broker connections are down
_INVALID_ARG Local: Invalid argument or configuration
_TIMED_OUT Local: Timed out
_QUEUE_FULL Local: Queue full
_ISR_INSUFF Local: ISR count insufficient
_NODE_UPDATE Local: Broker node update
_SSL Local: SSL error
_WAIT_COORD Local: Waiting for coordinator
_UNKNOWN_GROUP Local: Unknown group
_IN_PROGRESS Local: Operation in progress
_PREV_IN_PROGRESS Local: Previous operation in progress
_EXISTING_SUBSCRIPTION Local: Existing subscription
_ASSIGN_PARTITIONS Local: Assign partitions
_REVOKE_PARTITIONS Local: Revoke partitions
_CONFLICT Local: Conflicting use
_STATE Local: Erroneous state
_UNKNOWN_PROTOCOL Local: Unknown protocol
_NOT_IMPLEMENTED Local: Not implemented
_AUTHENTICATION Local: Authentication failure
_NO_OFFSET Local: No offset stored
_OUTDATED Local: Outdated
_TIMED_OUT_QUEUE Local: Timed out in queue
_UNSUPPORTED_FEATURE Local: Required feature not supported by broker
_WAIT_CACHE Local: Awaiting cache update
_INTR Local: Operation interrupted
_KEY_SERIALIZATION Local: Key serialization error
_VALUE_SERIALIZATION Local: Value serialization error
_KEY_DESERIALIZATION Local: Key deserialization error
_VALUE_DESERIALIZATION Local: Value deserialization error
UNKNOWN Unknown broker error
NO_ERROR Success
OFFSET_OUT_OF_RANGE Broker: Offset out of range
INVALID_MSG Broker: Invalid message
UNKNOWN_TOPIC_OR_PART Broker: Unknown topic or partition
INVALID_MSG_SIZE Broker: Invalid message size
LEADER_NOT_AVAILABLE Broker: Leader not available
NOT_LEADER_FOR_PARTITION Broker: Not leader for partition
REQUEST_TIMED_OUT Broker: Request timed out
BROKER_NOT_AVAILABLE Broker: Broker not available
REPLICA_NOT_AVAILABLE Broker: Replica not available
MSG_SIZE_TOO_LARGE Broker: Message size too large
STALE_CTRL_EPOCH Broker: StaleControllerEpochCode
OFFSET_METADATA_TOO_LARGE Broker: Offset metadata string too large
NETWORK_EXCEPTION Broker: Broker disconnected before response received
GROUP_LOAD_IN_PROGRESS Broker: Group coordinator load in progress
GROUP_COORDINATOR_NOT_AVAILABLE Broker: Group coordinator not available
NOT_COORDINATOR_FOR_GROUP Broker: Not coordinator for group
TOPIC_EXCEPTION Broker: Invalid topic
RECORD_LIST_TOO_LARGE Broker: Message batch larger than configured server segment size
NOT_ENOUGH_REPLICAS Broker: Not enough in-sync replicas
NOT_ENOUGH_REPLICAS_AFTER_APPEND Broker: Message(s) written to insufficient number of in-sync replicas
INVALID_REQUIRED_ACKS Broker: Invalid required acks value
ILLEGAL_GENERATION Broker: Specified group generation id is not valid
INCONSISTENT_GROUP_PROTOCOL Broker: Inconsistent group protocol
INVALID_GROUP_ID Broker: Invalid group.id
UNKNOWN_MEMBER_ID Broker: Unknown member
INVALID_SESSION_TIMEOUT Broker: Invalid session timeout
REBALANCE_IN_PROGRESS Broker: Group rebalance in progress
INVALID_COMMIT_OFFSET_SIZE Broker: Commit offset data size is not valid
TOPIC_AUTHORIZATION_FAILED Broker: Topic authorization failed
GROUP_AUTHORIZATION_FAILED Broker: Group authorization failed
CLUSTER_AUTHORIZATION_FAILED Broker: Cluster authorization failed
INVALID_TIMESTAMP Broker: Invalid timestamp
UNSUPPORTED_SASL_MECHANISM Broker: Unsupported SASL mechanism
ILLEGAL_SASL_STATE Broker: Request not valid in current SASL state
UNSUPPORTED_VERSION Broker: API version not supported
TOPIC_ALREADY_EXISTS Broker: Topic already exists
INVALID_PARTITIONS Broker: Invalid number of partitions
INVALID_REPLICATION_FACTOR Broker: Invalid replication factor
INVALID_REPLICA_ASSIGNMENT Broker: Invalid replica assignment
INVALID_CONFIG Broker: Configuration is invalid
NOT_CONTROLLER Broker: Not controller for cluster
INVALID_REQUEST Broker: Invalid request
UNSUPPORTED_FOR_MESSAGE_FORMAT Broker: Message format on broker does not support request
POLICY_VIOLATION Broker: Isolation policy violation
OUT_OF_ORDER_SEQUENCE_NUMBER Broker: Broker received an out of order sequence number
DUPLICATE_SEQUENCE_NUMBER Broker: Broker received a duplicate sequence number
INVALID_PRODUCER_EPOCH Broker: Producer attempted an operation with an old epoch
INVALID_TXN_STATE Broker: Producer attempted a transactional operation in an invalid state
INVALID_PRODUCER_ID_MAPPING Broker: Producer attempted to use a producer id which is not currently assigned to its transactional id
INVALID_TRANSACTION_TIMEOUT Broker: Transaction timeout is larger than the maximum value allowed by the broker’s max.transaction.timeout.ms
CONCURRENT_TRANSACTIONS Broker: Producer attempted to update a transaction while another concurrent operation on the same transaction is ongoing
TRANSACTION_COORDINATOR_FENCED Broker: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer
TRANSACTIONAL_ID_AUTHORIZATION_FAILED Broker: Transactional Id authorization failed
SECURITY_DISABLED Broker: Security features are disabled
OPERATION_NOT_ATTEMPTED Broker: Operation not attempted
code()

Returns the error/event code for comparison to KafkaError.<ERR_CONSTANTS>.

Returns:error/event code
Return type:int
name()

Returns the enum name for error/event.

Returns:error/event enum name string
Return type:str
str()

Returns the human-readable error/event string.

Returns:error/event message string
Return type:str

KafkaException

class confluent_kafka.KafkaException

Kafka exception that wraps the KafkaError class.

Use exception.args[0] to extract the KafkaError object.
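
A sketch of this extraction pattern (the consumer configuration and the failing call are placeholder assumptions):

from confluent_kafka import Consumer, KafkaException, TopicPartition

try:
    c = Consumer(**{'group.id': 'mygroup'})  # no brokers configured
    c.committed([TopicPartition('my_topic', 0)], timeout=1.0)
except KafkaException as e:
    err = e.args[0]  # the wrapped KafkaError
    print('%s: %s' % (err.name(), err.str()))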

Offset

Logical offset constants:

  • OFFSET_BEGINNING - Beginning of partition (oldest offset)
  • OFFSET_END - End of partition (next offset)
  • OFFSET_STORED - Use stored/committed offset
  • OFFSET_INVALID - Invalid/Default offset
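
For example, a logical offset can be passed to Consumer.assign() to replay a partition from its oldest message (a sketch; broker and topic are placeholders):

from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING

c = Consumer(**{'bootstrap.servers': 'mybroker.com', 'group.id': 'mygroup'})
c.assign([TopicPartition('my_topic', 0, OFFSET_BEGINNING)])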

Configuration

Configuration of producer and consumer instances is performed by providing a dict of configuration properties to the instance constructor, e.g.:

conf = {'bootstrap.servers': 'mybroker.com',
        'group.id': 'mygroup', 'session.timeout.ms': 6000,
        'on_commit': my_commit_callback,
        'default.topic.config': {'auto.offset.reset': 'smallest'}}
consumer = confluent_kafka.Consumer(**conf)

The supported configuration values are dictated by the underlying librdkafka C library. For the full range of configuration properties please consult librdkafka’s documentation: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

The Python bindings also provide some additional configuration properties:

  • default.topic.config: value is a dict of topic-level configuration properties that are applied to all used topics for the instance.
  • error_cb(kafka.KafkaError): Callback for generic/global error events. This callback is served by poll().
  • stats_cb(json_str): Callback for statistics data. This callback is triggered by poll() every statistics.interval.ms (needs to be configured separately). Function argument json_str is a str instance of a JSON document containing statistics data.
  • on_delivery(kafka.KafkaError, kafka.Message) (Producer): value is a Python function reference that is called once for each produced message to indicate the final delivery result (success or failure). This property may also be set per-message by passing callback=callable (or on_delivery=callable) to the confluent_kafka.Producer.produce() function.
  • on_commit(kafka.KafkaError, list(kafka.TopicPartition)) (Consumer): Callback used to indicate success or failure of commit requests.
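
A sketch wiring up these callback properties (broker, group and interval are placeholder values):

from confluent_kafka import Consumer

def my_error_cb(err):
    print('Client-level error:', err)

def my_stats_cb(json_str):
    print('Statistics:', json_str)

conf = {'bootstrap.servers': 'mybroker.com',
        'group.id': 'mygroup',
        'statistics.interval.ms': 60000,
        'error_cb': my_error_cb,
        'stats_cb': my_stats_cb}
c = Consumer(**conf)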

Changelog

  • Handle null/None values during deserialization
  • Allow to pass custom schema registry instance.
  • None conf values are now converted to NULL rather than the string “None” (#133)
  • Fix memory leaks when certain exceptions were raised.
  • Handle delivery.report.only.error in Python (#84)
  • Proper use of Message error string on Producer (#129)
  • Now Flake8 clean
  • Unlock GIL for Consumer_close’s rd_kafka_destroy()
  • Unlock GIL on Producer’s librdkafka destroy call (#107)
  • Add optional timeout argument to Producer.flush() (#105)
  • Added offset constants
  • Added Consumer.get_watermark_offsets() (#31)
  • Added Consumer.assignment() API
  • Add timestamp= arg to produce()
  • replace from .cimpl import * with explicit names. (#87)
  • Dont delete unset tlskey (closes #78)
  • AvroConsumer for handling schema registry (#80)
  • Fix open issue #73 – TopicPartition_str0 broken on Mac OS X (#83)
  • Producer client for handling avro schemas (#40, @roopahc, @criccomini)
  • enable.auto.commit behavior consequences on close() and commit() (#77)
  • Consumer, Producer, TopicPartition classes are now sub-classable
  • commit() without msg/offset args would call C commit() twice (#71)
  • Consumer: set up callstate on dealloc to allow callbacks (#66)
  • Added statistics callback support (#43)
  • Add timestamp() to Messages
  • on_commit: handle NULL offsets list (on error)
  • Fix 32-bit arch build warnings
  • Destroy rd_kafka_t handle on consumer.close() (#30)
  • Handle None error_cb and dr_cb
  • Added CallState to track per-thread C call state (fixes #19)
  • Make sure to GC on_commit callable
  • PR-3 - Add /usr/local/lib to library_dirs in setup
  • PR-4 - Py3: use bytes for Message payload and key
  • PR-5 - Removed hard coded c extentions lib/include paths
  • PR-9 - Use consistent syntax highlighting (e.g. prefix commands with $)
  • PR-17 - Version bump to 0.9.1.2