Confluent Platform Quick Start

You can get up and running with the full Confluent platform quickly on a single server. In this quick start we’ll show how to run ZooKeeper, Kafka, and the Schema Registry and then write and read some Avro data to and from Kafka.

(If you want to start a data pipeline using Control Center, see The Control Center QuickStart Guide.)

  1. Download and install the Confluent Platform using one of the installation options. In this quick start we’ll use the zip archive.

    Here is a high-level view of the contents of the package:

    confluent-4.0.0/bin/        # Driver scripts for starting/stopping services
    confluent-4.0.0/etc/        # Configuration files
    confluent-4.0.0/share/java/ # Jars
    

    If you installed from deb or rpm packages, the contents are installed globally and you’ll need to adjust the paths used below:

    /usr/bin/                  # Confluent CLI and individual driver scripts for starting/stopping services, prefixed with <package> names
    /etc/<package>/            # Configuration files
    /usr/share/java/<package>/ # Jars
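
    If you are following along with the zip archive, unpacking it and moving into the package might look like this (the archive file name is illustrative; use the file you actually downloaded):

    $ unzip confluent-4.0.0.zip    # extract the package into ./confluent-4.0.0
    $ cd confluent-4.0.0           # run the remaining commands from this directory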
    
  2. Start Confluent services using the Confluent CLI [1]. Here we will only start ZooKeeper, Kafka, and the Schema Registry.

    Tip

    If not already in your PATH, add Confluent’s bin directory by running: export PATH=<path-to-confluent>/bin:$PATH

    To start just ZooKeeper, Kafka, and the Schema Registry, run:

    $ confluent start schema-registry
    

    Each service reads its configuration from its property files under etc. Throughout this quick start we will use the default properties unless stated otherwise. After issuing the command above, the services start in order, each printing a status message as follows:

    Starting zookeeper
    zookeeper is [UP]
    Starting kafka
    kafka is [UP]
    Starting schema-registry
    schema-registry is [UP]
    

    Note

    Alternatively, to manually start each service in its own terminal, the equivalent commands are:

    $ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
    $ ./bin/kafka-server-start ./etc/kafka/server.properties
    $ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
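
    To verify at any point which services are running, you can ask the Confluent CLI for their status; each service should be reported as [UP] or [DOWN]:

    $ confluent status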
    
  3. Now that all the services are running, we can send some Avro data to a Kafka topic. Although you would normally do this from within your applications, here we’ll use a utility provided with Kafka to send the data without having to write any code. We direct it at our local Kafka cluster, tell it to write to the topic test, and tell it to read each line of input as an Avro message; the value.schema property indicates the format of the data, and that schema is registered and validated against the Schema Registry (no URL is given on the command line, so the tool uses its default).

    $ <path-to-confluent>/bin/kafka-avro-console-producer \
             --broker-list localhost:9092 --topic test \
             --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
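
    Because no Schema Registry URL is named above, the tool falls back to its default (assumed here to be http://localhost:8081, where the registry started in step 2 is listening). If your Schema Registry runs elsewhere, you can pass the URL explicitly, for example:

    $ <path-to-confluent>/bin/kafka-avro-console-producer \
             --broker-list localhost:9092 --topic test \
             --property schema.registry.url=http://localhost:8081 \
             --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'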
    

    Tip: After the producer is started, the process will wait for you to enter messages and your terminal may appear idle.

    Enter one message per line and press Enter to send each one immediately. Try entering a couple of messages:

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    

    When you’re done, use Ctrl+C to shut down the process.

    Note

    If you press Enter with an empty line, it will be interpreted as a null value and cause an error. You can simply start the console producer again to continue sending messages.
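
    If you want to confirm that the producer registered the schema, you can query the Schema Registry’s REST API directly (this sketch assumes the registry is listening on its default port, 8081):

    $ curl http://localhost:8081/subjects                              # expect a list that includes "test-value"
    $ curl http://localhost:8081/subjects/test-value/versions/latest  # shows the latest schema registered for the topic's values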

  4. Now we can check that the data was produced by using Kafka’s console consumer to read data from the topic. We point it at the same test topic and at our ZooKeeper instance, tell it to decode each message as Avro (looking up schemas in the same Schema Registry), and finally tell it to start from the beginning of the topic (by default the consumer only reads messages published after it starts).

    $ ./bin/kafka-avro-console-consumer --topic test \
             --zookeeper localhost:2181 \
             --from-beginning
    

    You should see all the messages you created in the previous step written to the console in the same format.

    The consumer does not exit after reading all the messages so it can listen for and process new messages as they are published. Try keeping the consumer running and repeating step 3 – you will see messages delivered to the consumer immediately after you hit Enter for each message in the producer.

    When you’re done, shut down the consumer with Ctrl+C.
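
    If you would rather have the consumer exit on its own once it has read a fixed number of messages, the console consumer also accepts a --max-messages option, for example to read just the three messages from step 3 and then stop:

    $ ./bin/kafka-avro-console-consumer --topic test \
             --zookeeper localhost:2181 \
             --from-beginning --max-messages 3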

  5. Now let’s try to produce data to the same topic using an incompatible schema. We’ll run the producer with nearly the same command, but change the schema to expect plain integers.

    $ ./bin/kafka-avro-console-producer \
             --broker-list localhost:9092 --topic test \
             --property value.schema='{"type":"int"}'
    

    Now if you enter an integer and press Enter, you should see the following (expected) exception:

    org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "int"
    Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with the latest schema; error code: 409
           at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:146)
           at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.registerSchema(RestUtils.java:174)
           at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:51)
           at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89)
           at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:49)
           at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:155)
           at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:94)
           at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
    

    When the producer tried to send a message, it checked the schema with the Schema Registry, which returned an error indicating that the new schema is incompatible with the previously registered one because it does not preserve backward compatibility (the default Schema Registry compatibility setting). The console producer simply reports this error and exits, but your own applications could handle the problem more gracefully. Most importantly, we’ve guaranteed that no incompatible data was published to Kafka.
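
    If you are curious, you can inspect the registry’s compatibility setting over its REST API as well (again assuming the default port 8081); the response should report the current level, e.g. {"compatibilityLevel":"BACKWARD"}:

    $ curl http://localhost:8081/config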

  6. When you’re done testing, you can use confluent stop to shut down each service in the right order. To completely delete any data produced during this test and start from a clean slate next time, you may run confluent destroy instead. This command deletes all of the services’ data, which is otherwise persisted across restarts.
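
    For reference, the two commands look like this:

    $ confluent stop      # stop all running services, in the right order
    $ confluent destroy   # stop the services and also delete their data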

This simple guide covered only Kafka and the Schema Registry to get you started with the core services. See the documentation for each component for a quick start guide specific to that component.

[1] Run confluent help for a complete list of the commands available with the Confluent CLI.