Security¶
Configuring Workers with Security¶
If you have enabled SSL or SASL in your Kafka cluster, then you will need to make sure that the Connect workers are also configured to use them. Underneath the covers, the Connect worker is really just an advanced client. You can configure security support for it using the same configuration options used by the standard producer and consumer. For example, to enable SSL (without client authentication), you could add the following options to the worker’s configuration:
# Worker security are located at the top level
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
See the security documentation for more information on the different options that are available. The top-level settings are used by the worker for group coordination and to read and write to the internal topics which are used to track the cluster’s state (e.g. configs and offsets).
Configuring Connectors with Security¶
For the connectors to leverage security, however, you also need to override the default producer/consumer configuration that the worker uses. To do so, you simply prefix “producer” for the configuration options which the worker should use for source connectors, and “consumer” for sink connectors. Continuing the same example, you might add the following to the worker’s configuration:
# Source security settings are prefixed with "producer"
producer.security.protocol=SSL
producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
producer.ssl.truststore.password=test1234
# Sink security settings are prefixed with "consumer"
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
consumer.ssl.truststore.password=test1234
As of now, there is no way to change the configuration for connectors individually, but if your server supports client authentication over SSL, it is possible to use a separate principal for the worker and the connectors. In this case, you need to generate a separate certificate for each of them and install them in separate keystores. In the example below, we show what the worker configuration might look like in this case:
# Worker authentication settings
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
ssl.keystore.password=worker1234
ssl.key.password=worker1234
# Source authentication settings
producer.security.protocol=SSL
producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
producer.ssl.truststore.password=test1234
producer.ssl.keystore.location=/var/private/ssl/kafka.source.keystore.jks
producer.ssl.keystore.password=connector1234
producer.ssl.key.password=connector1234
# Sink authentication settings
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
consumer.ssl.truststore.password=test1234
consumer.ssl.keystore.location=/var/private/ssl/kafka.sink.keystore.jks
consumer.ssl.keystore.password=connector1234
consumer.ssl.key.password=connector1234
ACL Considerations¶
Using separate principals for the connectors allows you to define access control lists (ACLs) with finer granularity. For example, you can use this capability to prevent the connectors themselves from writing to any of internal topics used by the Connect cluster. Additionally, you can use different keystores for source and sink connectors and enable scenarios where source connectors have only write access to a topic but sink connectors have only read access to the same topic.
Note that if you are using SASL for authentication, you must use the same principal for workers and connectors as only a single JAAS is currently supported on the client side at this time as described here.
Worker ACL Requirements¶
Workers must be given access to all the internal topics required by Connect and to the common group which all workers in the cluster join. The table below shows each required permission and the relevant configuration setting used to define its value.
Operation(s) | Resource | Configuration Item |
---|---|---|
Read/Write | Topic | config.storage.topic |
Read/Write | Topic | offsets.storage.topic |
Read/Write | Topic | status.storage.topic |
Read | Group | group.id |
See Adding ACLs for documentation on creating new ACLs from the command line.
Connector ACL Requirements¶
Source connectors must be given WRITE
permission to any topics
that they need to write to. Similarly, sink connectors need READ
permission to any topics they will read from. They also need Group
READ
permission since sink tasks depend on consumer groups
internally. Connect defines the consumer group.id
conventionally
for each sink connector as connect-{name}
where {name}
is
substituted by the name of the connector. For example, if your sink
connector is named “hdfs-logs” and it reads from a topic named “logs,”
then you could add an ACL with the following command:
$ bin/kafka-acls --authorizer-properties zookeeper.connect=<Zk host:port> \
--add --allow-principal User:<Sink Connector Principal> \
--consumer --topic logs --group connect-hdfs-logs