Security

Configuring Workers with Security

If you have enabled SSL or SASL in your Kafka cluster, then you will need to make sure that the Connect workers are also configured to use them. Underneath the covers, the Connect worker is really just an advanced client. You can configure security support for it using the same configuration options used by the standard producer and consumer. For example, to enable SSL (without client authentication), you could add the following options to the worker’s configuration:

# Worker security are located at the top level
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234

See the security documentation for more information on the different options that are available. The top-level settings are used by the worker for group coordination and to read and write to the internal topics which are used to track the cluster’s state (e.g. configs and offsets).

Configuring Connectors with Security

For the connectors to leverage security, however, you also need to override the default producer/consumer configuration that the worker uses. To do so, you simply prefix “producer” for the configuration options which the worker should use for source connectors, and “consumer” for sink connectors. Continuing the same example, you might add the following to the worker’s configuration:

# Source security settings are prefixed with "producer"
producer.security.protocol=SSL
producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
producer.ssl.truststore.password=test1234

# Sink security settings are prefixed with "consumer"
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
consumer.ssl.truststore.password=test1234

As of now, there is no way to change the configuration for connectors individually, but if your server supports client authentication over SSL, it is possible to use a separate principal for the worker and the connectors. In this case, you need to generate a separate certificate for each of them and install them in separate keystores. In the example below, we show what the worker configuration might look like in this case:

# Worker authentication settings
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
ssl.keystore.password=worker1234
ssl.key.password=worker1234

# Source authentication settings
producer.security.protocol=SSL
producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
producer.ssl.truststore.password=test1234
producer.ssl.keystore.location=/var/private/ssl/kafka.source.keystore.jks
producer.ssl.keystore.password=connector1234
producer.ssl.key.password=connector1234

# Sink authentication settings
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
consumer.ssl.truststore.password=test1234
consumer.ssl.keystore.location=/var/private/ssl/kafka.sink.keystore.jks
consumer.ssl.keystore.password=connector1234
consumer.ssl.key.password=connector1234

ACL Considerations

Using separate principals for the connectors allows you to define access control lists (ACLs) with finer granularity. For example, you can use this capability to prevent the connectors themselves from writing to any of internal topics used by the Connect cluster. Additionally, you can use different keystores for source and sink connectors and enable scenarios where source connectors have only write access to a topic but sink connectors have only read access to the same topic.

Note that if you are using SASL for authentication, you must use the same principal for workers and connectors as only a single JAAS is currently supported on the client side at this time as described here.

Worker ACL Requirements

Workers must be given access to all the internal topics required by Connect and to the common group which all workers in the cluster join. The table below shows each required permission and the relevant configuration setting used to define its value.

Operation(s) Resource Configuration Item
Read/Write Topic config.storage.topic
Read/Write Topic offsets.storage.topic
Read/Write Topic status.storage.topic
Read Group group.id

See Adding ACLs for documentation on creating new ACLs from the command line.

Connector ACL Requirements

Source connectors must be given WRITE permission to any topics that they need to write to. Similarly, sink connectors need READ permission to any topics they will read from. They also need Group READ permission since sink tasks depend on consumer groups internally. Connect defines the consumer group.id conventionally for each sink connector as connect-{name} where {name} is substituted by the name of the connector. For example, if your sink connector is named “hdfs-logs” and it reads from a topic named “logs,” then you could add an ACL with the following command:

$ bin/kafka-acls --authorizer-properties zookeeper.connect=<Zk host:port> \
   --add --allow-principal User:<Sink Connector Principal> \
   --consumer --topic logs --group connect-hdfs-logs