...

Package kafka

import "github.com/confluentinc/confluent-kafka-go/kafka"
Overview
Index

Overview ▾

Package kafka provides high-level Apache Kafka producer and consumers using bindings on-top of the librdkafka C library.

High-level Consumer

* Decide if you want to read messages and events from the `.Events()` channel (set `"go.events.channel.enable": true`) or by calling `.Poll()`.

* Create a Consumer with `kafka.NewConsumer()` providing at least the `bootstrap.servers` and `group.id` configuration properties.

* Call `.Subscribe()` or (`.SubscribeTopics()` to subscribe to multiple topics) to join the group with the specified subscription set. Subscriptions are atomic, calling `.Subscribe*()` again will leave the group and rejoin with the new set of topics.

* Start reading events and messages from either the `.Events` channel or by calling `.Poll()`.

* When the group has rebalanced each client member is assigned a (sub-)set of topic+partitions. By default the consumer will start fetching messages for its assigned partitions at this point, but your application may enable rebalance events to get an insight into what the assigned partitions where as well as set the initial offsets. To do this you need to pass `"go.application.rebalance.enable": true` to the `NewConsumer()` call mentioned above. You will (eventually) see a `kafka.AssignedPartitions` event with the assigned partition set. You can optionally modify the initial offsets (they'll default to stored offsets and if there are no previously stored offsets it will fall back to `"default.topic.config": ConfigMap{"auto.offset.reset": ..}` which defaults to the `latest` message) and then call `.Assign(partitions)` to start consuming. If you don't need to modify the initial offsets you will not need to call `.Assign()`, the client will do so automatically for you if you dont.

* As messages are fetched they will be made available on either the `.Events` channel or by calling `.Poll()`, look for event type `*kafka.Message`.

* Handle messages, events and errors to your liking.

* When you are done consuming call `.Close()` to commit final offsets and leave the consumer group.

Producer

* Create a Producer with `kafka.NewProducer()` providing at least the `bootstrap.servers` configuration properties.

* Messages may now be produced either by sending a `*kafka.Message` on the `.ProduceChannel` or by calling `.Produce()`.

* Producing is an asynchronous operation so the client notifies the application of per-message produce success or failure through something called delivery reports. Delivery reports are by default emitted on the `.Events()` channel as `*kafka.Message` and you should check `msg.TopicPartition.Error` for `nil` to find out if the message was succesfully delivered or not. It is also possible to direct delivery reports to alternate channels by providing a non-nil `chan Event` channel to `.Produce()`. If no delivery reports are wanted they can be completely disabled by setting configuration property `"go.delivery.reports": false`.

* When you are done producing messages you will need to make sure all messages are indeed delivered to the broker (or failed), remember that this is an asynchronous client so some of your messages may be lingering in internal channels or tranmission queues. To do this you can either keep track of the messages you've produced and wait for their corresponding delivery reports, or call the convenience function `.Flush()` that will block until all message deliveries are done or the provided timeout elapses.

* Finally call `.Close()` to decommission the producer.

Events

Apart from emitting messages and delivery reports the client also communicates with the application through a number of different event types. An application may choose to handle or ignore these events.

Consumer events

* `*kafka.Message` - a fetched message.

* `AssignedPartitions` - The assigned partition set for this client following a rebalance. Requires `go.application.rebalance.enable`

* `RevokedPartitions` - The counter part to `AssignedPartitions` following a rebalance. `AssignedPartitions` and `RevokedPartitions` are symetrical. Requires `go.application.rebalance.enable`

* `PartitionEOF` - Consumer has reached the end of a partition. NOTE: The consumer will keep trying to fetch new messages for the partition.

* `OffsetsCommitted` - Offset commit results (when `enable.auto.commit` is enabled).

Producer events

* `*kafka.Message` - delivery report for produced message. Check `.TopicPartition.Error` for delivery result.

Generic events for both Consumer and Producer

* `KafkaError` - client (error codes are prefixed with _) or broker error. These errors are normally just informational since the client will try its best to automatically recover (eventually).

Hint: If your application registers a signal notification (signal.Notify) makes sure the signals channel is buffered to avoid possible complications with blocking Poll() calls.

Index ▾

Constants
func LibraryVersion() (int, string)
type AssignedPartitions
func (e AssignedPartitions) String() string
type BrokerMetadata
type ConfigMap
func (m ConfigMap) Set(kv string) error
func (m ConfigMap) SetKey(key string, value ConfigValue) error
type ConfigValue
type Consumer
func NewConsumer(conf *ConfigMap) (*Consumer, error)
func (c *Consumer) Assign(partitions []TopicPartition) (err error)
func (c *Consumer) Close() (err error)
func (c *Consumer) Commit() ([]TopicPartition, error)
func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)
func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
func (c *Consumer) Events() chan Event
func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
func (c *Consumer) Poll(timeoutMs int) (event Event)
func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
func (c *Consumer) String() string
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
func (c *Consumer) Unassign() (err error)
func (c *Consumer) Unsubscribe() (err error)
type Error
func (e Error) Code() ErrorCode
func (e Error) Error() string
func (e Error) String() string
type ErrorCode
func (c ErrorCode) String() string
type Event
type Handle
type Message
func (m *Message) String() string
type Metadata
type Offset
func NewOffset(offset interface{}) (Offset, error)
func OffsetTail(relativeOffset Offset) Offset
func (o Offset) Set(offset interface{}) error
func (o Offset) String() string
type OffsetsCommitted
func (o OffsetsCommitted) String() string
type PartitionEOF
func (p PartitionEOF) String() string
type PartitionMetadata
type Producer
func NewProducer(conf *ConfigMap) (*Producer, error)
func (p *Producer) Close()
func (p *Producer) Events() chan Event
func (p *Producer) Flush(timeoutMs int) int
func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
func (p *Producer) Len() int
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error
func (p *Producer) ProduceChannel() chan *Message
func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
func (p *Producer) String() string
type RebalanceCb
type RevokedPartitions
func (e RevokedPartitions) String() string
type TimestampType
func (t TimestampType) String() string
type TopicMetadata
type TopicPartition
func (p TopicPartition) String() string

Package files

build_dynamic.go config.go consumer.go error.go event.go generated_errors.go handle.go kafka.go message.go metadata.go misc.go producer.go testhelpers.go

Constants

const (
    // TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support
    TimestampNotAvailable = TimestampType(C.RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
    // TimestampCreateTime indicates timestamp set by producer (source time)
    TimestampCreateTime = TimestampType(C.RD_KAFKA_TIMESTAMP_CREATE_TIME)
    // TimestampLogAppendTime indicates timestamp set set by broker (store time)
    TimestampLogAppendTime = TimestampType(C.RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
)
const OffsetBeginning = Offset(C.RD_KAFKA_OFFSET_BEGINNING)

Earliest offset (logical)

const OffsetEnd = Offset(C.RD_KAFKA_OFFSET_END)

Latest offset (logical)

const OffsetInvalid = Offset(C.RD_KAFKA_OFFSET_INVALID)

Invalid/unspecified offset

const OffsetStored = Offset(C.RD_KAFKA_OFFSET_STORED)

Use stored offset

const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)

Any partition (for partitioning), or unspecified value (for all other cases)

func LibraryVersion

func LibraryVersion() (int, string)

LibraryVersion returns the underlying librdkafka library version as a (version_int, version_str) tuple.

type AssignedPartitions

type AssignedPartitions struct {
    Partitions []TopicPartition
}

AssignedPartitions consumer group rebalance event: assigned partition set

func (AssignedPartitions) String

func (e AssignedPartitions) String() string

type BrokerMetadata

type BrokerMetadata struct {
    ID   int32
    Host string
    Port int
}

BrokerMetadata contains per-broker metadata

type ConfigMap

type ConfigMap map[string]ConfigValue

ConfigMap is a map contaning standard librdkafka configuration properties as documented in: https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md

The special property "default.topic.config" (optional) is a ConfigMap containing default topic configuration properties.

func (ConfigMap) Set

func (m ConfigMap) Set(kv string) error

Set implements flag.Set (command line argument parser) as a convenience for `-X key=value` config.

func (ConfigMap) SetKey

func (m ConfigMap) SetKey(key string, value ConfigValue) error

SetKey sets configuration property key to value. For user convenience a key prefixed with {topic}. will be set on the "default.topic.config" sub-map.

type ConfigValue

type ConfigValue interface{}

ConfigValue supports the following types:

bool, int, string, any type with the standard String() interface

type Consumer

type Consumer struct {
    // contains filtered or unexported fields
}

Consumer implements a High-level Apache Kafka Consumer instance

func NewConsumer

func NewConsumer(conf *ConfigMap) (*Consumer, error)

NewConsumer creates a new high-level Consumer instance.

Supported special configuration properties:

go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel.
                                     If set to true the app must handle the AssignedPartitions and
                                     RevokedPartitions events and call Assign() and Unassign()
                                     respectively.
go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. (Experimental)
go.events.channel.size (int, 1000) - Events() channel size

WARNING: Due to the buffering nature of channels (and queues in general) the use of the events channel risks receiving outdated events and messages. Minimizing go.events.channel.size reduces the risk and number of outdated events and messages but does not eliminate the factor completely. With a channel size of 1 at most one event or message may be outdated.

func (*Consumer) Assign

func (c *Consumer) Assign(partitions []TopicPartition) (err error)

Assign an atomic set of partitions to consume. This replaces the current assignment.

func (*Consumer) Close

func (c *Consumer) Close() (err error)

Close Consumer instance. The object is no longer usable after this call.

func (*Consumer) Commit

func (c *Consumer) Commit() ([]TopicPartition, error)

Commit offsets for currently assigned partitions This is a blocking call. Returns the committed offsets on success.

func (*Consumer) CommitMessage

func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)

CommitMessage commits offset based on the provided message. This is a blocking call. Returns the committed offsets on success.

func (*Consumer) CommitOffsets

func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)

CommitOffsets commits the provided list of offsets This is a blocking call. Returns the committed offsets on success.

func (*Consumer) Events

func (c *Consumer) Events() chan Event

Events returns the Events channel (if enabled)

func (*Consumer) GetMetadata

func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)

GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned.

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMs int) (event Event)

Poll the consumer for messages or events.

Will block for at most timeoutMs milliseconds

The following callbacks may be triggered:

Subscribe()'s rebalanceCb

Returns nil on timeout, else an Event

func (*Consumer) QueryWatermarkOffsets

func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)

QueryWatermarkOffsets returns the broker's low and high offsets for the given topic and partition.

func (*Consumer) String

func (c *Consumer) String() string

Strings returns a human readable name for a Consumer instance

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error

Subscribe to a single topic This replaces the current subscription

func (*Consumer) SubscribeTopics

func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)

SubscribeTopics subscribes to the provided list of topics. This replaces the current subscription.

func (*Consumer) Unassign

func (c *Consumer) Unassign() (err error)

Unassign the current set of partitions to consume.

func (*Consumer) Unsubscribe

func (c *Consumer) Unsubscribe() (err error)

Unsubscribe from the current subscription, if any.

type Error

type Error struct {
    // contains filtered or unexported fields
}

Error provides a Kafka-specific error container

func (Error) Code

func (e Error) Code() ErrorCode

Code returns the ErrorCode of an Error

func (Error) Error

func (e Error) Error() string

Error returns a human readable representation of an Error Same as Error.String()

func (Error) String

func (e Error) String() string

String returns a human readable representation of an Error

type ErrorCode

type ErrorCode int

ErrorCode is the integer representation of local and broker error codes

const (
    // ErrBadMsg Local: Bad message format
    ErrBadMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_MSG)
    // ErrBadCompression Local: Invalid compressed data
    ErrBadCompression ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_COMPRESSION)
    // ErrDestroy Local: Broker handle destroyed
    ErrDestroy ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__DESTROY)
    // ErrFail Local: Communication failure with broker
    ErrFail ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FAIL)
    // ErrTransport Local: Broker transport failure
    ErrTransport ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TRANSPORT)
    // ErrCritSysResource Local: Critical system resource failure
    ErrCritSysResource ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE)
    // ErrResolve Local: Host resolution failure
    ErrResolve ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__RESOLVE)
    // ErrMsgTimedOut Local: Message timed out
    ErrMsgTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__MSG_TIMED_OUT)
    // ErrPartitionEOF Broker: No more messages
    ErrPartitionEOF ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTITION_EOF)
    // ErrUnknownPartition Local: Unknown partition
    ErrUnknownPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
    // ErrFs Local: File or filesystem error
    ErrFs ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FS)
    // ErrUnknownTopic Local: Unknown topic
    ErrUnknownTopic ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
    // ErrAllBrokersDown Local: All broker connections are down
    ErrAllBrokersDown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)
    // ErrInvalidArg Local: Invalid argument or configuration
    ErrInvalidArg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_ARG)
    // ErrTimedOut Local: Timed out
    ErrTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
    // ErrQueueFull Local: Queue full
    ErrQueueFull ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__QUEUE_FULL)
    // ErrIsrInsuff Local: ISR count insufficient
    ErrIsrInsuff ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ISR_INSUFF)
    // ErrNodeUpdate Local: Broker node update
    ErrNodeUpdate ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NODE_UPDATE)
    // ErrSsl Local: SSL error
    ErrSsl ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__SSL)
    // ErrWaitCoord Local: Waiting for coordinator
    ErrWaitCoord ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_COORD)
    // ErrUnknownGroup Local: Unknown group
    ErrUnknownGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_GROUP)
    // ErrInProgress Local: Operation in progress
    ErrInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__IN_PROGRESS)
    // ErrPrevInProgress Local: Previous operation in progress
    ErrPrevInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS)
    // ErrExistingSubscription Local: Existing subscription
    ErrExistingSubscription ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION)
    // ErrAssignPartitions Local: Assign partitions
    ErrAssignPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
    // ErrRevokePartitions Local: Revoke partitions
    ErrRevokePartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS)
    // ErrConflict Local: Conflicting use
    ErrConflict ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CONFLICT)
    // ErrState Local: Erroneous state
    ErrState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__STATE)
    // ErrUnknownProtocol Local: Unknown protocol
    ErrUnknownProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL)
    // ErrNotImplemented Local: Not implemented
    ErrNotImplemented ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED)
    // ErrAuthentication Local: Authentication failure
    ErrAuthentication ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__AUTHENTICATION)
    // ErrNoOffset Local: No offset stored
    ErrNoOffset ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NO_OFFSET)
    // ErrOutdated Local: Outdated
    ErrOutdated ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__OUTDATED)
    // ErrTimedOutQueue Local: Timed out in queue
    ErrTimedOutQueue ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE)
    // ErrUnknown Unknown broker error
    ErrUnknown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN)
    // ErrNoError Success
    ErrNoError ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NO_ERROR)
    // ErrOffsetOutOfRange Broker: Offset out of range
    ErrOffsetOutOfRange ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE)
    // ErrInvalidMsg Broker: Invalid message
    ErrInvalidMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG)
    // ErrUnknownTopicOrPart Broker: Unknown topic or partition
    ErrUnknownTopicOrPart ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
    // ErrInvalidMsgSize Broker: Invalid message size
    ErrInvalidMsgSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE)
    // ErrLeaderNotAvailable Broker: Leader not available
    ErrLeaderNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
    // ErrNotLeaderForPartition Broker: Not leader for partition
    ErrNotLeaderForPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION)
    // ErrRequestTimedOut Broker: Request timed out
    ErrRequestTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT)
    // ErrBrokerNotAvailable Broker: Broker not available
    ErrBrokerNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE)
    // ErrReplicaNotAvailable Broker: Replica not available
    ErrReplicaNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE)
    // ErrMsgSizeTooLarge Broker: Message size too large
    ErrMsgSizeTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE)
    // ErrStaleCtrlEpoch Broker: StaleControllerEpochCode
    ErrStaleCtrlEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH)
    // ErrOffsetMetadataTooLarge Broker: Offset metadata string too large
    ErrOffsetMetadataTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE)
    // ErrNetworkException Broker: Broker disconnected before response received
    ErrNetworkException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION)
    // ErrGroupLoadInProgress Broker: Group coordinator load in progress
    ErrGroupLoadInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS)
    // ErrGroupCoordinatorNotAvailable Broker: Group coordinator not available
    ErrGroupCoordinatorNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE)
    // ErrNotCoordinatorForGroup Broker: Not coordinator for group
    ErrNotCoordinatorForGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP)
    // ErrTopicException Broker: Invalid topic
    ErrTopicException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION)
    // ErrRecordListTooLarge Broker: Message batch larger than configured server segment size
    ErrRecordListTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE)
    // ErrNotEnoughReplicas Broker: Not enough in-sync replicas
    ErrNotEnoughReplicas ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS)
    // ErrNotEnoughReplicasAfterAppend Broker: Message(s) written to insufficient number of in-sync replicas
    ErrNotEnoughReplicasAfterAppend ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND)
    // ErrInvalidRequiredAcks Broker: Invalid required acks value
    ErrInvalidRequiredAcks ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS)
    // ErrIllegalGeneration Broker: Specified group generation id is not valid
    ErrIllegalGeneration ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION)
    // ErrInconsistentGroupProtocol Broker: Inconsistent group protocol
    ErrInconsistentGroupProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL)
    // ErrInvalidGroupID Broker: Invalid group.id
    ErrInvalidGroupID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_GROUP_ID)
    // ErrUnknownMemberID Broker: Unknown member
    ErrUnknownMemberID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
    // ErrInvalidSessionTimeout Broker: Invalid session timeout
    ErrInvalidSessionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT)
    // ErrRebalanceInProgress Broker: Group rebalance in progress
    ErrRebalanceInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS)
    // ErrInvalidCommitOffsetSize Broker: Commit offset data size is not valid
    ErrInvalidCommitOffsetSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE)
    // ErrTopicAuthorizationFailed Broker: Topic authorization failed
    ErrTopicAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED)
    // ErrGroupAuthorizationFailed Broker: Group authorization failed
    ErrGroupAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED)
    // ErrClusterAuthorizationFailed Broker: Cluster authorization failed
    ErrClusterAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED)
    // ErrInvalidTimestamp Broker: Invalid timestamp
    ErrInvalidTimestamp ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP)
    // ErrUnsupportedSaslMechanism Broker: Unsupported SASL mechanism
    ErrUnsupportedSaslMechanism ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM)
    // ErrIllegalSaslState Broker: Request not valid in current SASL state
    ErrIllegalSaslState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE)
    // ErrUnsupportedVersion Broker: API version not supported
    ErrUnsupportedVersion ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION)
)

func (ErrorCode) String

func (c ErrorCode) String() string

String returns a human readable representation of an error code

type Event

type Event interface {
    // String returns a human-readable representation of the event
    String() string
}

Event generic interface

type Handle

type Handle interface {
    // contains filtered or unexported methods
}

Handle represents a generic client handle containing common parts for both Producer and Consumer.

type Message

type Message struct {
    TopicPartition TopicPartition
    Value          []byte
    Key            []byte
    Timestamp      time.Time
    TimestampType  TimestampType
    Opaque         interface{}
}

Message represents a Kafka message

func (*Message) String

func (m *Message) String() string

String returns a human readable representation of a Message. Key and payload are not represented.

type Metadata

type Metadata struct {
    Brokers []BrokerMetadata
    Topics  map[string]TopicMetadata

    OriginatingBroker BrokerMetadata
}

Metadata contains broker and topic metadata for all (matching) topics

type Offset

type Offset int64

Offset type (int64) with support for canonical names

func NewOffset

func NewOffset(offset interface{}) (Offset, error)

NewOffset creates a new Offset using the provided logical string, or an absolute int64 offset value. Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored"

func OffsetTail

func OffsetTail(relativeOffset Offset) Offset

OffsetTail returns the logical offset relativeOffset from current end of partition

func (Offset) Set

func (o Offset) Set(offset interface{}) error

Set offset value, see NewOffset()

func (Offset) String

func (o Offset) String() string

type OffsetsCommitted

type OffsetsCommitted struct {
    Error   error
    Offsets []TopicPartition
}

OffsetsCommitted reports committed offsets

func (OffsetsCommitted) String

func (o OffsetsCommitted) String() string

type PartitionEOF

type PartitionEOF TopicPartition

PartitionEOF consumer reached end of partition

func (PartitionEOF) String

func (p PartitionEOF) String() string

type PartitionMetadata

type PartitionMetadata struct {
    ID       int32
    Error    Error
    Leader   int32
    Replicas []int32
    Isrs     []int32
}

PartitionMetadata contains per-partition metadata

type Producer

type Producer struct {
    // contains filtered or unexported fields
}

Producer implements a High-level Apache Kafka Producer instance

func NewProducer

func NewProducer(conf *ConfigMap) (*Producer, error)

NewProducer creates a new high-level Producer instance.

conf is a *ConfigMap with standard librdkafka configuration properties, see here:

Supported special configuration properties:

go.batch.producer (bool, false) - Enable batch producer (experimental for increased performance).
                                  These batches do not relate to Kafka message batches in any way.
go.delivery.reports (bool, true) - Forward per-message delivery reports to the
                                   Events() channel.
go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)

func (*Producer) Close

func (p *Producer) Close()

Close a Producer instance. The Producer object or its channels are no longer usable after this call.

func (*Producer) Events

func (p *Producer) Events() chan Event

Events returns the Events channel (read)

func (*Producer) Flush

func (p *Producer) Flush(timeoutMs int) int

Flush and wait for outstanding messages and requests to complete delivery. Includes messages on ProduceChannel. Runs until value reaches zero or on timeoutMs. Returns the number of outstanding events still un-flushed.

func (*Producer) GetMetadata

func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)

GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned.

func (*Producer) Len

func (p *Producer) Len() int

Len returns the number of messages and requests waiting to be transmitted to the broker as well as delivery reports queued for the application. Includes messages on ProduceChannel.

func (*Producer) Produce

func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error

Produce single message. This is an asynchronous call that enqueues the message on the internal transmit queue, thus returning immediately. The delivery report will be sent on the provided deliveryChan if specified, or on the Producer object's Events() channel if not. Returns an error if message could not be enqueued.

func (*Producer) ProduceChannel

func (p *Producer) ProduceChannel() chan *Message

ProduceChannel returns the produce *Message channel (write)

func (*Producer) QueryWatermarkOffsets

func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)

QueryWatermarkOffsets returns the broker's low and high offsets for the given topic and partition.

func (*Producer) String

func (p *Producer) String() string

String returns a human readable name for a Producer instance

type RebalanceCb

type RebalanceCb func(*Consumer, Event) error

RebalanceCb provides a per-Subscribe*() rebalance event callback. The passed Event will be either AssignedPartitions or RevokedPartitions

type RevokedPartitions

type RevokedPartitions struct {
    Partitions []TopicPartition
}

RevokedPartitions consumer group rebalance event: revoked partition set

func (RevokedPartitions) String

func (e RevokedPartitions) String() string

type TimestampType

type TimestampType int

TimestampType is a the Message timestamp type or source

func (TimestampType) String

func (t TimestampType) String() string

type TopicMetadata

type TopicMetadata struct {
    Topic      string
    Partitions []PartitionMetadata
    Error      Error
}

TopicMetadata contains per-topic metadata

type TopicPartition

type TopicPartition struct {
    Topic     *string
    Partition int32
    Offset    Offset
    Error     error
}

TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset.

func (TopicPartition) String

func (p TopicPartition) String() string