Configuration Options¶
Schema Registry Configuration¶
schema.registry.url
URL for Schema Registry. This is required for the Avro message decoder to interact with Schema Registry (see the example below).
- Type: string
- Required: Yes
max.schemas.per.subject
The max number of schema objects allowed per subject.
- Type: int
- Required: No
- Default: 1000
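A minimal Schema Registry section of a Camus properties file might look like the following sketch; the registry URL is an assumed example and must point at your own Schema Registry instance (8081 is the registry's conventional port):
# URL of the Schema Registry instance (example host; adjust for your deployment)
schema.registry.url=http://localhost:8081
# Max number of schema objects allowed per subject (optional; 1000 is the default)
max.schemas.per.subject=1000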
Camus Job Configuration¶
camus.job.name
The name of the Camus job.
- Type: string
- Required: No
- Default: “Camus Job”
camus.message.decoder.class
Decoder class for converting Kafka messages to Avro records. The default is the Confluent version of AvroMessageDecoder, which integrates with Schema Registry (see the pairing example below).
- Type: string
- Required: No
- Default: io.confluent.camus.etl.kafka.coders.AvroMessageDecoder
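Because the Confluent decoder resolves Avro schemas through Schema Registry, the two settings are normally configured together. A sketch, with the registry host assumed:
# Confluent decoder plus the registry it reads schemas from (example host)
camus.message.decoder.class=io.confluent.camus.etl.kafka.coders.AvroMessageDecoder
schema.registry.url=http://localhost:8081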
etl.record.writer.provider.class
Class for writing records to HDFS/S3.
- Type: string
- Required: No
- Default: com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
etl.partitioner.class
Class for partitioning Camus output. The default partitioner partitions incoming data into hourly partitions (the output-class defaults are restated in the example below).
- Type: string
- Required: No
- Default: com.linkedin.camus.etl.kafka.partitioner.DefaultPartitioner
camus.work.allocator.class
Class for creating input splits from ETL requests.
- Type: string
- Required: No
- Default: com.linkedin.camus.workallocater.BaseAllocator
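The three output-side classes are usually left at their defaults, but spelling them out in the properties file makes the pipeline explicit. The lines below simply restate the documented defaults:
# Write Avro records to HDFS/S3 (default writer provider)
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
# Partition output into hourly directories (default partitioner)
etl.partitioner.class=com.linkedin.camus.etl.kafka.partitioner.DefaultPartitioner
# Create input splits from ETL requests (default allocator)
camus.work.allocator.class=com.linkedin.camus.workallocater.BaseAllocator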
etl.destination.path
Top-level output directory; sub-directories will be created dynamically for each topic pulled.
- Type: string
- Required: Yes
etl.execution.base.path
HDFS location where you want to keep execution files, i.e. offsets, error logs and count files.
- Type: string
- Required: Yes
etl.execution.history.path
HDFS location to keep historical execution files, usually a sub-directory of etl.execution.base.path.
- Type: string
- Required: Yes
hdfs.default.classpath.dir
All files in this directory will be added to the distributed cache and placed on the classpath for Hadoop tasks.
- Type: string
- Required: No
- Default: null
mapred.map.tasks
Max number of Hadoop map tasks to use; each task can pull multiple topic partitions.
- Type: int
- Required: No
- Default: 30
Kafka Configuration¶
kafka.brokers
List of Kafka brokers for Camus to pull metadata from.
- Type: string
- Required: Yes
kafka.max.pull.hrs
The maximum time span, in hours, from the timestamp of the first record pulled to the timestamp of the last. When the limit is reached, the pull stops. -1 means no limit.
- Type: int
- Required: No
- Default: -1
kafka.max.historical.days
Events with a timestamp older than this will be discarded. -1 means no limit.
- Type: int
- Required: No
- Default: -1
kafka.max.pull.minutes.per.task
Max number of minutes each mapper will spend pulling messages (see the combined example below).
- Type: int
- Required: No
- Default: -1
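Taken together, these three options bound each run in event time and wall-clock time. A sketch that pulls at most a one-hour window per partition, discards events older than three days, and leaves the per-mapper time unlimited (the same values used in the full example at the end of this section):
# Pull at most one hour of data per partition, skip events older than three days
kafka.max.pull.hrs=1
kafka.max.historical.days=3
kafka.max.pull.minutes.per.task=-1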
kafka.blacklist.topics
Topics on the blacklist are not pulled from Kafka.
- Type: string
- Required: No
- Default: null
kafka.whitelist.topics
If the whitelist has values, only whitelisted topics are pulled from Kafka (see the example below).
- Type: string
- Required: No
- Default: null
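Both options take a comma-separated list of topic names (assumed here; the topic names below are hypothetical). For example, to pull only two topics, or alternatively to exclude one topic while pulling everything else:
# Pull only these topics (hypothetical names)
kafka.whitelist.topics=clickstream,orders
# Or leave the whitelist empty and exclude specific topics instead
# kafka.blacklist.topics=debug-events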
etl.output.record.delimiter
Delimiter for writing string records.
- Type: string
- Required: No
- Default: “\n”
Example Configuration¶
# The job name.
camus.job.name=Camus Job
# Kafka brokers to connect to, format: kafka.brokers=host1:port,host2:port,host3:port
kafka.brokers=
# Top-level data output directory, sub-directories are dynamically created for each topic pulled
etl.destination.path=/user/username/topics
# HDFS location to keep execution files: requests, offsets, error logs and count files
etl.execution.base.path=/user/username/exec
# HDFS location to keep historical execution files, usually a sub-directory of etl.execution.base.path
etl.execution.history.path=/user/username/camus/exec/history
# Concrete implementation of the decoder class to use.
camus.message.decoder.class=io.confluent.camus.etl.kafka.coders.AvroMessageDecoder
# Max number of MapReduce tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=30
# Max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=1
# Events with a timestamp older than this will be discarded
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages from Kafka (-1 means no limit)
kafka.max.pull.minutes.per.task=-1
# Only topics on the whitelist are pulled from Kafka; topics on the blacklist are never pulled
kafka.blacklist.topics=
kafka.whitelist.topics=
log4j.configuration=true
# Name of the client as seen by kafka
kafka.client.name=camus
# Stops the mapper from getting inundated with decoder exceptions from the same topic
max.decoder.exceptions.to.print=5
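Once assembled, the properties file is typically passed to the Camus job runner when launching it with hadoop jar, e.g. via the -P flag of com.linkedin.camus.etl.kafka.CamusJob; the exact jar name and invocation depend on your Camus build, so consult your distribution.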