Kafka Producers¶
The Confluent Platform includes the Java producer shipped with Kafka itself, the C/C++ client library librdkafka, and clients for Python, Go and .NET. This section gives a high-level overview of how the producer works, an introduction to the configuration settings for tuning, and some examples from each client library.
Concepts¶
The Kafka producer is conceptually much simpler than the consumer since it has no need for group coordination. Its main function is to map each message to a topic partition and send a produce request to the leader of that partition. It does the first of these with a partitioner, which typically selects a partition using a hash function. The partitioners shipped with Kafka guarantee that all messages with the same non-empty key will be sent to the same partition. If no key is provided, then the partition is selected in a round-robin fashion to ensure an even distribution across the topic partitions.
Each partition in the Kafka cluster has a leader and a set of replicas among the brokers. All writes to the partition must go through the partition leader. The replicas are kept in sync by fetching from the leader. When the leader shuts down or fails, the next leader is chosen from among the in-sync replicas. Depending on how the producer is configured, each produce request to the partition leader can be held until the replicas have successfully acknowledged the write. This gives the producer some control over message durability at some cost to overall throughput.
Messages written to the partition leader are not immediately readable
by consumers regardless of the producer’s acknowledgement settings.
When all in-sync replicas have acknowledged the write, then the
message is considered committed, which makes it available for
reading. This ensures that messages cannot be lost by a broker failure
after they have already been read. Note that this implies that
messages which were acknowledged by the leader only (i.e. acks=1
)
can be lost if the partition leader fails before the replicas have
copied the message. Nevertheless, this is often a reasonable
compromise in practice to ensure durability in most cases while not
impacting throughput too significantly.
Most of the subtlety around producers is tied to achieving high throughput with batching/compression and ensuring message delivery guarantees as mentioned above. In the next section, we discuss the most common settings to tune producer behavior.
Configuration¶
The full list of configuration settings are available in the Kafka documentation. Below we highlight several of the key configuration settings and how they affect the producer’s behavior.
Core Configuration: You are required to set the
bootstrap.servers
property so that the producer can find the Kafka
cluster. Although not required, we recommend always setting a
client.id
since this allows you to easily correlate requests on
the broker with the client instance which made it. These settings are
the same for Java, C/C++, Python, Go and .NET clients.
Message Durability: You can control the durability of messages
written to Kafka through the acks
setting. The default value of
“1” requires an explicit acknowledgement from the partition leader
that the write succeeded. The strongest guarantee that Kafka provides
is with “acks=all”, which guarantees that not only did the partition
leader accept the write, but it was successfully replicated to all of
the in-sync replicas. You can also use a value of “0” to maximize
throughput, but you will have no guarantee that the message was
successfully written to the broker’s log since the broker does not
even send a response in this case. This also means that you will not
be able to determine the offset of the message. Note that for the C/C++,
Python, Go and .NET clients, this is a per-topic configuration, but
can be applied globally using the default_topic_conf
sub-configuration
in C/C++ and default.topic.config
sub-configuration in Python, Go
and .NET.
Message Ordering: In general, messages are written to the broker
in the same order that they are received by the producer client.
However, if you enable message retries by setting retries
to a
value larger than 0 (which is the default), then message reordering
may occur since the retry may occur after a following write
succeeded. To enable retries without reordering, you can set
max.in.flight.requests.per.connection
to 1 to ensure that only one
request can be sent to the broker at a time. Without retries enabled,
the broker will preserve the order of writes it receives, but there
could be gaps due to individual send failures.
Batching and Compression: Kafka producers attempt to collect sent
messages into batches to improve throughput. With the Java client, you
can use batch.size
to control the maximum size in bytes of each
message batch. To give more time for batches to fill, you can use
linger.ms
to have the producer delay sending. Compression can be
enabled with the compression.type
setting. Compression covers full
message batches, so larger batches will typically mean a higher
compression ratio.
With the C/C++, Python, Go and .NET clients, you can use
batch.num.messages
to set a limit on the number of messages contained
in each batch. To enable compression, use compression.codec
.
Queuing Limits: Use buffer.memory
to limit the total memory
that is available to the Java client for collecting unsent
messages. When this limit is hit, the producer will block on
additional sends for as long as max.block.ms
before raising an
exception. Additionally, to avoid keeping records queued indefinitely,
you can set a timeout using request.timeout.ms
. If this timeout
expires before a message can be successfully sent, then it will be removed
from the queue and an exception will be thrown.
The C/C++, Python, Go and .NET clients have similar settings. Use
queue.buffering.max.messages
to limit the total number of messages
that can be queued (for transmission, retries, or delivery reports) at
any given time. queue.buffering.max.ms
limits the amount of time
the client waits to fill up a batch before sending it to the broker.
Examples¶
Initial Setup¶
The Java producer is constructed with a standard Properties
file.
Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("bootstrap.servers", "host1:9092,host2:9092");
config.put("acks", "all");
new KafkProducer<K, V>(config);
Configuration errors will result in a raised KafkaException
from
the constructor of KafkaProducer
. The main difference in
librdkafka is that we handle the errors for each setting directly:
char hostname[128];
char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (gethostname(hostname, sizeof(hostname))) {
fprintf(stderr, "%% Failed to lookup hostname\n");
exit(1);
}
if (rd_kafka_conf_set(conf, "client.id", hostname,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
if (rd_kafka_conf_set(conf, "bootstrap.servers", "host1:9092,host2:9092",
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
if (rd_kafka_topic_conf_set(topic_conf, "acks", "all",
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
/* Create Kafka producer handle */
rd_kafka_t *rk;
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errstr, sizeof(errstr)))) {
fprintf(stderr, "%% Failed to create new producer: %s\n", errstr);
exit(1);
}
In Python, Go and C#:
from confluent_kafka import Producer
import socket
conf = {'bootstrap.servers': "host1:9092,host2:9092",
'client.id': socket.gethostname(),
'default.topic.config': {'acks': 'all'}}
producer = Producer(conf)
import (
"github.com/confluentinc/confluent-kafka-go/kafka"
)
p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "host1:9092,host2:9092",
"client.id": socket.gethostname(),
"default.topic.config": kafka.ConfigMap{'acks': 'all'}
})
if err != nil {
fmt.Printf("Failed to create producer: %s\n", err)
os.Exit(1)
}
using Confluent.Kafka;
using System.Net;
...
var config = new Dictionary<string, object>
{
{ "bootstrap.servers", "host1:9092,host2:9092" },
{ "client.id", Dns.GetHostName() },
{ "default.topic.config", new Dictionary<string, object>
{
{ "acks", "all" }
}
}
}
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
...
}
Asynchronous Writes¶
All writes are asynchronous by default. The Java producer includes a
send()
API which returns a future which can be polled to get the
result of the send.
final ProducerRecord<K, V> = new ProducerRecord<>(topic, key, value);
Future<RecordMetadata> future = producer.send(record);
With librdkafka, you first need to create a rd_kafka_topic_t
handle for the topic you want to write to. Then you can use
rd_kafka_produce
to send messages to it. For example:
rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, topic_conf);
if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY,
payload, payload_len,
key, key_len,
NULL) == -1) {
fprintf(stderr, "%% Failed to produce to topic %s: %s\n",
topic, rd_kafka_err2str(rd_kafka_errno2err(errno)));
}
You can pass topic-specific configuration in the third argument to
rd_kafka_topic_new
. Here we passed the topic_conf
we seeded
with a configuration for acknowledgments. Passing NULL
will cause
the producer to use the default configuration.
The second argument to rd_kafka_produce
can be used to set the
desired partition for the message. If set to RD_KAFKA_PARTITION_UA
,
as in this case, librdkafka will use the default partitioner to select
the partition for this message. The third argument indicates that
librdkafka should copy the payload and key, which would let us free it
upon returning.
In Python, you initiate a send by calling the produce
method, passing
in the value, and optionally a key, partition, and callback. The call will
return immediately and does not return a value.
producer.produce(topic, key="key", value="value")
Similarly, in Go you initiate a send by calling the Produce()
method, passing a Message` object and an optional ``chan Event
that can
be used to listen for the result of the send. The Message
object contains an opaque interface{}
field that can be
used to pass arbitrary data with the message to the subsequent event handler.
delivery_chan := make(chan kafka.Event, 10000)
err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: "topic", Partition: kafka.PartitionAny},
Value: []byte(value)},
delivery_chan,
)
In C#, you initiate a send by calling one of the ProduceAsync method overloads on your Producer instance. For example:
producer.ProduceAsync("topic", key, value);
If you want to invoke some code after the write has completed you can also
provide a callback. In Java this is implemented as a Callback
object:
final ProducerRecord<K, V> = new ProducerRecord<>(topic, key, value);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null)
log.debug("Send failed for record {}", record, e);
}
});
In the Java implementation you should avoid doing any expensive work in this callback since it is executed in the producer’s IO thread.
A similar feature is available in librdkafka, but we have to configure it on initialization:
static void on_delivery(rd_kafka_t *rk,
const rd_kafka_message_t *rkmessage
void *opaque) {
if (rkmessage->err)
fprintf(stderr, "%% Message delivery failed: %s\n",
rd_kafka_message_errstr(rkmessage));
}
void init_rd_kafka() {
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set_dr_msg_cb(conf, on_delivery);
// initialization ommitted
}
The delivery callback in librdkafka is invoked in the user’s thread by
calling rd_kafka_poll
. A common pattern is to call this function
after every call to the produce API, but this may not be sufficient to
ensure regular delivery reports if the message produce rate is not
steady. Note, however, that this API does not provide a direct way to
block for the result of any particular message delivery. If you need
to do this, then see the synchronous write example below.
In Python you can pass a callback
parameter, which can be any
callable, e.g. a lambda, function, bound method, or callable object. Although
the produce()
method enqueues the message immediately for batching,
compression and transmission to broker, it will not handle any events
(i.e. acknowledgements and callbacks they trigger) until poll()
is invoked.
def acked(err, msg):
if err is not None:
print("Failed to deliver message: %s: %s" % (str(msg), str(err))
else
print("Message produced: %s" % (str(msg))
producer.produce(topic, key="key", value="value", callback=acked)
# Wait up to 1 second for events. Callbacks will be invoked during
# this method call if the message is acknowledged.
producer.poll(1)
In Go you can use the delivery report channel passed to Produce
to wait for the the result of the message send:
e := <-delivery_chan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
close(delivery_chan)
In C#, you have two options. First, you can use a variant of ProduceAsync that
returns a standard Task
object that you can await, handle using the
.ContinueWith
method, or wait on using the .Wait
or .WaitAll
methods:
var deliveryReportTask = producer.ProduceAsync("topic", key, val);
deliveryReportTask.ContinueWith(task =>
{
Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}");
});
Alternatively, you can use a variant of .ProduceAsync
which takes an implementation of
IDeliveryHandler
. You should use use the latter approach if you require notification
of message delivery strictly in the order of broker acknowledgement (or delivery failure)
because Tasks
may complete on any thread pool thread, and so ordering is not
guaranteed.
Synchronous Writes¶
To make writes synchronous, just wait on the returned future. This would typically be a bad idea since it would kill throughput, but may be justified in some cases.
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
A similar capability could be achieved in C/C++ and Python using the
delivery callback, but it takes a bit more work. A full example can be
found here.
The Python client also contains a flush()
method which has the same
effect:
producer.produce(topic, key="key", value="value")
producer.flush()
In Go, receive from the delivery channel passed to the Produce()
method call:
delivery_chan := make(chan kafka.Event, 10000)
err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: "topic", Partition: kafka.PartitionAny},
Value: []byte(value)},
delivery_chan
)
e := <-delivery_chan
m := e.(*kafka.Message)
Or, to wait for all messages to be acknowledge, use the Flush()
method:
p.Flush()
Note that Flush()
will only serve the producer’s Events()
channel, not application-specified delivery
channels. If Flush()
is called and no goroutine is processing the delivery channel, its buffer may fill up and cause the
flush to timeout.
In C#, simply access the .Result
property of the Task
object returned
from .ProduceAsync
which will block until the delivery report is available:
var deliveryReport = producer.ProduceAsync("topic", key, value).Result;