Installing and Configuring Kafka Connect

This section describes how you can install and configure a Kafka Connect instance.

Getting Started

The quickstart describes how to get started in standalone mode. It demonstrates an end-to-end job that imports data from one “system” (the filesystem) into Kafka, then exports that same data from the Kafka topic to another “system” (the console). This section is for users who are comfortable with the concepts and the quickstart. Those interested in configuring Kafka Connect with security should refer to the Kafka Connect security documentation. This section covers the following:

  • Planning a Kafka Connect installation
  • Running workers in standalone and distributed modes
  • Installing connector plugins
  • Configuring workers

Planning for Installation

When getting started with Kafka Connect, there are a few considerations that help your environment scale to the long-term needs of your data pipeline. This section provides context for those decisions.

Prerequisites

Kafka Connect has only one hard prerequisite in order to get started: a set of Kafka brokers. However, as your cluster grows, there are a couple of items that are helpful to consider ahead of time:

  • Internal Topic Creation

As discussed in more detail later in this section, it is important to create Kafka Connect’s required internal topics ahead of time with a high replication factor and a sufficient number of partitions for the expected load. This helps avoid reconfiguring these topics later on.

  • Schema Registry

Although the Schema Registry is not a required service for Kafka Connect, it enables you to easily use Avro as the common data format for all connectors. This keeps the need to write custom code at a minimum and standardizes your data in a flexible format. Additionally, you get the benefits of enforced compatibility rules for schema evolution.

Standalone vs. Distributed

As discussed in the concepts section, workers can be run in two different modes. It is useful to identify which mode works best for your environment before getting started. For development, or for environments that lend themselves to single agents (e.g. sending logs from webservers to Kafka), standalone mode is well suited. In use cases where a single source or sink may handle heavy data volumes (e.g. sending data from Kafka to HDFS), distributed mode is more flexible in terms of scalability and offers the added advantage of a highly available service to minimize downtime. In the end, the choice is up to the installer, but knowing what you are moving and where you are moving it with Kafka Connect will help inform this decision. We recommend distributed mode for production deployments for ease of management and scalability.

Deployment Considerations

Kafka Connect workers can be deployed in a number of ways, each with its own benefits. Workers lend themselves well to being run in containers in managed environments such as YARN, Mesos, or Docker Swarm because all state is stored in Kafka, making the local processes themselves stateless. We provide Docker images, along with documentation for getting started with them. By design, Kafka Connect does not automatically handle restarting or scaling workers, which means your existing clustering solutions can continue to be used transparently.

Additionally, Kafka Connect workers are simply JVM processes and as such can be run on shared machines that have sufficient resources. The resource limit depends heavily on the types of connectors being run by the workers, but in most cases users should be aware of CPU and memory bounds when running workers concurrently on a single machine.

Hardware requirements for Kafka Connect workers are similar to those of the standard Java producers and consumers. Deployments that are expected to send large messages will require more memory, and those that make heavy use of compression will require more powerful CPUs. We recommend starting with the default heap size setting and monitoring the JMX metrics and the system to be sure CPU, memory, and network (10GbE or faster is recommended) are sufficient for the load.
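
If monitoring later shows that the defaults are not sufficient, heap size and a JMX port might be set before launching a worker, as in the minimal sketch below. It assumes the worker launch scripts honor the KAFKA_HEAP_OPTS and JMX_PORT environment variables, as the standard Kafka scripts do. The heap values and port number are illustrative, not tuned recommendations.

```shell
# Illustrative values only; tune them after observing real load.
export KAFKA_HEAP_OPTS="-Xms256M -Xmx2G"   # assumed heap bounds for this sketch
export JMX_PORT=9999                        # hypothetical port for JMX monitoring

# Launch the worker only if the script exists relative to the current directory.
if [ -x bin/connect-distributed ]; then
  bin/connect-distributed worker.properties
else
  echo "bin/connect-distributed not found; run this from the installation directory"
fi
```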

Installing Connector Plugins

Confluent Platform ships with commonly used connector plugins that have been tested with the rest of the platform. Additionally, Kafka Connect is designed to be extensible so that it is easy for other developers to create new connectors and for users to run them with minimal effort.

To install a new plugin, place it in the CLASSPATH of the Kafka Connect process. All the scripts for running Kafka Connect use the CLASSPATH environment variable if it is set when they are invoked, making it easy to run with additional connector plugins:

$ export CLASSPATH=/path/to/my/connectors/*
$ bin/connect-standalone standalone.properties new-custom-connector.properties

Running Workers

Standalone Mode

To execute a worker in standalone mode, run the following command:

$ bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]

The first parameter is always a configuration file for the worker as described below. This configuration gives you control over settings such as the Kafka cluster to use and the serialization format. All additional parameters are connector configuration files. Each file contains a single connector configuration.

If you run multiple standalone instances on the same host, there are a couple of settings that must be unique between each instance:

  • offset.storage.file.filename - storage for connector offsets, which are stored on the local filesystem in standalone mode; using the same file will lead to offset data being deleted or overwritten with different values
  • rest.port - the port the REST interface listens on for HTTP requests
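
The two settings above can be sketched as follows for two standalone instances on one host. This is a minimal illustration, not a recommended layout: the directory, file names, and port numbers are hypothetical, and the remaining worker settings (for example, converters) are omitted for brevity.

```shell
# Hypothetical scratch directory for the generated worker files.
CONF_DIR="${TMPDIR:-/tmp}/connect-standalone-demo"
mkdir -p "$CONF_DIR"

# Write one worker file per instance; only the offsets file and REST port differ.
for i in 1 2; do
  cat > "$CONF_DIR/worker-$i.properties" <<EOF
bootstrap.servers=localhost:9092
offset.storage.file.filename=$CONF_DIR/connect-$i.offsets
rest.port=$((8082 + i))
EOF
done

# Each instance would then be started with its own worker file, for example:
#   bin/connect-standalone "$CONF_DIR/worker-1.properties" connector1.properties
#   bin/connect-standalone "$CONF_DIR/worker-2.properties" connector2.properties
grep -H -e '^rest.port' -e '^offset.storage' "$CONF_DIR"/worker-*.properties
```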

Distributed Mode

To start a distributed worker, create a worker configuration just as you would for standalone mode, but with the options for distributed workers specified. We recommend creating the required Kafka topics manually as follows:

# config.storage.topic=connect-configs
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1

# offset.storage.topic=connect-offsets
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50

# status.storage.topic=connect-status
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-status --replication-factor 3 --partitions 10

Then, simply start the worker process:

$ bin/connect-distributed worker.properties

Distributed mode does not take any command line parameters other than a worker configuration file. New workers either start a new group or join an existing one based on the worker properties provided, and then coordinate, similarly to consumer groups, to distribute the work to be done. This differs from standalone mode, where users may optionally provide connector configurations at the command line because only a single worker instance exists and no coordination is required. When running in distributed mode, use the REST API to manage connectors.
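
As a hedged illustration of managing connectors over REST in distributed mode, the sketch below submits a connector configuration to a worker. It assumes a worker listening on localhost:8083 (the default rest.port) and uses the FileStreamSource connector that ships with Kafka. The connector name, file path, and topic are hypothetical.

```shell
# Hypothetical connector definition; name, file, and topic are placeholders.
PAYLOAD="${TMPDIR:-/tmp}/local-file-source.json"
cat > "$PAYLOAD" <<'EOF'
{
  "name": "local-file-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/tmp/test.txt",
    "topic": "connect-test"
  }
}
EOF

# Assumed worker address; adjust to match rest.host.name and rest.port.
CONNECT_URL="http://localhost:8083"

# Create the connector, then list the connectors known to the cluster.
curl -s --max-time 5 -X POST -H "Content-Type: application/json" \
     --data @"$PAYLOAD" "$CONNECT_URL/connectors" \
  || echo "Could not reach a Connect worker at $CONNECT_URL"
curl -s --max-time 5 "$CONNECT_URL/connectors" || true
```

Because the configuration is stored in Kafka rather than on the local disk, the request can be sent to any worker in the group.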

In distributed mode, if you run more than one worker per host (for example, if you are testing distributed mode locally during development), the following setting must have a different value for each instance:

  • rest.port - the port the REST interface listens on for HTTP requests

Configuring Workers

Whether you’re running standalone or distributed mode, Kafka Connect workers are configured by passing a properties file containing any required or overridden options as the first parameter to the worker process. Some example configuration files are included with the Confluent Platform to help you get started. We recommend using the files etc/schema-registry/connect-avro-[standalone|distributed].properties as a starting point because they include the necessary configuration to use Confluent Platform’s Avro converters that integrate with the Schema Registry. They are configured to work well with Kafka and Schema Registry services running locally, making it easy to test Kafka Connect locally; using them with production deployments should only require adjusting the hostnames for Kafka and Schema Registry.

An exhaustive listing of options is provided in the References. This section calls out a few commonly updated configurations.

Common Worker Configs

bootstrap.servers

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

  • Type: list
  • Default: [localhost:9092]
  • Importance: high

key.converter

Converter class for key Connect data. This controls the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. Popular formats include Avro and JSON.

  • Type: class
  • Default:
  • Importance: high

value.converter

Converter class for value Connect data. This controls the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. Popular formats include Avro and JSON.

  • Type: class
  • Default:
  • Importance: high

internal.key.converter

Converter class for internal key Connect data that implements the Converter interface. Used for converting data like offsets and configs.

  • Type: class
  • Default:
  • Importance: low

internal.value.converter

Converter class for internal value Connect data that implements the Converter interface. Used for converting data like offsets and configs.

  • Type: class
  • Default:
  • Importance: low

rest.host.name

Hostname for the REST API. If this is set, it will only bind to this interface.

  • Type: string
  • Importance: low

rest.port

Port for the REST API to listen on.

  • Type: int
  • Default: 8083
  • Importance: low

Standalone Worker Configuration

In addition to the common worker configuration options, the following are available in standalone mode.

offset.storage.file.filename

The file to store connector offsets in. By storing offsets on disk, a standalone process can be stopped and started on a single node and resume where it previously left off.

  • Type: string
  • Default: “”
  • Importance: high

Distributed Worker Configuration

In distributed mode, the workers need to be able to discover each other and share both connector configurations and offset data. This requires workers to have matching values for group.id to form a Connect cluster group, and to have access to three Kafka topics, which are defined by the properties config.storage.topic, offset.storage.topic, and status.storage.topic. These properties are specified in the properties file passed to the Connect worker when it is started in distributed mode, as shown in the Distributed Mode section. Detailed descriptions of these properties and guidelines for creating the required topics are below.

group.id

A unique string that identifies the Connect cluster group this worker belongs to.

  • Type: string
  • Default: “”
  • Importance: high

config.storage.topic

The topic to store connector and task configuration data in. This must be the same for all workers with the same group.id. This topic should always have a single partition and be highly replicated (3x or more).

  • Type: string
  • Default: “”
  • Importance: high

offset.storage.topic

The topic to store offset data for connectors in. This must be the same for all workers with the same group.id. To support large Kafka Connect clusters, this topic should have a large number of partitions (e.g. 25 or 50, just like Kafka’s built-in __consumer_offsets topic) and be highly replicated (3x or more).

  • Type: string
  • Default: “”
  • Importance: high

status.storage.topic

The Kafka topic to store status updates for connectors and tasks. This topic can have multiple partitions and should be highly replicated (3x or more).

  • Type: string
  • Default: “”
  • Importance: high
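
Putting these properties together, a distributed worker file might look like the sketch below. The group name and output path are hypothetical, the topic names match those used in the topic creation commands in the Distributed Mode section, and converter settings are omitted for brevity. Every worker in the same cluster would share the same group.id and topic settings.

```shell
# Hypothetical output path for the generated worker file.
WORKER_FILE="${TMPDIR:-/tmp}/connect-distributed-demo.properties"

cat > "$WORKER_FILE" <<'EOF'
bootstrap.servers=localhost:9092
# Workers with the same group.id form one Connect cluster (name is a placeholder).
group.id=connect-cluster-demo
# These three topics must be identical across all workers in the group.
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
EOF

# Sanity check: report any required distributed-mode property that is missing.
for key in group.id config.storage.topic offset.storage.topic status.storage.topic; do
  grep -q "^$key=" "$WORKER_FILE" || echo "missing required property: $key"
done
```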

Configuring Converters

In going through the common worker configurations, you will have noticed the key.converter and value.converter properties, where you specify the converter to use. Each converter implementation has its own associated configuration requirements. To configure a converter-specific property, prefix it with the name of the worker property in which the converter is specified (for example, key.converter. or value.converter.). The following example worker property file snippet shows that the AvroConverter bundled with the Schema Registry requires the URL for the Schema Registry to be passed as a property:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081

We recommend using the AvroConverter for your Kafka Connect data. Those with a need to use JSON for Connect data can use the JsonConverter bundled with Kafka. An example of using the JsonConverter without schemas for converting keys looks like:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
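
The prefixing rule can be illustrated with a small helper that prints a converter setting together with its converter-specific properties. The function name converter_props is hypothetical and exists only for this sketch; the class names and property names are the ones shown above.

```shell
# converter_props PREFIX CLASS [PROP=VALUE ...]
# Prints "PREFIX=CLASS", then each converter property prefixed with "PREFIX.".
converter_props() {
  prefix="$1"; class="$2"; shift 2
  printf '%s=%s\n' "$prefix" "$class"
  for prop in "$@"; do
    printf '%s.%s\n' "$prefix" "$prop"
  done
}

# Avro for values, schemaless JSON for keys.
converter_props value.converter io.confluent.connect.avro.AvroConverter \
  schema.registry.url=http://localhost:8081
converter_props key.converter org.apache.kafka.connect.json.JsonConverter \
  schemas.enable=false
```

The printed lines can be pasted into a worker properties file. Note that key and value converters are configured independently, so a property set under key.converter. does not apply to value.converter.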

Overriding Producer & Consumer Settings

Internally, Kafka Connect uses the standard Java producers and consumers to communicate with Kafka. Kafka Connect configures these producer and consumer instances with good defaults. Most importantly, the producer is configured with settings that ensure data from sources is delivered to Kafka in order and without any loss. The most critical site-specific options, such as the Kafka bootstrap servers, are already exposed via the standard worker configuration.

Occasionally, you may have an application that needs to adjust the default settings. One example is a standalone process that runs a log file connector. For the logs being collected, you might prefer low-latency, best-effort delivery. Minor data loss in the case of connectivity issues might be acceptable for this application in exchange for avoiding any data buffering on the client, which keeps the log collection as lean as possible.

All new producer configs and new consumer configs can be overridden by prefixing them with producer. or consumer., respectively. For example:

producer.retries=1
consumer.max.partition.fetch.bytes=10485760

These settings override the producer to retry sending a message only once, and raise the amount of data fetched from a partition per request from its default to 10 MB.

Note that these configuration changes are applied to all connectors running on the worker. You should be especially careful making any changes to these settings when running distributed mode workers.
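
For the log collection scenario described above, the overrides might be sketched as follows. The specific values are assumptions chosen to illustrate best-effort, low-latency delivery (acks, retries, and linger.ms are standard Java producer settings). They are not values recommended by this documentation, and the file path is hypothetical.

```shell
# Hypothetical standalone worker file to which the overrides are appended.
WORKER_FILE="${TMPDIR:-/tmp}/connect-log-collector.properties"
printf 'bootstrap.servers=localhost:9092\n' > "$WORKER_FILE"

# Assumed best-effort producer overrides; they apply to every connector on this worker.
cat >> "$WORKER_FILE" <<'EOF'
producer.acks=0
producer.retries=0
producer.linger.ms=0
EOF

cat "$WORKER_FILE"
```

With acks=0 the producer does not wait for broker acknowledgment, so messages can be lost silently. That trade-off only makes sense for data where occasional loss is acceptable.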

Upgrading Kafka Connect Workers

Documentation for upgrading your Kafka Connect workers is found in the platform upgrade section. To upgrade individual connectors, please see our documentation on upgrading connector plugins.