Cassandra Connector

The Cassandra connector exposes the result of a Cassandra query as a stream of rows.

Artifacts

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-stream-alpakka-cassandra" % "0.1-RC1"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream-alpakka-cassandra_2.12</artifactId>
  <version>0.1-RC1</version>
</dependency>
Gradle
dependencies {
  compile group: "com.typesafe.akka", name: "akka-stream-alpakka-cassandra_2.12", version: "0.1-RC1"
}

Usage

Sources provided by this connector need a prepared session to communicate with the Cassandra cluster. First, let's initialize a Cassandra session.

Scala
implicit val session = Cluster.builder
  .addContactPoint("127.0.0.1").withPort(9042)
  .build.connect()
Java
final Session session = Cluster.builder()
  .addContactPoint("127.0.0.1").withPort(9042)
  .build().connect();

We will also need an ActorSystem and an ActorMaterializer.

Scala
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
Java
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);

This is all the preparation we need.

Let’s create a Cassandra statement with a query that we want to execute.

Scala
val stmt = new SimpleStatement("SELECT * FROM akka_stream_scala_test.test").setFetchSize(20)
Java
final Statement stmt = new SimpleStatement("SELECT * FROM akka_stream_java_test.test").setFetchSize(20);
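
The `setFetchSize(20)` call in the statements above asks the driver to retrieve results in pages of 20 rows rather than all at once. The following plain-Java sketch only illustrates that idea. It does not use the Cassandra driver, and `PagedRows` and `fetchPage` are hypothetical names made up for this illustration. It shows how a consumer can pull rows one at a time while the next page is loaded only once the current one is exhausted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical illustration of page-by-page fetching; NOT the Cassandra driver.
final class PagedRows implements Iterator<Integer> {
    private final int totalRows;   // rows available in the pretend table
    private final int fetchSize;   // rows loaded per page, analogous to setFetchSize(20)
    private int nextRow = 0;       // index of the next row to load from the pretend server
    private int pagesFetched = 0;  // number of pages requested so far
    private Iterator<Integer> currentPage = new ArrayList<Integer>().iterator();

    PagedRows(int totalRows, int fetchSize) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        this.totalRows = totalRows;
        this.fetchSize = fetchSize;
    }

    // Stands in for one round trip to the server: returns at most fetchSize rows.
    private List<Integer> fetchPage() {
        List<Integer> page = new ArrayList<>();
        int end = Math.min(nextRow + fetchSize, totalRows);
        while (nextRow < end) {
            page.add(nextRow++);
        }
        pagesFetched++;
        return page;
    }

    @Override
    public boolean hasNext() {
        // Load the next page only when the current one is used up.
        if (!currentPage.hasNext() && nextRow < totalRows) {
            currentPage = fetchPage();
        }
        return currentPage.hasNext();
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return currentPage.next();
    }

    int pagesFetched() {
        return pagesFetched;
    }

    public static void main(String[] args) {
        PagedRows rows = new PagedRows(50, 20);
        int count = 0;
        while (rows.hasNext()) {
            rows.next();
            count++;
        }
        System.out.println(count + " rows in " + rows.pagesFetched() + " pages");
    }
}
```

With 50 pretend rows and a fetch size of 20, the consumer sees all 50 rows while only three pages are requested. A smaller fetch size keeps fewer rows in memory at the cost of more round trips.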

Finally, create the source using one of the CassandraSource factory methods and run it.

Scala
val rows = CassandraSource(stmt)
  .runWith(Sink.seq)
Java
final CompletionStage<List<Row>> rows = CassandraSource.create(stmt, session)
  .runWith(Sink.seq(), materializer);

Here we used a basic sink to complete the stream by collecting all of the stream elements into a collection. The power of streams comes from building larger data pipelines that use backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.
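
To make the idea of backpressure concrete, here is a minimal plain-Java sketch under stated assumptions. It does not use Akka Streams or this connector, and `BackpressureSketch` and `run` are hypothetical names. Akka Streams signals demand without blocking threads; the sketch uses a blocking bounded queue only to show the effect, namely that a fast producer can never run more than a fixed number of elements ahead of a slower consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration of bounded buffering; NOT how Akka Streams is implemented.
final class BackpressureSketch {
    private static final int END = -1; // sentinel marking the end of the stream

    // Sends `total` rows through a buffer with `capacity` slots
    // and returns the rows in the order the consumer received them.
    static List<Integer> run(int total, int capacity) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int row = 0; row < total; row++) {
                    buffer.put(row); // blocks while the buffer is full
                }
                buffer.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            // The consumer runs on the calling thread and drains the buffer.
            while (true) {
                int row = buffer.take(); // blocks while the buffer is empty
                if (row == END) {
                    break;
                }
                received.add(row);
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("received " + run(100, 4).size() + " rows");
    }
}
```

However fast the producer is, at most `capacity` rows wait in the buffer at any time, so memory use stays bounded while every row still arrives in order.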

Running the example code

The code in this guide is part of the runnable tests of this project. You are welcome to edit the code and run it in sbt.

The test code requires a Cassandra server running in the background. You can start one quickly using Docker:

docker run --rm -p 9042:9042 cassandra:3

Scala
sbt
> cassandra/testOnly *.CassandraSourceSpec
Java
sbt
> cassandra/testOnly *.CassandraSourceTest