Cassandra Connector
The Cassandra connector allows you to consume the result of a Cassandra query as a stream of rows.
Artifacts
- sbt
-
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.9"
- Maven
-
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-cassandra_2.12</artifactId>
  <version>0.9</version>
</dependency>
- Gradle
-
dependencies {
  compile group: "com.lightbend.akka", name: "akka-stream-alpakka-cassandra_2.12", version: "0.9"
}
Usage
Sources provided by this connector need a prepared session to communicate with the Cassandra cluster. First, let's initialize a Cassandra session.
- Scala
-
implicit val session = Cluster.builder.addContactPoint("127.0.0.1").withPort(9042).build.connect()
- Java
-
final Session session = Cluster.builder()
  .addContactPoint("127.0.0.1").withPort(9042)
  .build().connect();
We will also need an ActorSystem and an ActorMaterializer.
- Scala
-
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
- Java
-
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
This is all preparation that we are going to need.
Source Usage
Let’s create a Cassandra statement with a query that we want to execute.
- Scala
-
val stmt = new SimpleStatement("SELECT * FROM akka_stream_scala_test.test").setFetchSize(20)
- Java
-
final Statement stmt = new SimpleStatement("SELECT * FROM akka_stream_java_test.test").setFetchSize(20);
And finally create the source using any method from the CassandraSource factory and run it.
- Scala
-
val rows = CassandraSource(stmt).runWith(Sink.seq)
- Java
-
final CompletionStage<List<Row>> rows =
  CassandraSource.create(stmt, session).runWith(Sink.seq(), materializer);
Here we used a basic sink to complete the stream by collecting all of the stream elements to a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.
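For example, a slightly larger pipeline could transform and filter the rows before collecting them. The following is a minimal sketch in Scala, assuming the test table has an int column named id (as in the insert examples below); the transformation itself is purely illustrative.

// A minimal sketch: read rows, extract the id column, keep only even values,
// and collect at most the first ten into a sequence.
val evenIds = CassandraSource(stmt)
  .map(row => row.getInt("id"))
  .filter(_ % 2 == 0)
  .take(10)
  .runWith(Sink.seq)

Every stage is backpressured, so rows are only pulled from Cassandra as fast as the downstream stages can process them.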
Sink Usage
Let’s create a Cassandra prepared statement with a query that we want to execute.
- Scala
-
val preparedStatement = session.prepare("INSERT INTO akka_stream_scala_test.test(id) VALUES (?)")
- Java
-
final PreparedStatement preparedStatement = session.prepare("insert into akka_stream_java_test.test (id) values (?)");
Now we need to create a ‘statement binder’; this is just a function that binds data to the prepared statement. It can take any type or data structure to fit your query values. Here we’re just using one Integer, but it could just as easily be a (case) class; a sketch of that follows the snippets below.
- Scala
-
val statementBinder = (myInteger: Integer, statement: PreparedStatement) => statement.bind(myInteger)
- Java
-
BiFunction<Integer, PreparedStatement,BoundStatement> statementBinder = (myInteger, statement) -> { return statement.bind(myInteger); };
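If the values to write are richer than a single Integer, the binder simply destructures them. Below is a minimal Scala sketch, assuming a hypothetical person table with id and name columns; neither the table nor the case class is part of the example schema used in this guide.

// Hypothetical case class and table, for illustration only.
case class Person(id: Integer, name: String)

val insertPerson =
  session.prepare("INSERT INTO akka_stream_scala_test.person(id, name) VALUES (?, ?)")

val personBinder = (person: Person, statement: PreparedStatement) =>
  statement.bind(person.id, person.name)

The rest of this example continues with the single-Integer binder shown above.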
Finally, we run the sink from any source.
- Scala
-
val sink = CassandraSink[Integer](parallelism = 2, preparedStatement, statementBinder)
val result = source.runWith(sink)
- Java
-
final Sink<Integer, CompletionStage<Done>> sink =
  CassandraSink.create(2, preparedStatement, statementBinder, session, system.dispatcher());
CompletionStage<Done> result = source.runWith(sink, materializer);
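The snippets above assume a source of Integer elements is already in scope. As an illustration (not part of the connector API), such a source could be as simple as a range:

// A minimal sketch: a plain range of Integers that could feed the Cassandra sink.
val source = Source(0 to 10).map(i => i: Integer)

Running the stream then writes each element with the prepared statement, and the materialized value completes once all elements have been written.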
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
The test code requires a Cassandra server running in the background. You can start one quickly using Docker:
docker run --rm -p 9042:9042 cassandra:3
- Scala
-
sbt
> cassandra/testOnly *.CassandraSourceSpec
- Java
-
sbt
> cassandra/testOnly *.CassandraSourceTest