Security
If you have enabled SSL or SASL in your Kafka cluster, you need to make sure that the Connect workers are also configured to use them. Under the covers, the Connect worker is really just an advanced client, so you can configure security support for it using the same configuration options used by the standard producer and consumer. For example, to enable SSL (without client authentication), you could add the following options to the worker’s configuration:
# Worker security settings are located at the top level
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
See the security documentation for more information on the different options that are available. The top-level settings are used by the worker for group coordination and to read and write to the internal topics which are used to track the cluster’s state (e.g. configs and offsets).
For the connectors to leverage security, however, you also need to override the default producer/consumer configuration that the worker uses. To do so, prefix each configuration option with “producer.” for the settings the worker should use for source connectors, and with “consumer.” for sink connectors. Continuing the same example, you might add the following to the worker’s configuration:
# Source security settings are prefixed with "producer"
producer.security.protocol=SSL
producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
producer.ssl.truststore.password=test1234
# Sink security settings are prefixed with "consumer"
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
consumer.ssl.truststore.password=test1234
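Because the three groups of settings differ only in their prefix, they can be generated from a single list. The following is a minimal sketch, not part of Kafka or Connect: the function name `prefixed_settings` and the variable `base_settings` are hypothetical, and the values are the same sample values used above.

```shell
#!/bin/sh
# Sample client security settings (same example values as in the text above).
base_settings='security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234'

# Hypothetical helper: print every base setting with the given prefix.
# $1 is "" for the worker itself, "producer." for source connectors,
# or "consumer." for sink connectors. The prefix must not contain "/" or "&",
# because it is substituted into a sed expression.
prefixed_settings() {
  printf '%s\n' "$base_settings" | sed "s/^/$1/"
}

# Emit the worker-level, producer-level, and consumer-level settings in turn.
for prefix in "" "producer." "consumer."; do
  prefixed_settings "$prefix"
done
```

The output can be appended to the worker's properties file after review. Keeping one base list makes it harder for the worker, producer, and consumer settings to drift apart when a path or password changes.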
As of now, there is no way to change the configuration for connectors individually, but if your server supports client authentication over SSL, it is possible to use a separate principal for the worker and the connectors. In this case, you need to generate a separate certificate for each of them and install them in separate keystores. The following example shows what the worker configuration might look like:
# Worker authentication settings
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
ssl.keystore.password=worker1234
ssl.key.password=worker1234
# Source authentication settings
producer.security.protocol=SSL
producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
producer.ssl.truststore.password=test1234
producer.ssl.keystore.location=/var/private/ssl/kafka.connector.keystore.jks
producer.ssl.keystore.password=connector1234
producer.ssl.key.password=connector1234
# Sink authentication settings
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
consumer.ssl.truststore.password=test1234
consumer.ssl.keystore.location=/var/private/ssl/kafka.connector.keystore.jks
consumer.ssl.keystore.password=connector1234
consumer.ssl.key.password=connector1234
Using separate principals for the connectors allows you to define access control lists (ACLs) with finer granularity. For example, you can use this capability to prevent the connectors themselves from writing to any of the internal topics used by the Connect cluster. Although we have used the same principal for sources and sinks in this example, they could be different as well. This would allow you to prevent sink connectors from writing to any topics.
Note that if you are using SASL for authentication, you must use the same principal for workers and connectors.
At a minimum, the worker must be given access to all the internal topics it uses and to the common group that all workers in the cluster join. The table below shows each required permission and the configuration setting that defines the name of the corresponding resource.
Operation(s) | Resource | Configuration Item |
---|---|---|
Read/Write | Topic | config.storage.topic |
Read/Write | Topic | offsets.storage.topic |
Read/Write | Topic | status.storage.topic |
Read | Group | group.id |
See Adding ACLs for documentation on creating new ACLs from the command line.
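The permissions in the table can be translated into one `kafka-acls` command per resource. The sketch below only prints the commands so they can be reviewed before being run. The topic and group names (`connect-configs`, `connect-offsets`, `connect-status`, `connect-cluster`) are assumed example values, the function name `worker_acl_commands` is hypothetical, and the `<...>` placeholders must be replaced with your own ZooKeeper address and worker principal.

```shell
#!/bin/sh
# Assumed example values; use the values from your own worker configuration.
CONFIG_TOPIC="connect-configs"    # value of config.storage.topic
OFFSETS_TOPIC="connect-offsets"   # value of offsets.storage.topic
STATUS_TOPIC="connect-status"     # value of status.storage.topic
GROUP_ID="connect-cluster"        # value of group.id

# Common part of every command; the placeholders are intentionally left unfilled.
ACL_CMD="bin/kafka-acls --authorizer-properties zookeeper.connect=<Zk host:port>"
PRINCIPAL="'User:<Worker Principal>'"

# Hypothetical helper: print (do not execute) the ACL commands for the worker.
worker_acl_commands() {
  # Read/Write on each of the three internal topics.
  for topic in "$CONFIG_TOPIC" "$OFFSETS_TOPIC" "$STATUS_TOPIC"; do
    printf '%s\n' "$ACL_CMD --add --allow-principal $PRINCIPAL --operation Read --operation Write --topic $topic"
  done
  # Read on the group that all workers in the cluster join.
  printf '%s\n' "$ACL_CMD --add --allow-principal $PRINCIPAL --operation Read --group $GROUP_ID"
}

worker_acl_commands
```

Printing instead of executing is a deliberate choice here: the commands contain placeholders, and ACL changes are easier to audit when they are reviewed as text first.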
Source connectors must be given WRITE permission to any topics that they need to write to. Similarly, sink connectors need READ permission to any topics they will read from. They also need Group READ permission since sink tasks depend on consumer groups internally. Connect defines the consumer group.id conventionally for each sink connector as connect-{name}, where {name} is substituted by the name of the connector. For example, if your sink connector is named “hdfs-logs” and it reads from a topic named “logs,” then you could add an ACL with the following command:
$ bin/kafka-acls --authorizer-properties zookeeper.connect=<Zk host:port> \
--add --allow-principal User:<Sink Connector Principal> \
--consumer --topic logs --group connect-hdfs-logs
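The naming convention makes the group name derivable from the connector name, so the command above can be generated for any sink connector, and a matching command can be built for a source connector using the `--producer` convenience option of `kafka-acls`. This is a sketch under stated assumptions: the function names are hypothetical, the topic `orders` is an invented example, and the commands are printed for review rather than executed.

```shell
#!/bin/sh
# Common part of every command; the placeholder is intentionally left unfilled.
ACL_CMD="bin/kafka-acls --authorizer-properties zookeeper.connect=<Zk host:port>"

# Consumer group used by a sink connector: connect-{name}.
sink_group_id() {
  printf 'connect-%s\n' "$1"
}

# Hypothetical helper: print the ACL command for a sink connector.
# $1 = connector name, $2 = topic the connector reads from.
sink_acl_command() {
  printf '%s\n' "$ACL_CMD --add --allow-principal 'User:<Sink Connector Principal>' --consumer --topic $2 --group $(sink_group_id "$1")"
}

# Hypothetical helper: print the ACL command for a source connector.
# $1 = topic the connector writes to. No group is needed for producers.
source_acl_command() {
  printf '%s\n' "$ACL_CMD --add --allow-principal 'User:<Source Connector Principal>' --producer --topic $1"
}

sink_acl_command hdfs-logs logs   # the example from the text above
source_acl_command orders         # "orders" is an invented topic name
```

The first printed line matches the command shown above for the “hdfs-logs” connector, apart from being written on a single line.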