Installing and Configuring Replicator

Before running Replicator, download Confluent Platform Enterprise from http://confluent.io/download/ (Replicator is not included in Confluent Open Source) and extract the contents of the archive. Then make sure you have two Kafka clusters up and running. If you are not sure how to set them up, refer to the Replicator Quick Start or the Confluent Platform Quick Start.

Confluent Replicator is a Kafka Connect Plugin. In order to run Replicator, you need to take the following steps:

  • Install and Configure Kafka Connect Cluster
  • Configure and run a Confluent Replicator on the Connect Cluster

In this section we will walk you through both of these steps in detail and review the available configuration options for Replicator.

Install and Configure Kafka Connect Cluster for Replicator

Replicator runs as a plugin (Connector) in Kafka Connect, so you’ll need to run Kafka Connect Workers before you can run Replicator. In the quickstart, we’ve seen how to run Replicator in Connect’s stand-alone mode. In stand-alone mode you run a single command that includes both the Connect Worker configuration and the Replicator configuration. The command starts the Worker and the Replicator plugin at the same time.

Stand-alone mode is recommended for proof-of-concept (POC) and small deployments where the throughput from a single Replicator node is sufficient. For larger-scale production deployments, you’ll want to run multiple Connect Workers in distributed mode.

Refer to the Connect documentation to learn how to run Connect in distributed mode.

Keep the following in mind when configuring distributed Connect Workers for Replicator:

Configuring origin and destination brokers

Each Connect cluster is associated with one cluster of Kafka brokers. The brokers of that Kafka cluster are specified in the bootstrap.servers configuration parameter of the Connect Workers. If you are configuring a new Connect Worker cluster for running Replicator, make sure this parameter lists brokers from the destination Kafka cluster. If you are planning to run Replicator on an existing Connect cluster, make sure it is already associated with the destination brokers.

Note

Replicator is responsible for reading events from the origin cluster. It then passes the events to the Connect Worker which is responsible for writing the events to the destination cluster. Therefore we configure Replicator with information about the origin and the Worker with information about the destination.
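As a rough sketch of the Worker side of this split, the commands below write a fragment of a distributed Worker configuration. The file name, the localhost:9092 address of the destination broker, and the group id are assumptions made for illustration. A complete Worker configuration needs further settings, which are described in the Connect documentation.

```shell
# Write a fragment of a distributed Connect Worker configuration.
# File name, broker address and group id are illustrative assumptions.
cat > replicator-worker.properties <<'EOF'
# Destination Kafka cluster: the Worker writes replicated events here.
bootstrap.servers=localhost:9092
# Workers that share this id form one Connect cluster.
group.id=replicator-connect-cluster
EOF

# Show the broker list the Worker will use.
grep '^bootstrap.servers=' replicator-worker.properties
```

The origin brokers do not appear in this file. They belong in the Replicator configuration, in the src.kafka.bootstrap.servers parameter.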

Where to Install Connect Workers

If you are replicating events between different data centers (rather than between two Kafka clusters in the same data center), we recommend running the Connect Workers in the *destination* data center. So if you are sending data from NYC to SF, Replicator should run in SF and consume data across the US from NYC. The reason is that long-distance networks can be less reliable than the network inside a data center. If there is a network partition and you lose connectivity between the data centers, a consumer that is unable to connect to a cluster is much safer than a producer that can’t connect. If the consumer can’t connect, it simply won’t be able to read events, but the events are still stored in the origin Kafka cluster and can remain there for a long time, so there is no risk of losing them. On the other hand, if the events were already consumed and Replicator can’t produce them because of a network partition, there is always a risk that these events will be lost. Remote consuming is therefore safer than remote producing.

Running Replicator on Existing Connect Cluster

You can run Replicator on the same Connect cluster as other connectors, but in some cases this is not recommended:

  • If you are replicating data between two data centers that are far apart and thus have high latency, you’ll want to tune both the Connect Worker and Replicator appropriately. Intra-DC tuning is different from inter-DC tuning for this very reason. By giving Replicator its own Connect cluster, you can tune the Connect Workers specifically for Replicator without worrying about other connectors being affected.
  • Any changes to a connector will cause Replicator to pause while connectors are being re-assigned to workers. If you frequently start and stop connectors, you may want to run Replicator on its own cluster and allow it to run without interruptions.

Configuring Logging for Connect Cluster

Kafka Connect logging is configured in the file etc/kafka/connect-log4j.properties. By default it writes only to the console, but for production deployments you will want to log to a file. Before you start Replicator, add these lines to the connect-log4j.properties file:

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=logs/replicator.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true

Then edit the log4j.rootLogger parameter so that it includes the file appender defined above:

# Default: log4j.rootLogger=INFO, stdout
log4j.rootLogger=INFO, stdout, file

License Key

Without the license key, you can use Replicator for a 30-day trial period. If you are a Confluent customer, you can contact customer support and ask for a Replicator license key. Then add the confluent.license setting to the Replicator configuration (see below), with the key you received from Confluent support as its value.

Configure and run a Confluent Replicator on the Connect Cluster

In the quickstart, we’ve seen how to run Replicator in Connect’s stand-alone mode. We’ll now see how to run Replicator on a distributed Connect cluster.

We’ll assume that at least one distributed mode Connect Worker is already up and running. If you are not sure how to do that, review the distributed mode documentation.

You can check if the Connect Worker is up and running by checking its REST API:

$ curl http://localhost:8083/
{"version":"0.10.2.1-cp1","commit":"078e7dc02a100018"}

If everything is fine, you will see a version number and commit hash for the version of the Connect worker you are running.

We’ll run Replicator by sending its configuration to the Connect REST API in JSON format (note that this is different from stand-alone mode, which uses the Java properties file format). Here’s an example configuration:

{
        "name":"replicator",
        "config":{
                "connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",
                "tasks.max":4,
                "key.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
                "value.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
                "src.kafka.bootstrap.servers":"localhost:9082",
                "src.zookeeper.connect":"localhost:2171",
                "dest.zookeeper.connect":"localhost:2181",
                "topic.whitelist":"test-topic",
                "topic.rename.format":"${topic}.replica",
                "confluent.license":"eyJhbGciOiJSUzI1NiJ9.eyJpc3MiOiJDb25mbHVlbnQiLCJhdWQiOiJDMDAwMDAiLCJleHAiOjE0OTk0NzIwMDAsImp0aSI6ImpJNFpCM0RjNlNoUDJXejVHd04xY2ciLCJpYXQiOjE0OTkyNjk4OTEsIm5iZiI6MTQ5OTI2OTc3MSwic3ViIjoiY29udHJvbC1jZW50ZXIiLCJtb25pdG9yaW5nIjp0cnVlfQ.dnFkb9BS95Bv47HVlpI1OhSxrbK0nWOD0eRqPCcOgWrh5Pp6H-NQOlt5qtECgPfxMkV-Z5xAdf7l6p3-Ou3q7wWVmFAb8zkUrXnz_TmCkBN117fbUsZ0WZ1GAKxU1CsACsZ5rARSicGFJ54MuibwCCcHtAEOV5_Sv39t-cTRTw-cSE_NWpYyg77V7AAIirFVDMTZTFUg9RBCVEWu59UF1iYgkvlmN4qC0TdchfnTS4XQDuJlM_opYUEbZZoFxj8UY-dMyi136DFGaVF37LSaJguXCAm3KjCar8ipvyX5oLGmHhekw9b-xoEr-j4VTW_9QSfOHNDJ_ssGIISOJjBCPA"
        }
}

You can send this configuration to the Connect Worker using curl. This assumes the above JSON is in a file called example-replicator.json:

$ curl -X POST -d @example-replicator.json http://localhost:8083/connectors --header "Content-Type: application/json"
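Once the configuration has been submitted, you may want to confirm that the connector was created and is running. The sketch below uses the Connect REST API endpoints for listing connectors and for reading a connector's status. It assumes the Worker address and the connector name "replicator" from the examples above. The status_url helper is a hypothetical convenience function, not part of any Confluent tooling.

```shell
# Base URL of the Connect Worker's REST API, as used in the examples above.
CONNECT_URL="http://localhost:8083"

# Hypothetical helper: print the status URL for a connector name.
status_url() {
  printf '%s/connectors/%s/status\n' "$CONNECT_URL" "$1"
}

# List the connectors registered on the Connect cluster.
curl -s --max-time 5 "$CONNECT_URL/connectors" || echo "Connect Worker not reachable"

# Show the state of the connector and of each of its tasks.
curl -s --max-time 5 "$(status_url replicator)" || echo "Connect Worker not reachable"
```

The status response reports a state for the connector and for each task, which makes it a quick way to spot a task that has failed.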

This example demonstrates the use of some important configuration parameters. For an explanation of all the parameters, see the Replicator configuration options.

  • key.converter and value.converter - Classes used to convert Kafka records to Connect’s internal format. The Connect Worker configuration specifies global converters, and those will be used if you don’t specify anything in the Replicator configuration. For replication, however, no conversion is necessary. We just want to read bytes out of the origin cluster and write them to the destination with no changes. So we override the global converters with the ByteArrayConverter, which leaves the records as they are.
  • src.kafka.bootstrap.servers - A list of brokers from the origin cluster.
  • src.zookeeper.connect and dest.zookeeper.connect - Connection strings for ZooKeeper in the origin and destination clusters respectively. These are used to replicate topic configuration from origin to destination.
  • topic.whitelist - An explicit list of the topics you want replicated. In this example, we replicate the topic named “test-topic”. As you can see in the configuration list, in addition to using whitelists, you can also tell Replicator which topics to replicate using a regular expression. Use a regular expression if you want Replicator to automatically start replicating new topics that match a certain pattern. For example, if you want to replicate all production topics, including new ones, configure Replicator to replicate topics that match prod.*. Note that if you modify the whitelist to add new topics, you will need to restart Replicator for the change to take effect.
  • topic.rename.format - A substitution string that is used to rename topics in the destination cluster. In the snippet above, we have used ${topic}.replica, where ${topic} will be substituted with the topic name from the origin cluster. That means that the test-topic we’re replicating from the origin cluster will be renamed to test-topic.replica in the destination cluster.
  • confluent.license - Without the license, you can use Replicator for a 30-day trial period. If you are a Confluent customer, you can contact customer support and ask for a Replicator license. Then use it as we show in the example.
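To get a feel for how a topic pattern such as prod.* and a rename format such as ${topic}.replica combine, you can simulate them locally. The sketch below does not call Replicator. The topic names are made up, rename_topic and matches_pattern are hypothetical helpers, and it assumes the pattern has to match the whole topic name.

```shell
# Illustration only: mimics how a topic pattern and a rename format behave.

# Print the destination topic name.
# $1 = origin topic, $2 = format containing the literal text ${topic}
rename_topic() {
  printf '%s\n' "$2" | sed "s/\${topic}/$1/g"
}

# Succeed if the whole topic name matches the extended regular expression.
# $1 = topic, $2 = pattern
matches_pattern() {
  printf '%s\n' "$1" | grep -E -x -q -- "$2"
}

for topic in prod.orders prod.payments staging.orders; do
  if matches_pattern "$topic" 'prod.*'; then
    echo "$topic -> $(rename_topic "$topic" '${topic}.replica')"
  else
    echo "$topic (not replicated)"
  fi
done
```

Note that the dot in prod.* matches any character, so a topic called production-events would match as well. If you only want topics that start with "prod.", escape the dot and use prod\..* instead.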