Configuration Options¶
Schema Registry Configuration¶
schema.registry.url
URL for Schema Registry. This is required for the Avro message decoder to interact with Schema Registry.
- Type: string
- Required: Yes
max.schemas.per.subject
The max number of schema objects allowed per subject.
- Type: int
- Required: No
- Default: 1000
is.new.producer
Set to true if the data Camus is consuming from Kafka was created using the new producer, or set to false if it was created with the old producer.
- Type: boolean
- Required: No
- Default: true
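For example, the Schema Registry options can be set together as shown below. The URL is a placeholder and should point to your own Schema Registry instance; the other two values are simply the defaults listed above.
# Schema Registry that the Avro decoder talks to (host and port are placeholders)
schema.registry.url=http://localhost:8081
# Optional tuning; the values shown are the defaults
max.schemas.per.subject=1000
is.new.producer=true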
Camus Job Configuration¶
camus.job.name
The name of the Camus job.
- Type: string
- Required: No
- Default: “Camus Job”
camus.message.decoder.class
Decoder class for converting Kafka messages to Avro records. The default is the Confluent version of AvroMessageDecoder, which integrates with Schema Registry.
- Type: string
- Required: No
- Default: io.confluent.camus.etl.kafka.coders.AvroMessageDecoder
etl.record.writer.provider.class
Class for writing records to HDFS/S3.
- Type: string
- Required: No
- Default: com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
etl.partitioner.class
Class for partitioning Camus output. The default partitioner partitions incoming data into hourly partitions.
- Type: string
- Required: No
- Default: com.linkedin.camus.etl.kafka.partitioner.DefaultPartitioner
camus.work.allocator.class
Class for creating input splits from ETL requests.
- Type: string
- Required: No
- Default: com.linkedin.camus.workallocater.BaseAllocator
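The decoder, record writer provider, partitioner, and work allocator classes are all pluggable. As a reference sketch, setting them explicitly looks like the following; the values shown are just the defaults listed above, so omitting these lines gives the same behavior.
# Pluggable implementation classes; the values below are the documented defaults
camus.message.decoder.class=io.confluent.camus.etl.kafka.coders.AvroMessageDecoder
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
etl.partitioner.class=com.linkedin.camus.etl.kafka.partitioner.DefaultPartitioner
camus.work.allocator.class=com.linkedin.camus.workallocater.BaseAllocator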
etl.destination.path
Top-level output directory; sub-directories will be dynamically created for each topic pulled.
- Type: string
- Required: Yes
etl.execution.base.path
HDFS location where you want to keep execution files, i.e. offsets, error logs and count files.
- Type: string
- Required: Yes
etl.execution.history.path
HDFS location to keep historical execution files, usually a sub-directory of etl.execution.base.path.
- Type: string
- Required: Yes
hdfs.default.classpath.dir
All files in this directory will be added to the distributed cache and placed on the classpath for Hadoop tasks.
- Type: string
- Required: No
- Default: null
mapred.map.tasks
Max number of Hadoop tasks to use; each task can pull multiple topic partitions.
- Type: int
- Required: No
- Default: 30
Kafka Configuration¶
kafka.brokers
List of Kafka brokers for Camus to pull metadata from.
- Type: string
- Required: Yes
kafka.max.pull.hrs
The maximum duration, in hours, from the timestamp of the first record pulled to the timestamp of the last. When this limit is reached, the pull stops. -1 means no limit.
- Type: int
- Required: No
- Default: -1
kafka.max.historical.days
Events with a timestamp older than this will be discarded. -1 means no limit.
- Type: int
- Required: No
- Default: -1
kafka.max.pull.minutes.per.task
Max minutes for each mapper to pull messages.
- Type: int
- Required: No
- Default: -1
kafka.blacklist.topics
Topics on the blacklist are not pulled from Kafka.
- Type: string
- Required: No
- Default: null
kafka.whitelist.topics
If the whitelist has values, only whitelisted topics are pulled from Kafka.
- Type: string
- Required: No
- Default: null
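For example, to pull only a fixed set of topics, the whitelist can be given as a comma-separated list. The topic names below are placeholders; check how your Camus version parses this property before relying on the format.
# Pull only these topics (names are placeholders); leave the blacklist empty
kafka.whitelist.topics=pageviews,clicks
kafka.blacklist.topics=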
etl.output.record.delimiter
Delimiter for writing string records.
- Type: string
- Required: No
- Default: “\n”
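When writing string records rather than Avro files, the delimiter is typically paired with a string-oriented record writer provider. The class name below is an assumption borrowed from the LinkedIn Camus code base; verify that it exists in your build before relying on it.
# Write records as delimited strings; the provider class is an assumption, verify it in your Camus build
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider
# The default record delimiter ("\n") is usually left unchanged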
Example Configuration¶
# The job name.
camus.job.name=Camus Job
# Kafka brokers to connect to, format: kafka.brokers=host1:port,host2:port,host3:port
kafka.brokers=
# Top-level data output directory, sub-directories are dynamically created for each topic pulled
etl.destination.path=/user/username/topics
# HDFS location to keep execution files: requests, offsets, error logs and count files
etl.execution.base.path=/user/username/exec
# HDFS location to keep historical execution files, usually a sub-directory of the base.path
etl.execution.history.path=/user/username/camus/exec/history
# Concrete implementation of the decoder class to use.
camus.message.decoder.class=io.confluent.camus.etl.kafka.coders.AvroMessageDecoder
# Max number of MapReduce tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=30
# Max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=1
# Events with a timestamp older than this will be discarded
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages from Kafka (-1 means no limit)
kafka.max.pull.minutes.per.task=-1
# Only topics in the whitelist are pulled from Kafka; no topics from the blacklist are pulled
kafka.blacklist.topics=
kafka.whitelist.topics=
log4j.configuration=true
# Name of the client as seen by kafka
kafka.client.name=camus
# Stops the mapper from getting inundated with decoder exceptions from the same topic
max.decoder.exceptions.to.print=5