const (
    // TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support
    TimestampNotAvailable = TimestampType(C.RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
    // TimestampCreateTime indicates timestamp set by producer (source time)
    TimestampCreateTime = TimestampType(C.RD_KAFKA_TIMESTAMP_CREATE_TIME)
    // TimestampLogAppendTime indicates timestamp set by broker (store time)
    TimestampLogAppendTime = TimestampType(C.RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
)
const OffsetBeginning = Offset(C.RD_KAFKA_OFFSET_BEGINNING)
Earliest offset (logical)
const OffsetEnd = Offset(C.RD_KAFKA_OFFSET_END)
Latest offset (logical)
const OffsetInvalid = Offset(C.RD_KAFKA_OFFSET_INVALID)
Invalid/unspecified offset
const OffsetStored = Offset(C.RD_KAFKA_OFFSET_STORED)
Use stored offset
const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)
Any partition (for partitioning), or unspecified value (for all other cases)
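A minimal sketch of how these logical offsets and PartitionAny are typically used when filling in a TopicPartition (see the TopicPartition type below). The package is assumed to be imported as kafka and the topic name is a placeholder:

    topic := "mytopic" // hypothetical topic name

    // Consume partition 0 from the earliest available offset.
    fromStart := kafka.TopicPartition{Topic: &topic, Partition: 0, Offset: kafka.OffsetBeginning}

    // Let the configured partitioner pick the partition when producing.
    toAny := kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}

    _, _ = fromStart, toAny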
func LibraryVersion() (int, string)
LibraryVersion returns the underlying librdkafka library version as a (version_int, version_str) tuple.
type AssignedPartitions struct {
    Partitions []TopicPartition
}
AssignedPartitions consumer group rebalance event: assigned partition set
func (e AssignedPartitions) String() string
type BrokerMetadata struct {
    ID   int32
    Host string
    Port int
}
BrokerMetadata contains per-broker metadata
type ConfigMap map[string]ConfigValue
ConfigMap is a map containing standard librdkafka configuration properties as documented in: https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md
The special property "default.topic.config" (optional) is a ConfigMap containing default topic configuration properties.
func (m ConfigMap) Set(kv string) error
Set implements flag.Set (command line argument parser) as a convenience for `-X key=value` config.
func (m ConfigMap) SetKey(key string, value ConfigValue) error
SetKey sets configuration property key to value. For user convenience a key prefixed with {topic}. will be set on the "default.topic.config" sub-map.
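A sketch of building a ConfigMap with literal values, SetKey() and Set(); the broker address and property values are placeholders:

    cm := kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092", // placeholder broker
        "default.topic.config": kafka.ConfigMap{
            "auto.offset.reset": "earliest",
        },
    }

    // SetKey with a "{topic}." prefix targets the default.topic.config sub-map.
    if err := cm.SetKey("{topic}.request.required.acks", 1); err != nil {
        // handle configuration error
    }

    // Set parses "key=value", convenient for -X style command line flags.
    if err := cm.Set("socket.timeout.ms=30000"); err != nil {
        // handle configuration error
    }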
type ConfigValue interface{}
ConfigValue supports the following types:
bool, int, string, any type with the standard String() interface
type Consumer struct {
// contains filtered or unexported fields
}
Consumer implements a High-level Apache Kafka Consumer instance
func NewConsumer(conf *ConfigMap) (*Consumer, error)
NewConsumer creates a new high-level Consumer instance.
Supported special configuration properties:
   go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel. If set to true the app must handle the AssignedPartitions and RevokedPartitions events and call Assign() and Unassign() respectively.
   go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. (Experimental)
   go.events.channel.size (int, 1000) - Events() channel size
WARNING: Due to the buffering nature of channels (and queues in general), using the events channel risks receiving outdated events and messages. Minimizing go.events.channel.size reduces the risk and number of outdated events and messages but does not eliminate it completely. With a channel size of 1 at most one event or message may be outdated.
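A sketch of the channel-based consumer mode described above, with the application taking over rebalance handling. The broker address, group id and topic are placeholders; kafka and fmt are assumed to be imported:

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":               "localhost:9092", // placeholder
        "group.id":                        "example-group",  // placeholder
        "go.application.rebalance.enable": true,
        "go.events.channel.enable":        true,
        "go.events.channel.size":          10,
    })
    if err != nil {
        panic(err)
    }
    defer c.Close()

    if err = c.SubscribeTopics([]string{"mytopic"}, nil); err != nil {
        panic(err)
    }

    for ev := range c.Events() {
        switch e := ev.(type) {
        case kafka.AssignedPartitions:
            c.Assign(e.Partitions) // the app owns assignment in this mode
        case kafka.RevokedPartitions:
            c.Unassign()
        case *kafka.Message:
            fmt.Printf("%s: %s\n", e.TopicPartition, string(e.Value))
        case kafka.Error:
            fmt.Println("error:", e)
        }
    }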
func (c *Consumer) Assign(partitions []TopicPartition) (err error)
Assign an atomic set of partitions to consume. This replaces the current assignment.
func (c *Consumer) Close() (err error)
Close Consumer instance. The object is no longer usable after this call.
func (c *Consumer) Commit() ([]TopicPartition, error)
Commit offsets for the currently assigned partitions. This is a blocking call. Returns the committed offsets on success.
func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)
CommitMessage commits offset based on the provided message. This is a blocking call. Returns the committed offsets on success.
func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
CommitOffsets commits the provided list of offsets. This is a blocking call. Returns the committed offsets on success.
func (c *Consumer) Events() chan Event
Events returns the Events channel (if enabled)
func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned.
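For example, a sketch that asks the broker for metadata about a single (hypothetical) topic and prints its partition layout; c is an existing *kafka.Consumer:

    topic := "mytopic" // hypothetical
    md, err := c.GetMetadata(&topic, false, 5000)
    if err != nil {
        // handle error
        return
    }
    for _, t := range md.Topics {
        fmt.Printf("topic %s (error: %v)\n", t.Topic, t.Error)
        for _, p := range t.Partitions {
            fmt.Printf("  partition %d leader %d replicas %v isrs %v\n",
                p.ID, p.Leader, p.Replicas, p.Isrs)
        }
    }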
func (c *Consumer) Poll(timeoutMs int) (event Event)
Poll the consumer for messages or events.
The following callbacks may be triggered:
Subscribe()'s rebalanceCb
Returns nil on timeout, else an Event
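A sketch of a typical Poll() loop combined with CommitMessage() for at-least-once processing; c is assumed to be a subscribed *kafka.Consumer and fmt to be imported:

    run := true
    for run {
        switch e := c.Poll(100).(type) {
        case *kafka.Message:
            fmt.Printf("message on %s: %s\n", e.TopicPartition, string(e.Value))
            if _, err := c.CommitMessage(e); err != nil {
                fmt.Println("commit failed:", err)
            }
        case kafka.PartitionEOF:
            fmt.Println("reached end of partition:", e)
        case kafka.Error:
            fmt.Println("error:", e)
            run = false
        case nil:
            // Poll timed out, nothing to do.
        }
    }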
func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
QueryWatermarkOffsets returns the broker's low and high offsets for the given topic and partition.
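As an illustration, the watermarks can be used to estimate how many messages a partition currently holds; topic, partition and timeout are placeholders:

    low, high, err := c.QueryWatermarkOffsets("mytopic", 0, 5000)
    if err != nil {
        // handle error
        return
    }
    fmt.Printf("partition holds roughly %d messages (offsets %d..%d)\n", high-low, low, high)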
func (c *Consumer) String() string
String returns a human readable name for a Consumer instance
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
Subscribe to a single topic. This replaces the current subscription.
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
SubscribeTopics subscribes to the provided list of topics. This replaces the current subscription.
func (c *Consumer) Unassign() (err error)
Unassign the current set of partitions to consume.
func (c *Consumer) Unsubscribe() (err error)
Unsubscribe from the current subscription, if any.
type Error struct {
// contains filtered or unexported fields
}
Error provides a Kafka-specific error container
func (e Error) Code() ErrorCode
Code returns the ErrorCode of an Error
func (e Error) Error() string
Error returns a human readable representation of an Error. Same as Error.String().
func (e Error) String() string
String returns a human readable representation of an Error
type ErrorCode int
ErrorCode is the integer representation of local and broker error codes
const (
    // ErrBadMsg Local: Bad message format
    ErrBadMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_MSG)
    // ErrBadCompression Local: Invalid compressed data
    ErrBadCompression ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_COMPRESSION)
    // ErrDestroy Local: Broker handle destroyed
    ErrDestroy ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__DESTROY)
    // ErrFail Local: Communication failure with broker
    ErrFail ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FAIL)
    // ErrTransport Local: Broker transport failure
    ErrTransport ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TRANSPORT)
    // ErrCritSysResource Local: Critical system resource failure
    ErrCritSysResource ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE)
    // ErrResolve Local: Host resolution failure
    ErrResolve ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__RESOLVE)
    // ErrMsgTimedOut Local: Message timed out
    ErrMsgTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__MSG_TIMED_OUT)
    // ErrPartitionEOF Broker: No more messages
    ErrPartitionEOF ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTITION_EOF)
    // ErrUnknownPartition Local: Unknown partition
    ErrUnknownPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
    // ErrFs Local: File or filesystem error
    ErrFs ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FS)
    // ErrUnknownTopic Local: Unknown topic
    ErrUnknownTopic ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
    // ErrAllBrokersDown Local: All broker connections are down
    ErrAllBrokersDown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)
    // ErrInvalidArg Local: Invalid argument or configuration
    ErrInvalidArg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_ARG)
    // ErrTimedOut Local: Timed out
    ErrTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
    // ErrQueueFull Local: Queue full
    ErrQueueFull ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__QUEUE_FULL)
    // ErrIsrInsuff Local: ISR count insufficient
    ErrIsrInsuff ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ISR_INSUFF)
    // ErrNodeUpdate Local: Broker node update
    ErrNodeUpdate ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NODE_UPDATE)
    // ErrSsl Local: SSL error
    ErrSsl ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__SSL)
    // ErrWaitCoord Local: Waiting for coordinator
    ErrWaitCoord ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_COORD)
    // ErrUnknownGroup Local: Unknown group
    ErrUnknownGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_GROUP)
    // ErrInProgress Local: Operation in progress
    ErrInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__IN_PROGRESS)
    // ErrPrevInProgress Local: Previous operation in progress
    ErrPrevInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS)
    // ErrExistingSubscription Local: Existing subscription
    ErrExistingSubscription ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION)
    // ErrAssignPartitions Local: Assign partitions
    ErrAssignPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
    // ErrRevokePartitions Local: Revoke partitions
    ErrRevokePartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS)
    // ErrConflict Local: Conflicting use
    ErrConflict ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CONFLICT)
    // ErrState Local: Erroneous state
    ErrState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__STATE)
    // ErrUnknownProtocol Local: Unknown protocol
    ErrUnknownProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL)
    // ErrNotImplemented Local: Not implemented
    ErrNotImplemented ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED)
    // ErrAuthentication Local: Authentication failure
    ErrAuthentication ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__AUTHENTICATION)
    // ErrNoOffset Local: No offset stored
    ErrNoOffset ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NO_OFFSET)
    // ErrOutdated Local: Outdated
    ErrOutdated ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__OUTDATED)
    // ErrTimedOutQueue Local: Timed out in queue
    ErrTimedOutQueue ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE)
    // ErrUnknown Unknown broker error
    ErrUnknown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN)
    // ErrNoError Success
    ErrNoError ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NO_ERROR)
    // ErrOffsetOutOfRange Broker: Offset out of range
    ErrOffsetOutOfRange ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE)
    // ErrInvalidMsg Broker: Invalid message
    ErrInvalidMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG)
    // ErrUnknownTopicOrPart Broker: Unknown topic or partition
    ErrUnknownTopicOrPart ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
    // ErrInvalidMsgSize Broker: Invalid message size
    ErrInvalidMsgSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE)
    // ErrLeaderNotAvailable Broker: Leader not available
    ErrLeaderNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
    // ErrNotLeaderForPartition Broker: Not leader for partition
    ErrNotLeaderForPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION)
    // ErrRequestTimedOut Broker: Request timed out
    ErrRequestTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT)
    // ErrBrokerNotAvailable Broker: Broker not available
    ErrBrokerNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE)
    // ErrReplicaNotAvailable Broker: Replica not available
    ErrReplicaNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE)
    // ErrMsgSizeTooLarge Broker: Message size too large
    ErrMsgSizeTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE)
    // ErrStaleCtrlEpoch Broker: StaleControllerEpochCode
    ErrStaleCtrlEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH)
    // ErrOffsetMetadataTooLarge Broker: Offset metadata string too large
    ErrOffsetMetadataTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE)
    // ErrNetworkException Broker: Broker disconnected before response received
    ErrNetworkException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION)
    // ErrGroupLoadInProgress Broker: Group coordinator load in progress
    ErrGroupLoadInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS)
    // ErrGroupCoordinatorNotAvailable Broker: Group coordinator not available
    ErrGroupCoordinatorNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE)
    // ErrNotCoordinatorForGroup Broker: Not coordinator for group
    ErrNotCoordinatorForGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP)
    // ErrTopicException Broker: Invalid topic
    ErrTopicException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION)
    // ErrRecordListTooLarge Broker: Message batch larger than configured server segment size
    ErrRecordListTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE)
    // ErrNotEnoughReplicas Broker: Not enough in-sync replicas
    ErrNotEnoughReplicas ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS)
    // ErrNotEnoughReplicasAfterAppend Broker: Message(s) written to insufficient number of in-sync replicas
    ErrNotEnoughReplicasAfterAppend ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND)
    // ErrInvalidRequiredAcks Broker: Invalid required acks value
    ErrInvalidRequiredAcks ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS)
    // ErrIllegalGeneration Broker: Specified group generation id is not valid
    ErrIllegalGeneration ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION)
    // ErrInconsistentGroupProtocol Broker: Inconsistent group protocol
    ErrInconsistentGroupProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL)
    // ErrInvalidGroupID Broker: Invalid group.id
    ErrInvalidGroupID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_GROUP_ID)
    // ErrUnknownMemberID Broker: Unknown member
    ErrUnknownMemberID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
    // ErrInvalidSessionTimeout Broker: Invalid session timeout
    ErrInvalidSessionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT)
    // ErrRebalanceInProgress Broker: Group rebalance in progress
    ErrRebalanceInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS)
    // ErrInvalidCommitOffsetSize Broker: Commit offset data size is not valid
    ErrInvalidCommitOffsetSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE)
    // ErrTopicAuthorizationFailed Broker: Topic authorization failed
    ErrTopicAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED)
    // ErrGroupAuthorizationFailed Broker: Group authorization failed
    ErrGroupAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED)
    // ErrClusterAuthorizationFailed Broker: Cluster authorization failed
    ErrClusterAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED)
    // ErrInvalidTimestamp Broker: Invalid timestamp
    ErrInvalidTimestamp ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP)
    // ErrUnsupportedSaslMechanism Broker: Unsupported SASL mechanism
    ErrUnsupportedSaslMechanism ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM)
    // ErrIllegalSaslState Broker: Request not valid in current SASL state
    ErrIllegalSaslState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE)
    // ErrUnsupportedVersion Broker: API version not supported
    ErrUnsupportedVersion ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION)
)
func (c ErrorCode) String() string
String returns a human readable representation of an error code
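A sketch of inspecting an error returned by one of the client calls for a Kafka-specific error code; the assertion to the Error type and the handling branches are illustrative:

    if err != nil {
        if kerr, ok := err.(kafka.Error); ok {
            switch kerr.Code() {
            case kafka.ErrAllBrokersDown:
                fmt.Println("no broker reachable:", kerr)
            case kafka.ErrTimedOut:
                fmt.Println("operation timed out:", kerr)
            default:
                fmt.Println("kafka error", kerr.Code(), ":", kerr)
            }
        } else {
            fmt.Println("non-kafka error:", err)
        }
    }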
type Event interface {
    // String returns a human-readable representation of the event
    String() string
}
Event generic interface
type Handle interface {
// contains filtered or unexported methods
}
Handle represents a generic client handle containing common parts for both Producer and Consumer.
type Message struct {
    TopicPartition TopicPartition
    Value          []byte
    Key            []byte
    Timestamp      time.Time
    TimestampType  TimestampType
    Opaque         interface{}
}
Message represents a Kafka message
func (m *Message) String() string
String returns a human readable representation of a Message. Key and payload are not represented.
type Metadata struct {
    Brokers           []BrokerMetadata
    Topics            map[string]TopicMetadata
    OriginatingBroker BrokerMetadata
}
Metadata contains broker and topic metadata for all (matching) topics
type Offset int64
Offset type (int64) with support for canonical names
func NewOffset(offset interface{}) (Offset, error)
NewOffset creates a new Offset using the provided logical string, or an absolute int64 offset value. Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored"
func OffsetTail(relativeOffset Offset) Offset
OffsetTail returns the logical offset relativeOffset from current end of partition
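A sketch combining NewOffset() and OffsetTail() when assigning partitions explicitly; the topic name and counts are placeholders and c is an existing *kafka.Consumer:

    topic := "mytopic" // hypothetical

    // Logical name -> Offset.
    start, err := kafka.NewOffset("earliest")
    if err != nil {
        // handle error
        return
    }

    if err = c.Assign([]kafka.TopicPartition{
        {Topic: &topic, Partition: 0, Offset: start},
        // Start 100 messages before the current end of partition 1.
        {Topic: &topic, Partition: 1, Offset: kafka.OffsetTail(100)},
    }); err != nil {
        // handle error
    }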
func (o Offset) Set(offset interface{}) error
Set offset value, see NewOffset()
func (o Offset) String() string
type OffsetsCommitted struct {
    Error   error
    Offsets []TopicPartition
}
OffsetsCommitted reports committed offsets
func (o OffsetsCommitted) String() string
type PartitionEOF TopicPartition
PartitionEOF consumer reached end of partition
func (p PartitionEOF) String() string
type PartitionMetadata struct {
    ID       int32
    Error    Error
    Leader   int32
    Replicas []int32
    Isrs     []int32
}
PartitionMetadata contains per-partition metadata
type Producer struct {
// contains filtered or unexported fields
}
Producer implements a High-level Apache Kafka Producer instance
func NewProducer(conf *ConfigMap) (*Producer, error)
NewProducer creates a new high-level Producer instance.
conf is a *ConfigMap with standard librdkafka configuration properties, as documented in: https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md
Supported special configuration properties:
   go.batch.producer (bool, false) - Enable batch producer (experimental for increased performance). These batches do not relate to Kafka message batches in any way.
   go.delivery.reports (bool, true) - Forward per-message delivery reports to the Events() channel.
   go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)
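A sketch of creating a Producer with a couple of these Go-level properties set; the broker address and channel size are placeholders:

    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers":       "localhost:9092", // placeholder
        "go.delivery.reports":     true,             // the default, shown for clarity
        "go.produce.channel.size": 100000,           // placeholder buffer size
    })
    if err != nil {
        panic(err)
    }
    defer p.Close()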
func (p *Producer) Close()
Close a Producer instance. The Producer object or its channels are no longer usable after this call.
func (p *Producer) Events() chan Event
Events returns the Events channel (read)
func (p *Producer) Flush(timeoutMs int) int
Flush and wait for outstanding messages and requests to complete delivery. Includes messages on ProduceChannel. Runs until the outstanding count reaches zero or timeoutMs elapses. Returns the number of outstanding events still un-flushed.
func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned.
func (p *Producer) Len() int
Len returns the number of messages and requests waiting to be transmitted to the broker as well as delivery reports queued for the application. Includes messages on ProduceChannel.
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error
Produce single message. This is an asynchronous call that enqueues the message on the internal transmit queue, thus returning immediately. The delivery report will be sent on the provided deliveryChan if specified, or on the Producer object's Events() channel if not. Returns an error if message could not be enqueued.
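A sketch of producing a single message with a dedicated delivery channel and checking the report; the topic and payload are placeholders and p is an existing *kafka.Producer:

    topic := "mytopic" // hypothetical
    deliveryChan := make(chan kafka.Event)

    err := p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Key:            []byte("key"),
        Value:          []byte("hello"),
    }, deliveryChan)
    if err != nil {
        // message could not be enqueued (e.g. queue full)
        return
    }

    // The delivery report arrives as a *Message on deliveryChan.
    m := (<-deliveryChan).(*kafka.Message)
    if m.TopicPartition.Error != nil {
        fmt.Println("delivery failed:", m.TopicPartition.Error)
    } else {
        fmt.Println("delivered to", m.TopicPartition)
    }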
func (p *Producer) ProduceChannel() chan *Message
ProduceChannel returns the produce *Message channel (write)
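A sketch of the channel-based produce path, with delivery reports read from Events() in a separate goroutine; topic and payload are placeholders:

    go func() {
        for ev := range p.Events() {
            if m, ok := ev.(*kafka.Message); ok {
                if m.TopicPartition.Error != nil {
                    fmt.Println("delivery failed:", m.TopicPartition.Error)
                }
            }
        }
    }()

    topic := "mytopic" // hypothetical
    p.ProduceChannel() <- &kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          []byte("hello via channel"),
    }

    // Wait for outstanding deliveries before shutting down.
    p.Flush(15 * 1000)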
func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
QueryWatermarkOffsets returns the broker's low and high offsets for the given topic and partition.
func (p *Producer) String() string
String returns a human readable name for a Producer instance
type RebalanceCb func(*Consumer, Event) error
RebalanceCb provides a per-Subscribe*() rebalance event callback. The passed Event will be either AssignedPartitions or RevokedPartitions
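A sketch of a rebalance callback passed to SubscribeTopics(), assuming go.application.rebalance.enable is set to true so the application performs Assign()/Unassign() itself; c and the topic name are placeholders:

    rebalanceCb := func(consumer *kafka.Consumer, ev kafka.Event) error {
        switch e := ev.(type) {
        case kafka.AssignedPartitions:
            fmt.Println("assigned:", e.Partitions)
            return consumer.Assign(e.Partitions)
        case kafka.RevokedPartitions:
            fmt.Println("revoked:", e.Partitions)
            return consumer.Unassign()
        }
        return nil
    }

    if err := c.SubscribeTopics([]string{"mytopic"}, rebalanceCb); err != nil {
        // handle error
    }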
type RevokedPartitions struct {
    Partitions []TopicPartition
}
RevokedPartitions consumer group rebalance event: revoked partition set
func (e RevokedPartitions) String() string
type TimestampType int
TimestampType is the Message timestamp type or source
func (t TimestampType) String() string
type TopicMetadata struct {
    Topic      string
    Partitions []PartitionMetadata
    Error      Error
}
TopicMetadata contains per-topic metadata
type TopicPartition struct {
    Topic     *string
    Partition int32
    Offset    Offset
    Error     error
}
TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset.
func (p TopicPartition) String() string