MongoDB Connector
The MongoDB connector allows you to read and save documents. You can query a collection as a stream of documents with MongoSource, or insert, update, and delete documents in a collection with MongoSink.
This connector is based on the mongo-scala-driver and does not have a Java interface.
Artifacts
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.15"
- Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-mongodb_2.12</artifactId>
  <version>0.15</version>
</dependency>
- Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-mongodb_2.12', version: '0.15'
}
Usage
Sources provided by this connector need a prepared session to communicate with the MongoDB server. First, let's initialize a MongoDB connection.
- Scala
import org.mongodb.scala.MongoClient

private val client = MongoClient("mongodb://localhost:27017")
private val db = client.getDatabase("alpakka-mongo")
private val numbersColl = db.getCollection("numbers")
We will also need an ActorSystem and an ActorMaterializer.
- Scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
This is all the preparation we need.
Source Usage
Let’s create a source from a MongoDB collection observable, which can optionally take a filter.
- Scala
val source: Source[Document, NotUsed] = MongoSource(numbersColl.find())
And finally we can run it.
- Scala
val rows: Future[Seq[Document]] = source.runWith(Sink.seq)
Here we used a basic sink to complete the stream by collecting all of the stream elements to a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.
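As a sketch of the optional filter and of a slightly longer pipeline, the snippet below passes a filter to find() and adds two ordinary Akka Streams stages before the sink. It assumes the documents carry a numeric "id" field (the field name is borrowed from the update and delete examples on this page) and that a MongoDB server is reachable on localhost:27017; both are assumptions, not guarantees of the connector.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.mongodb.scaladsl.MongoSource
import akka.stream.scaladsl.{Sink, Source}
import org.mongodb.scala.{Document, MongoClient}
import org.mongodb.scala.model.Filters
import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()

val client = MongoClient("mongodb://localhost:27017")
val numbersColl = client.getDatabase("alpakka-mongo").getCollection("numbers")

// The filter is evaluated by the server, so only documents whose
// (assumed) "id" field is at least 5 enter the stream.
val filtered: Source[Document, NotUsed] =
  MongoSource(numbersColl.find(Filters.gte("id", 5)))

// Render each document as JSON and stop after three elements;
// take(3) cancels the upstream query once it has seen enough.
val firstThree: Future[Seq[String]] =
  filtered.map(_.toJson()).take(3).runWith(Sink.seq)
```

Filtering in the query rather than with a stream stage keeps unneeded documents off the wire, while stages such as map and take remain the place for per-element processing.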
Flow and Sink Usage
Each of these sink factory methods has a corresponding factory in MongoFlow, which emits the written document or the result of the operation downstream.
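A minimal sketch of the flow variant follows. It assumes that the connector's MongoFlow object offers an insertOne factory taking the same parallelism and collection arguments as the sink shown in the next section, and that it emits each written document; the "value" field and the three sample documents are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.mongodb.scaladsl.MongoFlow
import akka.stream.scaladsl.{Sink, Source}
import org.mongodb.scala.{Document, MongoClient}
import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
import system.dispatcher // execution context, in case the factory asks for one

val client = MongoClient("mongodb://localhost:27017")
val numbersColl = client.getDatabase("alpakka-mongo").getCollection("numbers")

// Hypothetical sample data: three documents with a "value" field.
val documents = (1 to 3).map(i => Document("value" -> i))

// Assumption: the flow passes every written document downstream,
// so later stages (here simply Sink.seq) can react to completed writes.
val written: Future[Seq[Document]] =
  Source(documents)
    .via(MongoFlow.insertOne(2, numbersColl))
    .runWith(Sink.seq)
```

A flow is the better fit when the write is one step in a longer pipeline; a sink is enough when the write is the last step.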
Insert
We can use a Source of documents to save them to a MongoDB collection using insertOne or insertMany.
- Scala
val source: Source[Document, NotUsed] = ???
source.runWith(MongoSink.insertOne(parallelism = 2, collection = numbersColl))
Insert Many
Use insertMany when you have a collection of documents to insert at once.
- Scala
val source: Source[Seq[Document], NotUsed] = ???
source.runWith(MongoSink.insertMany(parallelism = 2, collection = numbersColl))
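One way to obtain a Source of document batches is the standard grouped operator of Akka Streams, sketched below. The "id" field, the 100 sample documents, and the batch size of 25 are illustrative assumptions; a MongoDB server on localhost:27017 is assumed as well.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.mongodb.scaladsl.MongoSink
import akka.stream.scaladsl.Source
import org.mongodb.scala.{Document, MongoClient}

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
import system.dispatcher // execution context, in case the factory asks for one

val client = MongoClient("mongodb://localhost:27017")
val numbersColl = client.getDatabase("alpakka-mongo").getCollection("numbers")

// grouped(25) collects up to 25 consecutive documents into one Seq,
// so each stream element becomes one batch for insertMany.
val batches: Source[Seq[Document], NotUsed] =
  Source(1 to 100)
    .map(i => Document("id" -> i))
    .grouped(25)

batches.runWith(MongoSink.insertMany(parallelism = 2, collection = numbersColl))
```

Larger batches mean fewer round trips to the server at the cost of holding more documents in memory per element.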
Update
We can update documents with a Source of DocumentUpdate, which combines a filter and an update definition. Use updateOne or updateMany depending on whether the filter should target one or many documents.
- Scala
import org.mongodb.scala.model.{Filters, Updates}

val source: Source[DocumentUpdate, NotUsed] = Source
  .single(DocumentUpdate(filter = Filters.eq("id", 1), update = Updates.set("updateValue", 0)))
source.runWith(MongoSink.updateOne(2, numbersColl))
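For comparison, a sketch of the updateMany variant is shown below, where each filter may match several documents. It assumes that DocumentUpdate lives in the connector's scaladsl package next to MongoSink, and that updateMany accepts the same two arguments as updateOne; the range filters and the "updateValue" values are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.mongodb.scaladsl.{DocumentUpdate, MongoSink}
import akka.stream.scaladsl.Source
import org.mongodb.scala.MongoClient
import org.mongodb.scala.model.{Filters, Updates}

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
import system.dispatcher // execution context, in case the factory asks for one

val client = MongoClient("mongodb://localhost:27017")
val numbersColl = client.getDatabase("alpakka-mongo").getCollection("numbers")

// Two hypothetical updates; each filter can match any number of documents.
val updates = List(
  DocumentUpdate(filter = Filters.lt("id", 5), update = Updates.set("updateValue", 0)),
  DocumentUpdate(filter = Filters.gte("id", 5), update = Updates.set("updateValue", 1))
)

// Every stream element is applied to all documents its filter matches.
Source(updates).runWith(MongoSink.updateMany(2, numbersColl))
```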
Delete
We can delete documents with a Source of filters. Use deleteOne or deleteMany depending on whether the filter should target one or many documents.
- Scala
val source: Source[Bson, NotUsed] = Source.single(Filters.eq("id", 1))
source.runWith(MongoSink.deleteOne(2, numbersColl))
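The deleteMany variant can be sketched the same way, with a filter that matches a range of documents. The threshold of 5 and the "id" field are illustrative assumptions, as is the assumption that deleteMany takes the same two arguments as deleteOne.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.mongodb.scaladsl.MongoSink
import akka.stream.scaladsl.Source
import org.mongodb.scala.MongoClient
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Filters

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
import system.dispatcher // execution context, in case the factory asks for one

val client = MongoClient("mongodb://localhost:27017")
val numbersColl = client.getDatabase("alpakka-mongo").getCollection("numbers")

// A single filter that may match many documents: everything with "id" below 5.
val belowFive: Bson = Filters.lt("id", 5)
val filters: Source[Bson, NotUsed] = Source.single(belowFive)

// deleteMany removes every document the filter matches, not just the first.
filters.runWith(MongoSink.deleteMany(2, numbersColl))
```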
Running the example code
The code in this guide is part of the runnable tests of this project. You are welcome to edit the code and run it in sbt.
The test code requires a MongoDB server running in the background. You can start one quickly using Docker:
docker run --rm -p 27017:27017 mongo
- Scala
sbt
> mongodb/test