Managing Connectors

This section describes how to manage connectors in an active Kafka Connect environment. You will learn how to:

  • Deploy connectors bundled with Confluent Platform
  • Configure connectors in standalone and distributed environments
  • Monitor and update running connectors with the REST API
  • Install community supported connectors
  • Upgrade a connector

Using Bundled Connectors

The Confluent Platform bundles several connectors that should be used for moving data in and out of commonly used systems such as databases, HDFS, and Elasticsearch. Getting set up to use these connectors is as simple as installing the Confluent Platform. The connector plugins are already added to the CLASSPATH, so no additional installation steps are required. Simply review this section to learn how to manage connectors in general and the documentation specific to each relevant bundled connector to get started moving data.

Configuring Connectors

Connector configurations are key-value mappings. For standalone mode these are defined in a properties file and passed to the Connect process on the command line. In distributed mode, they will be included in the JSON payload sent over the REST API for the request that creates (or modifies) the connector. When choosing to deploy in standalone or distributed mode, it is useful to review the recommendations here. Both standalone and distributed connectors can be configured by using the Confluent Control Center which gives a graphical interface to update connector configurations. Most configurations are connector dependent, but there are a few settings common to all connectors:

  • name - Unique name for the connector. Attempting to register again with the same name will fail.
  • connector.class - The Java class for the connector
  • tasks.max - The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
  • key.converter - (optional) Override the default key converter class set by the worker.
  • value.converter - (optional) Override the default value converter class set by the worker.

To pass configuration parameters to key and value converters, prefix them with key.converter. or value.converter. as you would in the worker configuration when defining default converters. Note that these are only used when the corresponding converter configuration is specified in the key.converter or value.converter properties.

Note that these parameters are not used unless the corresponding converter configuration is specified in the connector configuration.

Sink connectors also have one additional option to control their input:

  • topics - A list of topics to use as input for this connector

For other options, consult the documentation for individual connectors.

Standalone Example

Below is an example of a standalone connector configuration for the bundled FileSinkConnector. Note the common configurations name, connector.class, tasks.max, and topics, and one FileStreamSinkConnector specific configuration file is specified. The file containing this configuration should be added as shown here.

name=local-file-sink
connector.class=FileStreamSinkConnector
tasks.max=1
file=test.sink.txt
topics=connect-test

Distributed Example

You can use the REST API to manage the connectors running on workers in distributed mode. Here is a simple example that creates a FileSinkConnector as in the standalone example. Note the URL is pointing to localhost indicating that the connect worker has been started on the same host where the curl command is run. Instructions for starting a worker in distributed mode are here.

$ curl -X POST -H "Content-Type: application/json" --data '{"name": "local-file-sink", "config": {"connector.class":"FileStreamSinkConnector", "tasks.max":"1", "file":"test.sink.txt", "topics":"connect-test" }}' http://localhost:8083/connectors
# Or, to use a file containing the JSON-formatted configuration
# curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors

To create a connector, you start the workers and then make a REST request to create a connector as above. Unlike many other systems, all nodes in Kafka Connect can respond to REST requests, including creating, listing, modifying, and destroying connectors (see the REST API section for details).

Managing Running Connectors

Kafka Connect’s Rest API enables administration of the cluster. This includes APIs to view the configuration of connectors and the status of their tasks, as well as to alter their current behavior (e.g. changing configuration and restarting tasks).

Once a Kafka Connect cluster is up and running, you can monitor and modify it. In this section, we go over a few common management tasks done via the REST API.

Using the REST Interface

Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. By default this service runs on port 8083. When executed in distributed mode, the REST API will be the primary interface to the cluster. You can make requests to any cluster member; the REST API automatically forwards requests if required.

Although you can use the standalone mode just by submitting a connector on the command line, it also runs the REST interface. This is useful for getting status information, adding and removing connectors without stopping the process, and more.

Connector and Task Status

You can use the REST API to view the current status of a connector and its tasks, including the ID of the worker to which each was assigned.

Connectors and their tasks publish status updates to a shared topic (configured with status.storage.topic) which all workers in the cluster monitor. Because the workers consume this topic asynchronously, there is typically a short delay before a state change is visible through the status API. The following states are possible for a connector or one of its tasks:

  • UNASSIGNED: The connector/task has not yet been assigned to a worker.
  • RUNNING: The connector/task is running.
  • PAUSED: The connector/task has been administratively paused.
  • FAILED: The connector/task has failed (usually by raising an exception, which is reported in the status output).

In most cases, connector and task states will match, though they may be different for short periods of time when changes are occurring or if tasks have failed. For example, when a connector is first started, there may be a noticeable delay before the connector and its tasks have all transitioned to the RUNNING state. States will also diverge when tasks fail since Connect does not automatically restart failed tasks.

It’s sometimes useful to temporarily stop the message processing of a connector. For example, if the remote system is undergoing maintenance, it would be preferable for source connectors to stop polling it for new data instead of filling logs with exception spam. For this use case, Connect offers a pause/resume API. While a source connector is paused, Connect will stop polling it for additional records. While a sink connector is paused, Connect will stop pushing new messages to it. The pause state is persistent, so even if you restart the cluster, the connector will not begin message processing again until the task has been resumed. Note that there may be a delay before all of a connector’s tasks have transitioned to the PAUSED state since it may take time for them to finish whatever processing they were in the middle of when being paused. Additionally, failed tasks will not transition to the PAUSED state until they have been restarted.

Common REST Examples

Below are a few common activities that are done over the REST API. These examples are shown using a worker running on localhost with default configurations and a connector named local-file-sink. We use the utility jq in some of the examples to format the response, but this is not required. The examples are intentionally simple. Advanced use of the REST API should consider the REST API references.

Get the worker’s version information

$ curl localhost:8083/ | jq
{
  "version": "0.10.0.1-cp1",
  "commit": "ea5fcd28195f168b"
}

List the connector plugins available on this worker

$ curl localhost:8083/connector-plugins | jq
[
  {
    "class": "io.confluent.connect.replicator.ReplicatorSourceConnector"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector"
  },
  {
    "class": "io.confluent.connect.hdfs.HdfsSinkConnector"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector"
  }
]

Listing active connectors on a worker

$ curl localhost:8083/connectors
["local-file-sink"]

Restarting a connector

$ curl -X POST localhost:8083/connectors/local-file-sink/restart
(no response printed if success)

Getting tasks for a connector

$ curl localhost:8083/connectors/local-file-sink/tasks | jq
[
  {
    "id": {
      "connector": "local-file-sink",
      "task": 0
    },
    "config": {
      "task.class": "org.apache.kafka.connect.file.FileStreamSinkTask",
      "topics": "connect-test",
      "file": "test.sink.txt"
    }
  }
]

Restarting a task

$ curl -X POST localhost:8083/connectors/local-file-sink/tasks/0/restart
(no response printed if success)

Pausing a connector (useful if downtime is needed for the system the connector interacts with)

$ curl -X PUT localhost:8083/connectors/local-file-sink/pause
(no response printed if success)

Resuming a connector

$ curl -X PUT localhost:8083/connectors/local-file-sink/resume
(no response printed if success)

Updating connector configuration

$ curl -X PUT -H "Content-Type: application/json" --data '{"connector.class":"FileStreamSinkConnector","file":"test.sink.txt","tasks.max":"2","topics":"connect-test","name":"local-file-sink"}' localhost:8083/connectors/local-file-sink/config
{
  "name": "local-file-sink",
  "config": {
    "connector.class": "FileStreamSinkConnector",
    "file": "test.sink.txt",
    "tasks.max": "2",
    "topics": "connect-test",
    "name": "local-file-sink"
  },
  "tasks": [
    {
      "connector": "local-file-sink",
      "task": 0
    },
    {
      "connector": "local-file-sink",
      "task": 1
    }
  ]
}

Getting connector status

$ curl localhost:8083/connectors/local-file-sink/status | jq
{
  "name": "local-file-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.86.101:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "192.168.86.101:8083"
    },
    {
      "state": "RUNNING",
      "id": 1,
      "worker_id": "192.168.86.101:8083"
    }
  ]
}

Getting connector configuration

$ curl localhost:8083/connectors/local-file-sink | jq
{
  "name": "local-file-sink",
  "config": {
    "connector.class": "FileStreamSinkConnector",
    "file": "test.sink.txt",
    "tasks.max": "2",
    "topics": "connect-test",
    "name": "local-file-sink"
  },
  "tasks": [
    {
      "connector": "local-file-sink",
      "task": 0
    },
    {
      "connector": "local-file-sink",
      "task": 1
    }
  ]
}

Deleting a connector

$ curl -X DELETE localhost:8083/connectors/local-file-sink
(no response printed if success)

Using Community Connectors

There are a variety of connectors developed for Connect by the community. The Connector Hub has a listing. We demonstrate how to install the HDFS Sink Connector provided by Confluent in this section as if it were any other community connector:

  1. Clone the GitHub repo for the connector
$ git clone [email protected]:confluentinc/kafka-connect-hdfs.git
  1. Change into the newly cloned repo, checkout the version you want, and build the jar with maven
$ cd kafka-connect-hdfs; git checkout v3.0.1; mvn package

Note, you will want to checkout a released version typically. Here we have used the v3.0.1 release tag.

  1. Locate the packaged connector plugin with dependency jars and move to the directory you keep connector plugins
$ cp target/kafka-connect-hdfs-3.0.1-package/share/java/kafka-connect-hdfs/* /opt/confluent-3.0.1/share/java/kafka-connect-hdfs

Be sure you have added the location where the connector plugin jar is located to the CLASSPATH for workers that need to run the connector as described here.

Upgrading a Connector Plugin

Upgrading a connector is similar to upgrading any other Kafka client application. You should refer to the documentation for individual connector plugins if you have a need for rolling upgrades. The instructions below cover the procedure to upgrade any generic connector plugin by taking a short outage.

  1. Download the new connector plugin jar
  2. Stop all connect workers
  3. Install the new connector plugin jar per the instructions.
  4. Start up the workers
  5. Start up the connector (if using distributed mode)