Configuration Options¶
Schema Registry Configuration¶
schema.registry.url
URL for Schema Registry. This is required for the Avro message decoder to interact with Schema Registry.
- Type: string
- Required: Yes
max.schemas.per.subject
The max number of schema objects allowed per subject.
- Type: int
- Required: No
- Default: 1000
is.new.producer
Set to true if the data Camus is consuming from Kafka was created using the new producer, or set to false if it was created with the old producer.
- Type: boolean
- Required: No
- Default: true
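Put together, the Schema Registry settings above would look something like the following in a Camus properties file. This is a minimal sketch; the registry host and port are placeholders, not values prescribed by Camus.
# Schema Registry endpoint used by the Avro message decoder (placeholder host/port)
schema.registry.url=http://localhost:8081
# Cap on the number of schema objects cached per subject
max.schemas.per.subject=1000
# The data being consumed was written with the new Kafka producer
is.new.producer=true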
Camus Job Configuration¶
camus.job.name
The name of the Camus job.
- Type: string
- Required: No
- Default: “Camus Job”
camus.message.decoder.class
Decoder class for converting Kafka messages to Avro records. The default is the Confluent version of AvroMessageDecoder, which integrates with Schema Registry.
- Type: string
- Required: No
- Default: io.confluent.camus.etl.kafka.coders.AvroMessageDecoder
etl.record.writer.provider.class
Class for writing records to HDFS/S3.
- Type: string
- Required: No
- Default: com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
etl.partitioner.class
Class for partitioning Camus output. The default partitioner partitions incoming data into hourly partitions.
- Type: string
- Required: No
- Default: com.linkedin.camus.etl.kafka.partitioner.DefaultPartitioner
camus.work.allocator.class
Class for creating input splits from ETL requests.
- Type: string
- Required: No
- Default: com.linkedin.camus.workallocater.BaseAllocator
etl.destination.path
Top-level output directory; sub-directories will be created dynamically for each topic pulled.
- Type: string
- Required: Yes
etl.execution.base.path
HDFS location where you want to keep execution files, i.e. offsets, error logs and count files.
- Type: string
- Required: Yes
etl.execution.history.path
HDFS location to keep historical execution files, usually a sub-directory of etl.execution.base.path.
- Type: string
- Required: Yes
hdfs.default.classpath.dir
All files in this directory will be added to the distributed cache and placed on the classpath for Hadoop tasks.
- Type: string
- Required: No
- Default: null
mapred.map.tasks
Maximum number of Hadoop map tasks to use; each task can pull multiple topic partitions.
- Type: int
- Required: No
- Default: 30
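The example configuration at the end of this page covers the required paths; the optional class and classpath settings from this section, which it leaves at their defaults, could be set explicitly as sketched below. The classpath directory is a placeholder, and the class names are simply the defaults listed above.
# Writer, partitioner, and work allocator, shown with their default implementations
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
etl.partitioner.class=com.linkedin.camus.etl.kafka.partitioner.DefaultPartitioner
camus.work.allocator.class=com.linkedin.camus.workallocater.BaseAllocator
# Directory whose files are added to the distributed cache and task classpath (placeholder path)
hdfs.default.classpath.dir=/user/example/camus/lib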
Kafka Configuration¶
kafka.brokers
List of Kafka brokers for Camus to pull metadata from.
- Type: string
- Required: Yes
kafka.max.pull.hrs
The maximum duration, in hours, from the timestamp of the first record to the timestamp of the last record. When this limit is reached, the pull stops. -1 means no limit.
- Type: int
- Required: No
- Default: -1
kafka.max.historical.days
Events with a timestamp older than this many days will be discarded. -1 means no limit.
- Type: int
- Required: No
- Default: -1
kafka.max.pull.minutes.per.task
Max minutes for each mapper to pull messages.
- Type: int
- Required: No
- Default: -1
kafka.blacklist.topics
Topics on the blacklist are not pulled from Kafka.
- Type: string
- Required: No
- Default: null
kafka.whitelist.topics
If the whitelist has values, only whitelisted topics are pulled from Kafka.
- Type: string
- Required: No
- Default: null
etl.output.record.delimiter
Delimiter for writing string records.
- Type: string
- Required: No
- Default: “\n”
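The example configuration in the next section leaves the topic filters empty; if you want to restrict the pull, the filters and the string record delimiter might be set as follows. The topic names are placeholders, and the comma-separated list format for the whitelist is an assumption rather than something stated on this page.
# Pull only these topics (assumed comma-separated list); blacklist left empty
kafka.whitelist.topics=pageviews,clicks
kafka.blacklist.topics=
# Newline delimiter between string records (only relevant when writing string records)
etl.output.record.delimiter=\n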
Example Configuration¶
# The job name.
camus.job.name=Camus Job
# Kafka brokers to connect to, format: kafka.brokers=host1:port,host2:port,host3:port
kafka.brokers=
# Top-level data output directory; sub-directories are dynamically created for each topic pulled
etl.destination.path=/user/username/topics
# HDFS location to keep execution files: requests, offsets, error logs and count files
etl.execution.base.path=/user/username/exec
# HDFS location to keep historical execution files, usually a sub-directory of the base.path
etl.execution.history.path=/user/username/camus/exec/history
# Concrete implementation of the decoder class to use.
camus.message.decoder.class=io.confluent.camus.etl.kafka.coders.AvroMessageDecoder
# Max number of MapReduce tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=30
# Max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=1
# Events with a timestamp older than this will be discarded
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages from Kafka (-1 means no limit)
kafka.max.pull.minutes.per.task=-1
# Only topics in the whitelist are pulled from Kafka; no topics from the blacklist are pulled
kafka.blacklist.topics=
kafka.whitelist.topics=
log4j.configuration=true
# Name of the client as seen by kafka
kafka.client.name=camus
# Stops the mapper from getting inundated with decoder exceptions from the same topic
max.decoder.exceptions.to.print=5
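With a properties file like the one above saved as, for example, camus.properties, the job is typically launched through Hadoop with a command along the lines of hadoop jar <camus-jar> com.linkedin.camus.etl.kafka.CamusJob -P camus.properties; the exact jar name depends on the Camus build you are using.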