Kafka Streams Examples

In this tutorial we will run Confluent’s Kafka Music demo application for the Kafka Streams API.

The Kafka Music application demonstrates how to build a simple music charts application that continuously computes, in real time, the latest charts, such as the Top 5 songs per music genre. It exposes its latest processing results (the latest charts) via Kafka’s Interactive Queries feature and a REST API. The application’s input data is in Avro format and comes from two sources: a stream of play events (think: “song X was played”) and a stream of song metadata (“song X was written by artist Y”); see “Inspecting the input topics of the Kafka Music application” in the Appendix for what the input data looks like.

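More specifically, we will run the Kafka Music application, a container that continuously generates its input data, and the services they depend on: a Kafka cluster, Confluent Schema Registry, and a ZooKeeper ensemble.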

Requirements

This tutorial uses Docker Compose.

  • You must install Docker and Docker Compose.
  • The Confluent Docker images require Docker version 1.11 or greater.

Running the Kafka Music demo application

For a preview of what we will do in this section, take a look at the following screencast:

Screencast: Running Confluent's Kafka Music demo application (3 mins)

Ready now? Let’s start!

Clone the Confluent Docker Images repository:

# Clone the git repository
$ git clone https://github.com/confluentinc/cp-docker-images

# Change into the directory for this tutorial
$ cd cp-docker-images/examples/kafka-streams-examples

# Switch to the `v3.2.1` branch
$ git checkout v3.2.1

Now we can launch the Kafka Music demo application, including the services it depends on, such as Kafka:

$ docker-compose up -d

After a few seconds the application and the services are up and running. One of the started containers is continuously generating input data for the application by writing into its input topics. This allows us to look at live, real-time data when playing around with the Kafka Music application.
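Because the services need a few seconds to start, a script that queries the REST API right after `docker-compose up -d` may fail at first. The following is a minimal Python sketch of a readiness check, not part of the tutorial's tooling. It polls the `/kafka-music/instances` endpoint used in the examples below; the 60-second timeout and 2-second interval are assumptions chosen for illustration.

```python
import time
import urllib.error
import urllib.request


def rest_api_is_up(url="http://localhost:7070/kafka-music/instances"):
    """Return True if the REST endpoint answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=2) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or HTTP error: not ready yet.
        return False


def wait_until_ready(probe=rest_api_is_up, timeout_s=60.0, interval_s=2.0):
    """Call probe() repeatedly until it returns True or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        if probe():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

Calling `wait_until_ready()` with its defaults blocks until the application answers or the timeout passes. The `probe` parameter accepts any zero-argument function, which makes the loop easy to try out without a running cluster.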

Now we can use our web browser or a CLI tool such as curl to interactively query the latest processing results of the Kafka Music application by accessing its REST API.

REST API example 1: list all running application instances of the Kafka Music application:

$ curl -sXGET http://localhost:7070/kafka-music/instances

# You should see output similar to the following, though here
# the output is pretty-printed so that it's easier to read:
[
  {
    "host": "localhost",
    "port": 7070,
    "storeNames": [
      "all-songs",
      "song-play-count",
      "top-five-songs",
      "top-five-songs-by-genre"
    ]
  }
]
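With Interactive Queries, the state of a store can be spread across several application instances, so a client first needs to discover which instances host a given store. As a rough sketch, the response above could be processed as follows. The helper name `instances_hosting` is hypothetical, and the sample text is simply the output shown above.

```python
import json

# The response shown above, for a single running instance.
SAMPLE_RESPONSE = """
[
  {
    "host": "localhost",
    "port": 7070,
    "storeNames": [
      "all-songs",
      "song-play-count",
      "top-five-songs",
      "top-five-songs-by-genre"
    ]
  }
]
"""


def instances_hosting(store_name, response_text):
    """Return "host:port" for every instance whose storeNames include store_name."""
    instances = json.loads(response_text)
    return [
        f'{instance["host"]}:{instance["port"]}'
        for instance in instances
        if store_name in instance["storeNames"]
    ]


print(instances_hosting("top-five-songs", SAMPLE_RESPONSE))
```

With only one instance running, every store resolves to `localhost:7070`.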

REST API example 2: get the latest Top 5 songs across all music genres:

$ curl -sXGET http://localhost:7070/kafka-music/charts/top-five

# You should see output similar to the following, though here
# the output is pretty-printed so that it's easier to read:
[
  {
    "artist": "Jello Biafra And The Guantanamo School Of Medicine",
    "album": "The Audacity Of Hype",
    "name": "Three Strikes",
    "plays": 70
  },
  {
    "artist": "Hilltop Hoods",
    "album": "The Calling",
    "name": "The Calling",
    "plays": 67
  },

  ... rest omitted...
]
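If you want to consume this response in a script rather than read it by eye, a small sketch like the following turns it into a ranked, human-readable chart. It assumes the field names shown above (`artist`, `album`, `name`, `plays`) and that the entries arrive in chart order. The sample contains only the two entries visible in the output above, and the function name `format_chart` is hypothetical.

```python
import json

# Only the two entries shown above; the rest of the response was omitted.
SAMPLE_CHART = """
[
  {
    "artist": "Jello Biafra And The Guantanamo School Of Medicine",
    "album": "The Audacity Of Hype",
    "name": "Three Strikes",
    "plays": 70
  },
  {
    "artist": "Hilltop Hoods",
    "album": "The Calling",
    "name": "The Calling",
    "plays": 67
  }
]
"""


def format_chart(response_text):
    """Return one line per chart entry, numbered in the order received."""
    entries = json.loads(response_text)
    return [
        f'{rank}. {entry["name"]} by {entry["artist"]} ({entry["plays"]} plays)'
        for rank, entry in enumerate(entries, start=1)
    ]


for line in format_chart(SAMPLE_CHART):
    print(line)
```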

The REST API exposed by the Kafka Music application supports further operations. See the top-level instructions in its source code for details.

Once you’re done playing around you can stop all the services and containers with:

$ docker-compose down

We hope you enjoyed this tutorial!

Running further Confluent demo applications for the Kafka Streams API

The container named kafka-music-application, which runs the Kafka Music demo application, actually contains all of Confluent’s Kafka Streams demo applications. The demo applications are packaged in the fat jar at /app/streams-examples-3.2.1-standalone.jar inside this container. This means you can easily run any of these applications from inside the container via a command similar to:

# Example: Launch the WordCount demo application (inside the `kafka-music-application` container)
$ docker-compose exec kafka-music-application \
        java -cp /app/streams-examples-3.2.1-standalone.jar \
        io.confluent.examples.streams.WordCountLambdaExample \
        kafka:29092

Of course you can also modify the tutorial’s docker-compose.yml for repeatable deployments.

Note that you must follow the full instructions of each demo application (see its respective source code at https://github.com/confluentinc/examples). These instructions include, for example, the creation of the application’s input and output topics. Also, each demo application supports CLI arguments. Typically, the first CLI argument is the bootstrap.servers parameter and the second argument, if any, is the schema.registry.url setting.
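The argument convention described above can be captured in a small helper that assembles the `docker-compose exec` command for any demo application. This is only a sketch: the function name `demo_command` is hypothetical, the default `kafka:29092` is the in-container Kafka endpoint used in the WordCount example, and you should still check each demo's source code for the arguments it actually accepts.

```python
import shlex

CONTAINER = "kafka-music-application"
JAR_PATH = "/app/streams-examples-3.2.1-standalone.jar"


def demo_command(main_class, bootstrap_servers="kafka:29092", schema_registry_url=None):
    """Build the argument list that runs main_class inside the container.

    The first CLI argument is bootstrap.servers; the second, if given,
    is schema.registry.url.
    """
    command = [
        "docker-compose", "exec", CONTAINER,
        "java", "-cp", JAR_PATH, main_class,
        bootstrap_servers,
    ]
    if schema_registry_url is not None:
        command.append(schema_registry_url)
    return command


# Print the WordCount command from above as a single shell line.
print(shlex.join(demo_command("io.confluent.examples.streams.WordCountLambdaExample")))
```

The returned list can be passed to `subprocess.run` directly, which avoids shell quoting issues.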

Available endpoints from within the containers as well as on your host machine:

Endpoint                   Parameter            Value (from within containers)  Value (from your host machine)
-------------------------  -------------------  ------------------------------  ------------------------------
Kafka Cluster              bootstrap.servers    kafka:29092                     localhost:9092
Confluent Schema Registry  schema.registry.url  http://schema-registry:8081     http://localhost:8081
ZooKeeper ensemble         zookeeper.connect    zookeeper:32181                 localhost:32181

The ZooKeeper endpoint is not required by Kafka Streams applications, but you need it, for example, to create new Kafka topics manually or to list the available Kafka topics.
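A common mistake is to use a host-machine address from inside a container, or the other way around. As an illustration, the endpoints listed above can be kept in one place and selected by where the client runs. The names `ENDPOINTS` and `client_settings` are hypothetical; the values are the ones listed above.

```python
# Endpoint values as listed above, keyed by configuration parameter.
ENDPOINTS = {
    "bootstrap.servers": {
        "container": "kafka:29092",
        "host": "localhost:9092",
    },
    "schema.registry.url": {
        "container": "http://schema-registry:8081",
        "host": "http://localhost:8081",
    },
    "zookeeper.connect": {
        "container": "zookeeper:32181",
        "host": "localhost:32181",
    },
}


def client_settings(location):
    """Return parameter -> value for a client running in a container or on the host."""
    if location not in ("container", "host"):
        raise ValueError(f"location must be 'container' or 'host', got {location!r}")
    return {parameter: values[location] for parameter, values in ENDPOINTS.items()}


print(client_settings("host")["bootstrap.servers"])
```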

Appendix

Inspecting the input topics of the Kafka Music application

Inspect the “play-events” input topic, which contains messages in Avro format:

# Use the kafka-avro-console-consumer to read the "play-events" topic
$ docker-compose exec schema-registry \
    kafka-avro-console-consumer \
        --bootstrap-server kafka:29092 \
        --topic play-events --from-beginning

# You should see output similar to:
{"song_id":11,"duration":60000}
{"song_id":10,"duration":60000}
{"song_id":12,"duration":60000}
{"song_id":2,"duration":60000}
{"song_id":1,"duration":60000}
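To get a feel for how play counts are derived from this topic, here is a plain-Python sketch that counts plays per song from console-consumer output like the above. It is a batch illustration over a handful of lines, not the Kafka Streams code the application uses, and the function name `count_plays` is hypothetical.

```python
import json
from collections import Counter

# The five play events shown above, one JSON document per line.
SAMPLE_PLAY_EVENTS = [
    '{"song_id":11,"duration":60000}',
    '{"song_id":10,"duration":60000}',
    '{"song_id":12,"duration":60000}',
    '{"song_id":2,"duration":60000}',
    '{"song_id":1,"duration":60000}',
]


def count_plays(lines):
    """Count how many play events were seen for each song_id."""
    counts = Counter()
    for line in lines:
        event = json.loads(line)
        counts[event["song_id"]] += 1
    return counts


print(count_plays(SAMPLE_PLAY_EVENTS))
```

In the sample every song appears once, so each count is 1. The running application maintains such counts continuously as new events arrive.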

Inspect the “song-feed” input topic, which contains messages in Avro format:

# Use the kafka-avro-console-consumer to read the "song-feed" topic
$ docker-compose exec schema-registry \
    kafka-avro-console-consumer \
        --bootstrap-server kafka:29092 \
        --topic song-feed --from-beginning

# You should see output similar to:
{"id":1,"album":"Fresh Fruit For Rotting Vegetables","artist":"Dead Kennedys","name":"Chemical Warfare","genre":"Punk"}
{"id":2,"album":"We Are the League","artist":"Anti-Nowhere League","name":"Animal","genre":"Punk"}
{"id":3,"album":"Live In A Dive","artist":"Subhumans","name":"All Gone Dead","genre":"Punk"}
{"id":4,"album":"PSI","artist":"Wheres The Pope?","name":"Fear Of God","genre":"Punk"}
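The charts combine both inputs: play counts are joined with the song metadata and then ranked within each genre. The following plain-Python sketch illustrates that idea on the four songs shown above. It is not the application's implementation, the play counts are hypothetical values made up for the example, and the function name `top_songs_by_genre` is likewise an assumption.

```python
import json
from collections import defaultdict

# The four song-feed records shown above.
SAMPLE_SONGS = [
    '{"id":1,"album":"Fresh Fruit For Rotting Vegetables","artist":"Dead Kennedys","name":"Chemical Warfare","genre":"Punk"}',
    '{"id":2,"album":"We Are the League","artist":"Anti-Nowhere League","name":"Animal","genre":"Punk"}',
    '{"id":3,"album":"Live In A Dive","artist":"Subhumans","name":"All Gone Dead","genre":"Punk"}',
    '{"id":4,"album":"PSI","artist":"Wheres The Pope?","name":"Fear Of God","genre":"Punk"}',
]

# Hypothetical play counts keyed by song id (not taken from the tutorial).
# Song id 99 deliberately has no metadata.
HYPOTHETICAL_PLAY_COUNTS = {1: 3, 2: 5, 4: 1, 99: 7}


def top_songs_by_genre(song_lines, play_counts, n=5):
    """Join play counts with song metadata and return the top n song names per genre."""
    songs = {song["id"]: song for song in map(json.loads, song_lines)}
    by_genre = defaultdict(list)
    for song_id, plays in play_counts.items():
        song = songs.get(song_id)
        if song is None:
            continue  # No metadata for this id, so the song cannot be charted.
        by_genre[song["genre"]].append((plays, song["name"]))
    return {
        genre: [
            name
            # Most plays first; ties are broken alphabetically by name.
            for plays, name in sorted(entries, key=lambda e: (-e[0], e[1]))[:n]
        ]
        for genre, entries in by_genre.items()
    }


print(top_songs_by_genre(SAMPLE_SONGS, HYPOTHETICAL_PLAY_COUNTS))
```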

Creating new topics

You can create topics manually with the kafka-topics CLI tool, which is available on the kafka container.

# Create a new topic named "my-new-topic", using the `kafka` container
$ docker-compose exec kafka kafka-topics \
    --zookeeper zookeeper:32181 \
    --create --topic my-new-topic --partitions 2 --replication-factor 1

# You should see a line similar to:
Created topic "my-new-topic".

Listing available topics

You can list all available topics with the kafka-topics CLI tool, which is available on the kafka container.

# List available topics, using the `kafka` container
$ docker-compose exec kafka kafka-topics \
    --zookeeper zookeeper:32181 \
    --list

To display additional topic information, pass --describe instead of --list.