All Worker Configs¶
Below is an exhaustive list of configuration options related to Connect Workers broken into several sections. The first lists options that can be set in either standalone or distributed mode. These control basic functionality like which Kafka cluster to communicate with and what format data you’re working with. The next two sections list settings specific to standalone or distributed mode.
Common Worker Configs¶
bootstrap.serversA list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form
host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).- Type: list
- Default: [localhost:9092]
- Importance: high
key.converterConverter class for key Connect data. This controls the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. Popular formats include Avro and JSON.
- Type: class
- Default:
- Importance: high
value.converterConverter class for value Connect data. This controls the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. Popular formats include Avro and JSON.
- Type: class
- Default:
- Importance: high
internal.key.converterConverter class for internal key Connect data that implements the
Converterinterface. Used for converting data like offsets and configs.- Type: class
- Default:
- Importance: low
internal.value.converterConverter class for offset value Connect data that implements the
Converterinterface. Used for converting data like offsets and configs.- Type: class
- Default:
- Importance: low
offset.flush.interval.msInterval at which to try committing offsets for tasks.
- Type: long
- Default: 60000
- Importance: low
offset.flush.timeout.msMaximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt.
- Type: long
- Default: 5000
- Importance: low
rest.advertised.host.nameIf this is set, this is the hostname that will be given out to other workers to connect to.
- Type: string
- Importance: low
rest.advertised.portIf this is set, this is the port that will be given out to other workers to connect to.
- Type: int
- Importance: low
rest.host.nameHostname for the REST API. If this is set, it will only bind to this interface.
- Type: string
- Importance: low
rest.portPort for the REST API to listen on.
- Type: int
- Default: 8083
- Importance: low
task.shutdown.graceful.timeout.msAmount of time to wait for tasks to shutdown gracefully. This is the total amount of time, not per task. All task have shutdown triggered, then they are waited on sequentially.
- Type: long
- Default: 5000
- Importance: low
Standalone Worker Configuration¶
In addition to the common worker configuration options, the following are available in standalone mode.
offset.storage.file.filenameThe file to store connector offsets in. By storing offsets on disk, a standalone process can be stopped and started on a single node and resume where it previously left off.
- Type: string
- Default: “”
- Importance: high
Distributed Worker Configuration¶
In addition to the common worker configuration options, the following are available in distributed mode.
group.idA unique string that identifies the Connect cluster group this worker belongs to.
- Type: string
- Default: “”
- Importance: high
config.storage.topicThe topic to store connector and task configuration data in. This must be the same for all workers with the same
group.id- Type: string
- Default: “”
- Importance: high
offset.storage.topicThe topic to store offset data for connectors in. This must be the same for all workers with the same
group.id- Type: string
- Default: “”
- Importance: high
heartbeat.interval.msThe expected time between heartbeats to the group coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the worker’s session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than
session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.- Type: int
- Default: 3000
- Importance: high
session.timeout.msThe timeout used to detect failures when using Kafka’s group management facilities.
- Type: int
- Default: 30000
- Importance: high
ssl.key.passwordThe password of the private key in the key store file. This is optional for client.
- Type: password
- Importance: high
ssl.keystore.locationThe location of the key store file. This is optional for client and can be used for two-way authentication for client.
- Type: string
- Importance: high
ssl.keystore.passwordThe store password for the key store file.This is optional for client and only needed if ssl.keystore.location is configured.
- Type: password
- Importance: high
ssl.truststore.locationThe location of the trust store file.
- Type: string
- Importance: high
ssl.truststore.passwordThe password for the trust store file.
- Type: password
- Importance: high
connections.max.idle.msClose idle connections after the number of milliseconds specified by this config.
- Type: long
- Default: 540000
- Importance: medium
receive.buffer.bytesThe size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
- Type: int
- Default: 32768
- Importance: medium
request.timeout.msThe configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
- Type: int
- Default: 40000
- Importance: medium
sasl.kerberos.service.nameThe Kerberos principal name that Kafka runs as. This can be defined either in Kafka’s JAAS config or in Kafka’s config.
- Type: string
- Importance: medium
security.protocolProtocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported.
- Type: string
- Default: “PLAINTEXT”
- Importance: medium
send.buffer.bytesThe size of the TCP send buffer (SO_SNDBUF) to use when sending data.
- Type: int
- Default: 131072
- Importance: medium
ssl.enabled.protocolsThe list of protocols enabled for SSL connections. TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.
- Type: list
- Default: [TLSv1.2, TLSv1.1, TLSv1]
- Importance: medium
ssl.keystore.typeThe file format of the key store file. This is optional for client. Default value is JKS
- Type: string
- Default: “JKS”
- Importance: medium
ssl.protocolThe SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.
- Type: string
- Default: “TLS”
- Importance: medium
ssl.providerThe name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
- Type: string
- Importance: medium
ssl.truststore.typeThe file format of the trust store file. Default value is JKS.
- Type: string
- Default: “JKS”
- Importance: medium
worker.sync.timeout.msWhen the worker is out of sync with other workers and needs to resynchronize configurations, wait up to this amount of time before giving up, leaving the group, and waiting a backoff period before rejoining.
- Type: int
- Default: 3000
- Importance: medium
worker.unsync.backoff.msWhen the worker is out of sync with other workers and fails to catch up within worker.sync.timeout.ms, leave the Connect cluster for this long before rejoining.
- Type: int
- Default: 300000
- Importance: medium
client.idAn id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.
- Type: string
- Default: “”
- Importance: low
metadata.max.age.msThe period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.
- Type: long
- Default: 300000
- Importance: low
metric.reportersA list of classes to use as metrics reporters. Implementing the
MetricReporterinterface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.- Type: list
- Default: []
- Importance: low
metrics.num.samplesThe number of samples maintained to compute metrics.
- Type: int
- Default: 2
- Importance: low
metrics.sample.window.msThe number of samples maintained to compute metrics.
- Type: long
- Default: 30000
- Importance: low
reconnect.backoff.msThe amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.
- Type: long
- Default: 50
- Importance: low
retry.backoff.msThe amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.
- Type: long
- Default: 100
- Importance: low
sasl.kerberos.kinit.cmdKerberos kinit command path. Default is /usr/bin/kinit
- Type: string
- Default: “/usr/bin/kinit”
- Importance: low
sasl.kerberos.min.time.before.reloginLogin thread sleep time between refresh attempts.
- Type: long
- Default: 60000
- Importance: low
sasl.kerberos.ticket.renew.jitterPercentage of random jitter added to the renewal time.
- Type: double
- Default: 0.05
- Importance: low
sasl.kerberos.ticket.renew.window.factorLogin thread will sleep until the specified window factor of time from last refresh to ticket’s expiry has been reached, at which time it will try to renew the ticket.
- Type: double
- Default: 0.8
- Importance: low
ssl.cipher.suitesA list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol.By default all the available cipher suites are supported.
- Type: list
- Importance: low
ssl.endpoint.identification.algorithmThe endpoint identification algorithm to validate server hostname using server certificate.
- Type: string
- Importance: low
ssl.keymanager.algorithmThe algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
- Type: string
- Default: “SunX509”
- Importance: low
ssl.trustmanager.algorithmThe algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.
- Type: string
- Default: “PKIX”
- Importance: low