Confluent Quickstart
You can get up and running with the full Confluent Platform quickly on a single server. In this quickstart we’ll show how to run ZooKeeper, Kafka, and the Schema Registry, and then write some Avro data to a Kafka topic and read it back.
Download and install the Confluent Platform. In this quickstart we’ll use the zip archive, but there are many other installation options.
$ wget http://packages.confluent.io/archive/2.0/confluent-2.0.1-2.11.7.zip
$ unzip confluent-2.0.1-2.11.7.zip
$ cd confluent-2.0.1
Here is a high-level view of the contents of the package:
confluent-2.0.1/bin/        - Driver scripts for starting/stopping services
confluent-2.0.1/etc/        - Configuration files
confluent-2.0.1/share/java/ - Jars
If you installed from deb or rpm packages, the contents are installed globally and you’ll need to adjust the paths used below:
/usr/bin/                 - Driver scripts for starting/stopping services, prefixed with <package> names
/etc/<package>            - Configuration files
/usr/share/java/<package> - Jars
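For example, with a deb or rpm install, the ZooKeeper start command from the next step would look roughly like the following (a sketch; the exact script and configuration file names depend on the packages you installed):

$ /usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties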
Start ZooKeeper. Since this is a long-running service, you should run it in its own terminal.
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
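Optionally, you can check that ZooKeeper is responding with its "ruok" four-letter command. This assumes nc (netcat) is installed; 2181 is the default client port from zookeeper.properties. A healthy server replies "imok":

$ echo ruok | nc localhost 2181
imok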
Start Kafka, also in its own terminal.
$ ./bin/kafka-server-start ./etc/kafka/server.properties
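Optionally, you can confirm the broker is reachable by listing topics through ZooKeeper; on a fresh install the list is empty:

$ ./bin/kafka-topics --list --zookeeper localhost:2181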
Start the Schema Registry, also in its own terminal.
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
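Optionally, you can verify the Schema Registry is serving requests through its REST API (it listens on port 8081 by default); a fresh registry has no registered subjects:

$ curl http://localhost:8081/subjects
[]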
Now we have all the services running and can send some Avro data to a Kafka topic. Although you would normally do this from one of your applications, we’ll use a utility provided with Kafka to send the data without having to write any code. We direct it at our local Kafka cluster, tell it to write to the topic test, read each line of input as an Avro message, validate the schema against the Schema Registry at the specified URL, and finally indicate the format of the data.

$ ./bin/kafka-avro-console-producer \
    --broker-list localhost:9092 --topic test \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Once started, the process will wait for you to enter messages, one per line, and will send them immediately when you hit the Enter key. Try entering a couple of messages:

{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
When you’re done, use Ctrl+C to shut down the process.

Now we can check that the data was produced by using Kafka’s console consumer process to read data from the topic. We point it at the same test topic and our ZooKeeper instance, tell it to decode each message as Avro using the same Schema Registry URL to look up schemas, and finally tell it to start from the beginning of the topic (by default the consumer only reads messages published after it starts).

$ ./bin/kafka-avro-console-consumer --topic test \
    --zookeeper localhost:2181 \
    --from-beginning
You should see all the messages you created in the previous step written to the console in the same format.
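You can also confirm that the producer registered your schema. By default the registry stores a topic’s value schema under the subject <topic>-value, so for the test topic:

$ curl http://localhost:8081/subjects/test-value/versions/latest

This should return the record schema you supplied on the producer command line.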
The consumer does not exit after reading all the messages so it can listen for and process new messages as they are published. Try keeping the consumer running and repeating the producer step above – you will see messages delivered to the consumer immediately after you hit Enter for each message in the producer.

When you’re done, shut down the consumer with Ctrl+C.

Now let’s try to produce data to the same topic using an incompatible schema. We’ll run the producer with nearly the same command, but change the schema to expect plain integers.
$ ./bin/kafka-avro-console-producer \
    --broker-list localhost:9092 --topic test \
    --property value.schema='{"type":"int"}'
Now if you enter an integer and hit Enter, you should see an exception:
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "int"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with the latest schema; error code: 409
        at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:146)
        at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.registerSchema(RestUtils.java:174)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:51)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:49)
        at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:155)
        at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:94)
        at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
When the producer tried to send a message, it checked the schema with the Schema Registry, which returned an error indicating the new schema was rejected because it does not preserve backward compatibility with the existing schema (the default Schema Registry setting). The console producer simply reports this error and exits, but your own applications could handle the problem more gracefully. Most importantly, we’ve guaranteed no incompatible data was published to Kafka.
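You can inspect the registry’s global compatibility setting over the same REST API (again assuming the default port 8081); the response should look something like:

$ curl http://localhost:8081/config
{"compatibilityLevel":"BACKWARD"}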
When you’re done testing, you can use Ctrl+C to shut down each service, in the reverse order that you started them.
This simple guide only covered Kafka and the Schema Registry to get you started with the core services. See the documentation for each component for a quickstart guide specific to that component: