.. _connect_userguide: User Guide ========== Kafka Connect is a tool for copying data between Kafka and a variety of other systems, ranging from relational databases, to logs and metrics, to Hadoop and data warehouses, to NoSQL data stores, to search indexes, and more. Kafka Connect makes it easy to integrate all your data via Kafka, making it `available as realtime streams `_. For example, you can use Kafka Connect to: * Stream changes from a relational database to make events available with low latency for stream processing applications. * Import realtime logs and metrics into Kafka and process them to detect anomalies. * Import data from a primary data store and data stores and feed a collection of secondary indexes such as search and caches. * Design an ETL pipeline by loading data into Kafka from your primary data storage systems, performing filtering, transformations, and enrichment with a stream processing framework, and deliver the prepared data to HDFS or your data warehouse. As an additional benefit, all your data is made available for stream processing by application developers. Getting Started --------------- The :ref:`quickstart ` describes how to get started in standalone mode. It demonstrates an end-to-end job, importing data from one "system" (the filesystem) into Kafka, then exporting that same data from the Kafka topic to another "system" the console. Each of the following sections will cover the following in more detail: * Kafka Connect concepts and running in standalone and distributed modes * Configuring connectors * Configuring workers * Interacting with Kafka Connect via the REST API Connectors, Tasks, and Workers ------------------------------ Kafka Connect's core concept that users interact with is a **connector**. Connectors (or a **connector instance**) is a logical job that is responsible for managing the copying of data between Kafka and another system. Each connector can instantiate a set of **tasks** that actually copy the data. By allowing the connector to break a single job into many tasks, Kafka Connect provides built-in support for parallelism and scalable data copying with very little configuration. **Connector plugins** are jars that add the classes that implement a connector. Sometimes we refer to both connector instances and connector plugins as simply "connectors", but it should always be clear from the context which is being referred to (e.g., "install a connector" refers to the plugin, and "check the status of a connector" refers to a connector instance). .. figure:: images/connector-model.png :align: center Logical view of an input stream, a source connector, the tasks it creates, and the data flow from the source system into Kafka. Offsets ~~~~~~~ As connectors run, Kafka Connect tracks **offsets** for each one so that connectors can resume from their previous position in the event of failures or graceful restarts for maintenance. These offsets are similar to Kafka's offsets in that they track the current position in the stream of data being copied and because each connector may need to track many offsets for different **partitions** of the stream. However, they are different because the format of the offset is defined by the system data is being loaded from and therefore may not simply be a ``long`` as they are for Kafka topics. For example, when loading data from a database, the offset might be a transaction ID that identifies a position in the database changelog. Users generally do not need to worry about the format of offsets, especially since they differ from connector to connector. However, Kafka Connect does require persistent storage for offset data to ensure it can recover from faults, and users will need to configure this storage. These settings, which depend on the way you decide to run Kafka Connect, are discussed in the next section. Workers ~~~~~~~ Connectors and tasks are logical units of work and must be scheduled to execute in a process. Kafka Connect calls these processes **workers** and has to types of workers: standalone and distributed. Standalone Mode +++++++++++++++ Standalone mode is the simplest mode, where a single process is responsible for executing all connectors and tasks. Since it is a single process, it requires minimal configuration. To execute a worker in standalone mode, run the following command:: bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...] The first parameter is always a configuration file for the worker. Described below in the section on configuring workers, this configuration gives you control over settings such as the Kafka cluster to use and the serialization format. All additional parameters should be connector configuration files. Each file contains a single connector configuration. If you run multiple standalone instances on the same host, there are a couple of settings that must differ between each instance: * ``offset.storage.file.filename`` - storage for connector offsets, which are stored on the local filesystem in standalone mode; using the same file will lead to offset data being deleted or overwritten with different values * ``rest.port`` - the port the REST interface listens on for HTTP requests Distributed Mode ++++++++++++++++ Standalone mode is convenient for getting started, during development, and in certain situations where only one process makes sense, such as collecting logs from a host. However, because there is only a single process, it also has more limited functionality: scalability is limited to the single process and there is no fault tolerance beyond any monitoring you add to the single process. Distributed mode provides scalability and automatic fault tolerance for Kafka Connect. In distributed mode, you start many worker processes using the same ``group.id`` and they automatically coordinate to schedule execution of connectors and tasks across all available workers. If you add a worker, shut down a worker, or a worker fails unexpectedly, the rest of the workers detect this and automatically coordinate to redistribute connectors and tasks across the updated set of available workers. .. figure:: images/worker-model-basics.png :align: center A three-node Kafka Connect distributed mode cluster. Connectors (monitoring the source or sink system for changes that require reconfiguring tasks) and tasks (copying a subset of a connector's data) are automatically balanced across the active workers. The division of work between tasks is shown by the partitions that each task is assigned. Unlike standalone mode where connector configurations are generally passed in via the command line, interaction with a distributed-mode cluster is via the REST API. To create a connector, you start the workers and then make a REST request to create a connector (see the REST API section for details). Unlike many other systems, Kafka Connect workers do not have a special "leader" process that you have to interact with to use the REST API; all nodes can respond to REST requests, including creating, listing, modifying, and destroying connectors. In distributed mode, the workers need to be able to discover each other and have shared storage for connector configuration and offset data. In addition to the usual worker settings, you'll want to ensure you've configured the following for this cluster: * ``group.id`` - an ID that uniquely identifies the Kafka Connect cluster these workers belong to. Ensure this is unique for all groups that work with a Kafka cluster. * ``config.storage.topic`` - the Kafka topic to store connector and task configuration state in. Although this topic can be auto-created if your cluster has auto topic creation enabled, it is highly recommended that you create it before starting the Kafka Connect cluster. This topic should **always** have a single partition and be highly replicated (3x or more). * ``offset.storage.topic`` - the Kafka topic to store connector offset state in. Although this topic can be auto-created if your cluster has auto topic creation enabled, it is highly recommended that you create it before starting the Kafka Connect cluster. To support large Kafka Connect clusters, this topic should have a large number of partitions (e.g. 25 or 50, just like Kafka's built-in ``__offsets`` topic) and highly replicated (3x or more). To start a distributed worker, create a worker configuration just as you would with standalone mode, but also setting the above config options. Then, simply start the worker process with that configuration:: bin/connect-distributed worker.properties Distributed mode does not have any additional command line parameters. New workers will either start a new group or join an existing one, if other instances are already running, and then wait for work to do. You can use the REST API to manage the connectors running in the cluster. See the REST API section for more details, but here is a simple example that creates a connector that reads from `stdin` and produces each line as a message to the topic `connect-test`:: curl -X POST -H "Content-Type: application/json" --data '{"name": "local-console-source", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector", "tasks.max":"1", "topic":"connect-test" }}' http://localhost:8083/connectors # Or, to use a file containing the JSON-formatted configuration # curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors In distributed mode, if you run more than one worker per host (for example, if you are testing distributed mode locally during development), the following settings must have different values for each instance: * ``rest.port`` - the port the REST interface listens on for HTTP requests Configuring Connectors ---------------------- Connector configurations are key-value mappings. For standalone mode these are defined in a properties file and passed to the Connect process on the command line. In distributed mode, they will be included in the JSON payload for the request that creates (or modifies) the connector. Most configurations are connector dependent, but there are a few settings common to all connectors: * ``name`` - Unique name for the connector. Attempting to register again with the same name will fail. * ``connector.class`` - The Java class for the connector * ``tasks.max`` - The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism. Sink connectors also have one additional option to control their input: * ``topics`` - A list of topics to use as input for this connector For other options, consult the documentation for :ref:`individual connectors `. Installing Connector Plugins ---------------------------- Confluent Platform ships with commonly used connector plugins that have been tested with the rest of the platform. However, Kafka Connect is designed to be extensible so that it is easy for other developers to create new connectors and for users to run them with minimal effort. All that is required to install a new plugin is to place it in the ``CLASSPATH`` of the Kafka Connect process. All the scripts for running Kafka Connect will use the ``CLASSPATH`` environment variable if it is set when they are invoked, making it easy to run with additional connector plugins:: export CLASSPATH=/path/to/my/connectors/* bin/connect-standalone standalone.properties new-custom-connector.properties Configuring Workers ------------------- Whether you're running standalone or distributed mode, Kafka Connect workers are configured by passing a properties file containing any required or overridden options as the first parameter to the worker process. Some example configuration files are included with the Confluent Platform to help you get started. We recommend using the files ``etc/schema-registry/connect-avro-[standalone|distributed].properties`` as a starting point because they include the necessary configuration to use Confluent Platform's Avro converters that integrate with the Schema Registry. They are configured to work well with Kafka and Schema Registry services running locally, making it easy to test Kafka Connect locally; using them with production deployments should only require adjusting the hostnames for Kafka and Schema Registry. An exhaustive listing of options is provided below, broken into several sections. The first lists options that can be set in either standalone or distributed mode. These control basic functionality like which Kafka cluster to communicate with and what format data you're working with. The next two sections list settings specific to standalone or distributed mode. Finally, although it should not usually be required, the final section describes how to override the settings for the producers and consumers that are created internally by Kafka Connect. Common Worker Configs ~~~~~~~~~~~~~~~~~~~~~ ``bootstrap.servers`` A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down). * Type: list * Default: [localhost:9092] * Importance: high ``key.converter`` Converter class for key Connect data. This controls the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. Popular formats include Avro and JSON. * Type: class * Default: * Importance: high ``value.converter`` Converter class for value Connect data. This controls the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. Popular formats include Avro and JSON. * Type: class * Default: * Importance: high ``internal.key.converter`` Converter class for internal key Connect data that implements the Converter interface. Used for converting data like offsets and configs. * Type: class * Default: * Importance: low ``internal.value.converter`` Converter class for offset value Connect data that implements the Converter interface. Used for converting data like offsets and configs. * Type: class * Default: * Importance: low ``offset.flush.interval.ms`` Interval at which to try committing offsets for tasks. * Type: long * Default: 60000 * Importance: low ``offset.flush.timeout.ms`` Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt. * Type: long * Default: 5000 * Importance: low ``rest.advertised.host.name`` If this is set, this is the hostname that will be given out to other workers to connect to. * Type: string * Importance: low ``rest.advertised.port`` If this is set, this is the port that will be given out to other workers to connect to. * Type: int * Importance: low ``rest.host.name`` Hostname for the REST API. If this is set, it will only bind to this interface. * Type: string * Importance: low ``rest.port`` Port for the REST API to listen on. * Type: int * Default: 8083 * Importance: low ``task.shutdown.graceful.timeout.ms`` Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time, not per task. All task have shutdown triggered, then they are waited on sequentially. * Type: long * Default: 5000 * Importance: low Standalone Worker Configuration ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ In addition to ``offset.storage.file.filename`` The file to store connector offsets in. By storing offsets on disk, a standalone process can be stopped and started on a single node and resume where it previously left off. * Type: string * Default: "" * Importance: high Distributed Worker Configuration ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ In addition to the common worker configuration options, the following are available in distributed mode. ``group.id`` A unique string that identifies the Connect cluster group this worker belongs to. * Type: string * Default: "" * Importance: high ``config.storage.topic`` The topic to store connector and task configuration data in. This *must* be the same for all workers with the same ``group.id`` * Type: string * Default: "" * Importance: high ``offset.storage.topic`` The topic to store offset data for connectors in. This *must* be the same for all workers with the same ``group.id`` * Type: string * Default: "" * Importance: high ``heartbeat.interval.ms`` The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the worker's session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. * Type: int * Default: 3000 * Importance: high ``session.timeout.ms`` The timeout used to detect failures when using Kafka's group management facilities. * Type: int * Default: 30000 * Importance: high ``ssl.key.password`` The password of the private key in the key store file. This is optional for client. * Type: password * Importance: high ``ssl.keystore.location`` The location of the key store file. This is optional for client and can be used for two-way authentication for client. * Type: string * Importance: high ``ssl.keystore.password`` The store password for the key store file.This is optional for client and only needed if ssl.keystore.location is configured. * Type: password * Importance: high ``ssl.truststore.location`` The location of the trust store file. * Type: string * Importance: high ``ssl.truststore.password`` The password for the trust store file. * Type: password * Importance: high ``connections.max.idle.ms`` Close idle connections after the number of milliseconds specified by this config. * Type: long * Default: 540000 * Importance: medium ``receive.buffer.bytes`` The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. * Type: int * Default: 32768 * Importance: medium ``request.timeout.ms`` The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. * Type: int * Default: 40000 * Importance: medium ``sasl.kerberos.service.name`` The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. * Type: string * Importance: medium ``security.protocol`` Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported. * Type: string * Default: "PLAINTEXT" * Importance: medium ``send.buffer.bytes`` The size of the TCP send buffer (SO_SNDBUF) to use when sending data. * Type: int * Default: 131072 * Importance: medium ``ssl.enabled.protocols`` The list of protocols enabled for SSL connections. TLSv1.2, TLSv1.1 and TLSv1 are enabled by default. * Type: list * Default: [TLSv1.2, TLSv1.1, TLSv1] * Importance: medium ``ssl.keystore.type`` The file format of the key store file. This is optional for client. Default value is JKS * Type: string * Default: "JKS" * Importance: medium ``ssl.protocol`` The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. * Type: string * Default: "TLS" * Importance: medium ``ssl.provider`` The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. * Type: string * Importance: medium ``ssl.truststore.type`` The file format of the trust store file. Default value is JKS. * Type: string * Default: "JKS" * Importance: medium ``worker.sync.timeout.ms`` When the worker is out of sync with other workers and needs to resynchronize configurations, wait up to this amount of time before giving up, leaving the group, and waiting a backoff period before rejoining. * Type: int * Default: 3000 * Importance: medium ``worker.unsync.backoff.ms`` When the worker is out of sync with other workers and fails to catch up within worker.sync.timeout.ms, leave the Connect cluster for this long before rejoining. * Type: int * Default: 300000 * Importance: medium ``client.id`` An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging. * Type: string * Default: "" * Importance: low ``metadata.max.age.ms`` The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. * Type: long * Default: 300000 * Importance: low ``metric.reporters`` A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics. * Type: list * Default: [] * Importance: low ``metrics.num.samples`` The number of samples maintained to compute metrics. * Type: int * Default: 2 * Importance: low ``metrics.sample.window.ms`` The number of samples maintained to compute metrics. * Type: long * Default: 30000 * Importance: low ``reconnect.backoff.ms`` The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker. * Type: long * Default: 50 * Importance: low ``retry.backoff.ms`` The amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop. * Type: long * Default: 100 * Importance: low ``sasl.kerberos.kinit.cmd`` Kerberos kinit command path. Default is /usr/bin/kinit * Type: string * Default: "/usr/bin/kinit" * Importance: low ``sasl.kerberos.min.time.before.relogin`` Login thread sleep time between refresh attempts. * Type: long * Default: 60000 * Importance: low ``sasl.kerberos.ticket.renew.jitter`` Percentage of random jitter added to the renewal time. * Type: double * Default: 0.05 * Importance: low ``sasl.kerberos.ticket.renew.window.factor`` Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached, at which time it will try to renew the ticket. * Type: double * Default: 0.8 * Importance: low ``ssl.cipher.suites`` A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol.By default all the available cipher suites are supported. * Type: list * Importance: low ``ssl.endpoint.identification.algorithm`` The endpoint identification algorithm to validate server hostname using server certificate. * Type: string * Importance: low ``ssl.keymanager.algorithm`` The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine. * Type: string * Default: "SunX509" * Importance: low ``ssl.trustmanager.algorithm`` The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. * Type: string * Default: "PKIX" * Importance: low Overriding Producer & Consumer Settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Internally, Kafka Connect uses the standard Java producer and consumers to communicate with Kafka. Kafka Connect configures these producer and consumer instances with good defaults. Most importantly, it is configured with important settings that ensure data from sources will be delivered to Kafka in order and without any loss. The most critical site-specific options, such as the Kafka bootstrap servers, are already exposed via the standard worker configuration. Occasionally, you may have an application that needs to adjust the default settings. One example is a standalone process that runs a log file connector. For the logs being collected, you might prefer low-latency, best-effort delivery -- minor loss in the case of connectivity issues might be acceptable for this application in order to avoid any data buffering on the client, keeping the log collection as lean as possible. All `new producer configs `_ and `new consumer configs `_ can be overridden by prefixing them with ``producer.`` or ``consumer.``, respectively. For example:: producer.retries=1 consumer.max.partition.fetch.bytes=10485760 would override the producers to only retry sending messages once and increase the default amount of data fetched from a partition per request to 10 MB. Note that these configuration changes are applied to *all* connectors running on the worker. You should be especially careful making any changes to these settings when running distributed mode workers. REST Interface -------------- Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. By default this service runs on port ``8083``. When executed in distributed mode, the REST API will be the primary interface to the cluster. You can make requests to any cluster member; the REST API automatically forwards requests if required. Although you can use the standalone mode just by submitting a connector on the command line, it also runs the REST interface. This is useful for getting status information, adding and removing connectors without stopping the process, and more. Currently the only top-level resource is ``Connector``, with sub-resources for listing configuration settings and tasks. Content Types ~~~~~~~~~~~~~ Currently the REST API only supports ``application/json`` as both the request and response entity content type. Your requests *should* specify the expected content type of the response via the HTTP ``Accept`` header:: Accept: application/json and *should* specify the content type of the request entity (if one is included) via the ``Content-Type`` header:: Content-Type: application/json Statuses & Errors ~~~~~~~~~~~~~~~~~ The REST API will return standards-compliant HTTP statuses. Clients *should* check the HTTP status, especially before attempting to parse and use response entities. Currently the API does not use redirects (statuses in the 300 range), but the use of these codes is reserved for future use so clients should handle them. When possible, all endpoints will use a standard error message format for all errors (status codes in the 400 or 500 range). For example, a request entity that omits a required field may generate the following response: .. sourcecode:: http HTTP/1.1 422 Unprocessable Entity Content-Type: application/json { "error_code": 422, "message": "config may not be empty" } Connectors ~~~~~~~~~~ .. http:get:: /connectors Get a list of active connectors :>json array connectors: List of connector names **Example request**: .. sourcecode:: http GET /connectors HTTP/1.1 Host: connect.example.com Accept: application/json **Example response**: .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json ["my-jdbc-source", "my-hdfs-sink"] .. http:post:: /connectors Create a new connector, returning the current connector info if successful. :>json string name: Name of the connector to create :>json map config: Configuration parameters for the connector. All values should be strings. :json map config: Configuration parameters for the connector. All values should be strings. :