Class Producer
Implements a high-level Apache Kafka producer (without serialization).
[UNSTABLE-API] We are considering making this class private in a future version so as to limit API surface area. Prefer to use the serializing producer Producer<TKey, TValue> where possible. Please let us know if you find the GetSerializingProducer<TKey, TValue>(ISerializer<TKey>, ISerializer<TValue>) method useful.
Inheritance
Namespace: Confluent.Kafka
Assembly: cs.temp.dll.dll
Syntax
public class Producer : IDisposable
Constructors
Producer(IEnumerable<KeyValuePair<String, Object>>)
Initializes a new Producer instance.
Declaration
public Producer(IEnumerable<KeyValuePair<string, object>> config)
Parameters
| Type | Name | Description |
|---|---|---|
| IEnumerable<KeyValuePair<System.String, System.Object>> | config | librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Topic configuration parameters are specified via the "default.topic.config" sub-dictionary config parameter. |
Producer(IEnumerable<KeyValuePair<String, Object>>, Boolean, Boolean)
Initializes a new Producer instance.
Declaration
public Producer(IEnumerable<KeyValuePair<string, object>> config, bool manualPoll, bool disableDeliveryReports)
Parameters
| Type | Name | Description |
|---|---|---|
| IEnumerable<KeyValuePair<System.String, System.Object>> | config | librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Topic configuration parameters are specified via the "default.topic.config" sub-dictionary config parameter. |
| System.Boolean | manualPoll | If true, does not start a dedicated polling thread to trigger events or receive delivery reports - you must call the Poll method periodically instead. Typically you should set this parameter to false. |
| System.Boolean | disableDeliveryReports | If true, disables delivery report notification. Note: if set to true and you use a ProduceAsync variant that returns a Task, the Tasks will never complete. Typically you should set this parameter to false. Set it to true for "fire and forget" semantics and a small boost in performance. |
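As an illustration of the two constructors, the following minimal sketch builds a configuration with a "default.topic.config" sub-dictionary and passes it to the three-argument constructor. The class and method names (`ProducerSetup`, `BuildConfig`, `Create`) and the 30000 ms value are hypothetical and chosen only for this example.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

public static class ProducerSetup
{
    // Builds the configuration passed to the Producer constructor.
    // The helper name and the timeout value are illustrative assumptions.
    public static Dictionary<string, object> BuildConfig(string bootstrapServers)
    {
        return new Dictionary<string, object>
        {
            { "bootstrap.servers", bootstrapServers },
            // Topic-level parameters go in the "default.topic.config" sub-dictionary.
            { "default.topic.config", new Dictionary<string, object>
                {
                    { "message.timeout.ms", 30000 }
                }
            }
        };
    }

    public static Producer Create(string bootstrapServers)
    {
        // manualPoll: false starts the dedicated polling thread.
        // disableDeliveryReports: false keeps delivery reports enabled,
        // so Tasks returned by ProduceAsync can complete.
        return new Producer(BuildConfig(bootstrapServers), false, false);
    }
}
```

A caller might write `using (var producer = ProducerSetup.Create("localhost:9092")) { ... }`, where the broker address is a placeholder.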
Properties
Name
Gets the name of this producer instance. Contains (but is not equal to) the client.id configuration parameter.
Declaration
public string Name { get; }
Property Value
| Type | Description |
|---|---|
| System.String | |
Remarks
This name will be unique across all producer instances in a given application which allows log messages to be associated with the corresponding instance.
Methods
AddBrokers(String)
Adds one or more brokers to the Producer's list of initial bootstrap brokers.
Note: Additional brokers are discovered automatically as soon as the Producer connects to any broker by querying the broker metadata. Calling this method is only required in some scenarios where the addresses of all brokers in the cluster change.
Declaration
public int AddBrokers(string brokers)
Parameters
| Type | Name | Description |
|---|---|---|
| System.String | brokers | Comma-separated list of brokers in the same format as the bootstrap.servers configuration parameter. |
Returns
| Type | Description |
|---|---|
| System.Int32 | The number of brokers added. This value includes brokers that may have been specified a second time. |
Remarks
There is currently no API to remove existing configured, added or learnt brokers.
Dispose()
Releases all resources used by this Producer.
Declaration
public void Dispose()
Remarks
You will often want to call Flush() before disposing a Producer instance.
Flush()
Equivalent to Flush(Int32) with infinite timeout.
[UNSTABLE-API] - the semantics and/or type of the return value is subject to change.
Declaration
public int Flush()
Returns
| Type | Description |
|---|---|
| System.Int32 | Refer to Flush(Int32). |
Flush(Int32)
Wait until all outstanding produce requests and delivery report callbacks are completed.
[UNSTABLE-API] - the semantics and/or type of the return value is subject to change.
Declaration
public int Flush(int millisecondsTimeout)
Parameters
| Type | Name | Description |
|---|---|---|
| System.Int32 | millisecondsTimeout | The maximum time to block in milliseconds or -1 to block indefinitely. You should typically use a relatively short timeout period because this operation cannot be cancelled. |
Returns
| Type | Description |
|---|---|
| System.Int32 | The current librdkafka out queue length. This should be interpreted as a rough indication of the number of messages waiting to be sent to or acknowledged by the broker. If zero, there are no outstanding messages or callbacks. Specifically, the value is equal to the sum of the number of produced messages for which a delivery report has not yet been handled and a number which is less than or equal to the number of pending delivery report callback events (as determined by an internal librdkafka implementation detail). |
Remarks
This method should typically be called prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating. The wait time is bounded by the millisecondsTimeout parameter.
A related default.topic.config configuration parameter is message.timeout.ms which determines the maximum length of time librdkafka attempts to deliver the message before giving up and so also affects the maximum time a call to Flush may block.
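Because a single call to Flush cannot be cancelled, one option is to flush in short slices until the returned out queue length reaches zero or an overall deadline passes. The sketch below takes the flush call as a delegate so that it does not depend on a live broker; `FlushLoop`, `Drain` and the 500 ms slice are hypothetical names and values used only for illustration.

```csharp
using System;

public static class FlushLoop
{
    // Calls flush(500) repeatedly until it reports an empty out queue or the
    // deadline passes. Returns the last reported out queue length
    // (0 means no outstanding messages or callbacks).
    public static int Drain(Func<int, int> flush, TimeSpan deadline)
    {
        DateTime stopAt = DateTime.UtcNow + deadline;
        int remaining;
        do
        {
            // Each call blocks for at most 500 ms.
            remaining = flush(500);
        }
        while (remaining > 0 && DateTime.UtcNow < stopAt);
        return remaining;
    }
}
```

With a Producer instance, `FlushLoop.Drain(producer.Flush, TimeSpan.FromSeconds(10))` binds the delegate to the Flush(Int32) overload; a non-zero result indicates that messages may still be outstanding when Dispose() is subsequently called.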
Flush(TimeSpan)
Wait until all outstanding produce requests and delivery report callbacks are completed. Refer to Flush(Int32) for more information.
[UNSTABLE-API] - the semantics and/or type of the return value is subject to change.
Declaration
public int Flush(TimeSpan timeout)
Parameters
| Type | Name | Description |
|---|---|---|
| TimeSpan | timeout | The maximum length of time to block. You should typically use a relatively short timeout period because this operation cannot be cancelled. |
Returns
| Type | Description |
|---|---|
| System.Int32 | The current librdkafka out queue length. Refer to Flush(Int32) for more information. |
GetMetadata()
Refer to GetMetadata(Boolean, String, TimeSpan).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public Metadata GetMetadata()
Returns
| Type | Description |
|---|---|
| Metadata | |
GetMetadata(Boolean, String)
Refer to GetMetadata(Boolean, String, TimeSpan).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public Metadata GetMetadata(bool allTopics, string topic)
Parameters
| Type | Name | Description |
|---|---|---|
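| System.Boolean | allTopics | If true, request all topics from the cluster. |
| System.String | topic | The specific topic to request when allTopics is false. If null, only locally known topics are requested. |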
Returns
| Type | Description |
|---|---|
| Metadata | |
GetMetadata(Boolean, String, TimeSpan)
Query the cluster for metadata (blocking).
- allTopics = true - request all topics from cluster
- allTopics = false, topic = null - request only locally known topics.
- allTopics = false, topic = valid - request specific topic
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public Metadata GetMetadata(bool allTopics, string topic, TimeSpan timeout)
Parameters
| Type | Name | Description |
|---|---|---|
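| System.Boolean | allTopics | If true, request all topics from the cluster. |
| System.String | topic | The specific topic to request when allTopics is false. If null, only locally known topics are requested. |
| TimeSpan | timeout | The maximum period of time the call may block. |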
Returns
| Type | Description |
|---|---|
| Metadata | |
GetSerializingProducer<TKey, TValue>(ISerializer<TKey>, ISerializer<TValue>)
Returns a serializing producer that uses this Producer to produce messages. The same underlying Producer can be used as the basis of many serializing producers (potentially with different TKey and TValue types). Thread-safe.
Declaration
public ISerializingProducer<TKey, TValue> GetSerializingProducer<TKey, TValue>(ISerializer<TKey> keySerializer, ISerializer<TValue> valueSerializer)
Parameters
| Type | Name | Description |
|---|---|---|
| ISerializer<TKey> | keySerializer | The key serializer. |
| ISerializer<TValue> | valueSerializer | The value serializer. |
Returns
| Type | Description |
|---|---|
| ISerializingProducer<TKey, TValue> | |
Type Parameters
| Name | Description |
|---|---|
| TKey | The key type. |
| TValue | The value type. |
ListGroup(String)
Get information pertaining to a particular group in the Kafka cluster (blocks, potentially indefinitely).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public GroupInfo ListGroup(string group)
Parameters
| Type | Name | Description |
|---|---|---|
| System.String | group | The group of interest. |
Returns
| Type | Description |
|---|---|
| GroupInfo | Returns information pertaining to the specified group or null if this group does not exist. |
ListGroup(String, TimeSpan)
Get information pertaining to a particular group in the Kafka cluster (blocking).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public GroupInfo ListGroup(string group, TimeSpan timeout)
Parameters
| Type | Name | Description |
|---|---|---|
| System.String | group | The group of interest. |
| TimeSpan | timeout | The maximum period of time the call may block. |
Returns
| Type | Description |
|---|---|
| GroupInfo | Returns information pertaining to the specified group or null if this group does not exist. |
ListGroups(TimeSpan)
Get information pertaining to all groups in the Kafka cluster (blocking).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public List<GroupInfo> ListGroups(TimeSpan timeout)
Parameters
| Type | Name | Description |
|---|---|---|
| TimeSpan | timeout | The maximum period of time the call may block. |
Returns
| Type | Description |
|---|---|
| List<GroupInfo> | |
Poll()
Poll for callback events. You will not typically need to call this method. Only call on producer instances where background polling is not enabled. Blocks until there is a callback event ready to be served.
Declaration
public int Poll()
Returns
| Type | Description |
|---|---|
| System.Int32 | Returns the number of events served. |
Poll(Int32)
Poll for callback events. You will not typically need to call this method. Only call on producer instances where background polling is not enabled.
Declaration
public int Poll(int millisecondsTimeout)
Parameters
| Type | Name | Description |
|---|---|---|
| System.Int32 | millisecondsTimeout | The maximum period of time to block (in milliseconds) if no callback events are waiting or -1 to block indefinitely. You should typically use a relatively short timeout period because this operation cannot be cancelled. |
Returns
| Type | Description |
|---|---|
| System.Int32 | Returns the number of events served. |
Poll(TimeSpan)
Poll for callback events. You will not typically need to call this method. Only call on producer instances where background polling is not enabled.
Declaration
public int Poll(TimeSpan timeout)
Parameters
| Type | Name | Description |
|---|---|---|
| TimeSpan | timeout | The maximum period of time to block if no callback events are waiting. You should typically use a relatively short timeout period because this operation cannot be cancelled. |
Returns
| Type | Description |
|---|---|
| System.Int32 | Returns the number of events served. |
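When a producer is constructed with manualPoll set to true, the application has to serve callback events itself. A minimal sketch of such a loop follows; it accepts the poll call as a delegate so it can be exercised without a broker. `ManualPolling`, `Run` and the 100 ms timeout are hypothetical names and values chosen for this example.

```csharp
using System;
using System.Threading;

public static class ManualPolling
{
    // Polls repeatedly until cancellation is requested and returns the
    // total number of events served across all calls.
    public static int Run(Func<TimeSpan, int> poll, CancellationToken token)
    {
        int served = 0;
        while (!token.IsCancellationRequested)
        {
            // A short timeout lets the loop notice cancellation promptly,
            // since an individual Poll call cannot be cancelled.
            served += poll(TimeSpan.FromMilliseconds(100));
        }
        return served;
    }
}
```

With a Producer instance, `ManualPolling.Run(producer.Poll, cts.Token)` binds the delegate to the Poll(TimeSpan) overload and would typically run on a dedicated thread.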
ProduceAsync(String, Byte[], Byte[])
Asynchronously send a single message to the broker. Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.
Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, byte[] val)
Parameters
| Type | Name | Description |
|---|---|---|
| System.String | topic | |
| System.Byte[] | key | |
| System.Byte[] | val | |
Returns
| Type | Description |
|---|---|
| Task<Message> | |
ProduceAsync(String, Byte[], Byte[], IDeliveryHandler)
Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed). Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler) for more information.
Declaration
public void ProduceAsync(string topic, byte[] key, byte[] val, IDeliveryHandler deliveryHandler)
Parameters
| Type | Name | Description |
|---|---|---|
| System.String | topic | |
| System.Byte[] | key | |
| System.Byte[] | val | |
| IDeliveryHandler | deliveryHandler | |
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32)
Asynchronously send a single message to the broker. Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.
Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength)
Parameters
| Type | Name | Description |
|---|---|---|
| System.String | topic | |
| System.Byte[] | key | |
| System.Int32 | keyOffset | |
| System.Int32 | keyLength | |
| System.Byte[] | val | |
| System.Int32 | valOffset | |
| System.Int32 | valLength | |
Returns
| Type | Description |
|---|---|
| Task<Message> | |
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, IDeliveryHandler)
Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed). Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler) for more information.
Declaration
public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, IDeliveryHandler deliveryHandler)
Parameters
| Type | Name | Description |
|---|---|---|
| System.String | topic | |
| System.Byte[] | key | |
| System.Int32 | keyOffset | |
| System.Int32 | keyLength | |
| System.Byte[] | val | |
| System.Int32 | valOffset | |
| System.Int32 | valLength | |
| IDeliveryHandler | deliveryHandler | |
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Boolean)
Asynchronously send a single message to the broker. Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.
Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, bool blockIfQueueFull)
Parameters
| Type | Name | Description |
|---|---|---|
| System.String | topic | |
| System.Byte[] | key | |
| System.Int32 | keyOffset | |
| System.Int32 | keyLength | |
| System.Byte[] | val | |
| System.Int32 | valOffset | |
| System.Int32 | valLength | |
| System.Boolean | blockIfQueueFull | |
Returns
| Type | Description |
|---|---|
| Task<Message> | |
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Boolean, IDeliveryHandler)
Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed). Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler) for more information.
Declaration
public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, bool blockIfQueueFull, IDeliveryHandler deliveryHandler)
Parameters
| Type | Name | Description |
|---|---|---|
| System.String | topic | |
| System.Byte[] | key | |
| System.Int32 | keyOffset | |
| System.Int32 | keyLength | |
| System.Byte[] | val | |
| System.Int32 | valOffset | |
| System.Int32 | valLength | |
| System.Boolean | blockIfQueueFull | |
| IDeliveryHandler | deliveryHandler | |
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32)
Asynchronously send a single message to the broker. Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.
Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, int partition)
Parameters
| Type | Name | Description |
|---|---|---|
| System.String | topic | |
| System.Byte[] | key | |
| System.Int32 | keyOffset | |
| System.Int32 | keyLength | |
| System.Byte[] | val | |
| System.Int32 | valOffset | |
| System.Int32 | valLength | |
| System.Int32 | partition | |
Returns
| Type | Description |
|---|---|
| Task<Message> | |
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, IDeliveryHandler)
Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed). Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler) for more information.
Declaration
public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, int partition, IDeliveryHandler deliveryHandler)
Parameters
| Type | Name | Description |
|---|---|---|
| System.String | topic | |
| System.Byte[] | key | |
| System.Int32 | keyOffset | |
| System.Int32 | keyLength | |
| System.Byte[] | val | |
| System.Int32 | valOffset | |
| System.Int32 | valLength | |
| System.Int32 | partition | |
| IDeliveryHandler | deliveryHandler | |
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean)
Asynchronously send a single message to the broker.
Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, int partition, bool blockIfQueueFull)
Parameters
| Type | Name | Description |
|---|---|---|
| System.String | topic | The target topic. |
| System.Byte[] | key | null, or a byte array that contains the message key. |
| System.Int32 | keyOffset | For non-null keys, the offset into the key array of the sub-array to use as the message key. |
| System.Int32 | keyLength | For non-null keys, the length of the sequence of bytes that constitutes the key. |
| System.Byte[] | val | null, or a byte array that contains the message value. |
| System.Int32 | valOffset | For non-null values, the offset into the val array of the sub-array to use as the message value. |
| System.Int32 | valLength | For non-null values, the length of the sequence of bytes that constitutes the value. |
| System.Int32 | partition | The target partition (if -1, this is determined by the partitioner configured for the topic). |
| System.Boolean | blockIfQueueFull | Whether or not to block if the send queue is full. If false, a KafkaException (with Error.Code == ErrorCode.Local_QueueFull) will be thrown if an attempt is made to produce a message and the send queue is full. Warning: if blockIfQueueFull is set to true, background polling is disabled and Poll is not being called in another thread, this will block indefinitely. |
Returns
| Type | Description |
|---|---|
| Task<Message> | A Task which will complete with the corresponding delivery report for this request. |
Remarks
If you require strict ordering of delivery reports to be maintained, you should use a variant of ProduceAsync that takes an IDeliveryHandler parameter, not a variant that returns a Task<Message> because Tasks are completed on arbitrary thread pool threads and can be executed out of order.
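To make the offset and length parameters concrete, here is a small sketch that packs a key and a value into one buffer and produces from sub-arrays of it. `RawProduce`, `Pack` and `SendAsync` are hypothetical helpers, and passing the same array for both key and val is an assumption of this example rather than a documented pattern.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class RawProduce
{
    // Encodes key and value as UTF-8 and lays them out back to back in a
    // single buffer: key bytes first, value bytes immediately after.
    public static byte[] Pack(string key, string value, out int keyLength, out int valLength)
    {
        byte[] k = Encoding.UTF8.GetBytes(key);
        byte[] v = Encoding.UTF8.GetBytes(value);
        keyLength = k.Length;
        valLength = v.Length;
        var buffer = new byte[k.Length + v.Length];
        Buffer.BlockCopy(k, 0, buffer, 0, k.Length);
        Buffer.BlockCopy(v, 0, buffer, k.Length, v.Length);
        return buffer;
    }

    public static Task<Message> SendAsync(Producer producer, string topic, string key, string value)
    {
        int keyLength, valLength;
        byte[] buffer = Pack(key, value, out keyLength, out valLength);
        return producer.ProduceAsync(
            topic,
            buffer, 0, keyLength,          // key: sub-array starting at offset 0
            buffer, keyLength, valLength,  // value: sub-array starting after the key
            -1,                            // partition: let the configured partitioner decide
            false);                        // blockIfQueueFull: throw instead of blocking
    }
}
```

Awaiting the returned Task yields the delivery report for the message; as noted in the remarks, the order in which such Tasks complete is not guaranteed.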
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler)
Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed).
Declaration
public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, int partition, bool blockIfQueueFull, IDeliveryHandler deliveryHandler)
Parameters
| Type | Name | Description |
|---|---|---|
| System.String | topic | |
| System.Byte[] | key | |
| System.Int32 | keyOffset | |
| System.Int32 | keyLength | |
| System.Byte[] | val | |
| System.Int32 | valOffset | |
| System.Int32 | valLength | |
| System.Int32 | partition | |
| System.Boolean | blockIfQueueFull | |
| IDeliveryHandler | deliveryHandler | |
Remarks
Notification of delivery reports is via an IDeliveryHandler instance. Use IDeliveryHandler variants of ProduceAsync if you require notification of delivery reports strictly in the order they were acknowledged by the broker / failed (failure may be via broker or local). IDeliveryHandler.HandleDeliveryReport callbacks are executed on the Poll thread.
Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.
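A delivery handler that preserves report order could look like the following sketch, which simply queues each report for later processing. `QueueingDeliveryHandler` is a hypothetical name. The `MarshalData` property is an assumption: it is not described on this page, but the interface in this generation of the client is believed to declare it, so check the IDeliveryHandler reference before relying on it.

```csharp
using System.Collections.Concurrent;
using Confluent.Kafka;

public class QueueingDeliveryHandler : IDeliveryHandler
{
    // Reports are enqueued in the order HandleDeliveryReport is invoked,
    // which happens on the Poll thread.
    public ConcurrentQueue<Message> Reports { get; } = new ConcurrentQueue<Message>();

    // Assumption: IDeliveryHandler declares this property to control whether
    // key and value data are marshalled into the delivery report.
    public bool MarshalData
    {
        get { return true; }
    }

    public void HandleDeliveryReport(Message message)
    {
        // Keep the callback short: it runs on the Poll thread.
        Reports.Enqueue(message);
    }
}
```

An instance can then be passed as the deliveryHandler argument, for example `producer.ProduceAsync(topic, key, val, handler)`, and the queue drained on another thread.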
QueryWatermarkOffsets(TopicPartition)
Query the Kafka cluster for low (oldest/beginning) and high (newest/end) offsets for the specified topic/partition (blocks, potentially indefinitely).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition)
Parameters
| Type | Name | Description |
|---|---|---|
| TopicPartition | topicPartition | The topic/partition of interest. |
Returns
| Type | Description |
|---|---|
| WatermarkOffsets | The requested WatermarkOffsets. |
QueryWatermarkOffsets(TopicPartition, TimeSpan)
Query the Kafka cluster for low (oldest/beginning) and high (newest/end) offsets for the specified topic/partition (blocking).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
Parameters
| Type | Name | Description |
|---|---|---|
| TopicPartition | topicPartition | The topic/partition of interest. |
| TimeSpan | timeout | The maximum period of time the call may block. |
Returns
| Type | Description |
|---|---|
| WatermarkOffsets | The requested WatermarkOffsets. |
Events
OnError
Raised on critical errors, e.g. connection failures or all brokers down. Note that the client will try to automatically recover from errors - these errors should be seen as informational rather than catastrophic.
Declaration
public event EventHandler<Error> OnError
Event Type
| Type | Description |
|---|---|
| EventHandler<Error> | |
Remarks
Called on the Producer poll thread.
OnLog
Raised when there is information that should be logged.
Declaration
public event EventHandler<LogMessage> OnLog
Event Type
| Type | Description |
|---|---|
| EventHandler<LogMessage> | |
Remarks
By default not many log messages are generated.
You can specify one or more debug contexts using the 'debug' configuration property and a log level using the 'log_level' configuration property to enable more verbose logging. However, you shouldn't typically need to do this.
Warning: Log handlers are called spontaneously from internal librdkafka threads and the application must not call any Confluent.Kafka APIs from within a log handler or perform any prolonged operations.
OnStatistics
Raised on librdkafka statistics events. JSON formatted string as defined here: https://github.com/edenhill/librdkafka/wiki/Statistics
Declaration
public event EventHandler<string> OnStatistics
Event Type
| Type | Description |
|---|---|
| EventHandler<System.String> | |
Remarks
You can enable statistics and set the statistics interval using the statistics.interval.ms configuration parameter (disabled by default).
Called on the Producer poll thread.
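The three events can be wired up along the lines of the sketch below. `ProducerEventSink` and its members are hypothetical. The log handler only enqueues the message, in keeping with the warning that log handlers must not call Confluent.Kafka APIs or perform prolonged operations.

```csharp
using System;
using System.Collections.Concurrent;
using Confluent.Kafka;

public class ProducerEventSink
{
    // Log messages are queued here and should be processed on another thread.
    public ConcurrentQueue<LogMessage> PendingLogs { get; } = new ConcurrentQueue<LogMessage>();

    // Most recent statistics payload (a JSON formatted string), or null if none yet.
    public string LatestStatistics { get; private set; }

    public void Attach(Producer producer)
    {
        producer.OnError += HandleError;
        producer.OnLog += HandleLog;
        producer.OnStatistics += HandleStatistics;
    }

    public void HandleError(object sender, Error error)
    {
        // Errors are informational: the client attempts to recover by itself.
        Console.Error.WriteLine($"Kafka error: {error}");
    }

    public void HandleLog(object sender, LogMessage message)
    {
        // Called from internal librdkafka threads: enqueue and return quickly.
        PendingLogs.Enqueue(message);
    }

    public void HandleStatistics(object sender, string json)
    {
        LatestStatistics = json;
    }
}
```

OnStatistics only fires when statistics.interval.ms is set in the producer configuration.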