MQTT Connector
The MQTT connector provides Akka Stream sources to connect to MQTT servers.
Artifacts
- sbt
-
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-mqtt" % "0.9"
- Maven
-
<dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-mqtt_2.12</artifactId> <version>0.9</version> </dependency>
- Gradle
-
dependencies { compile group: "com.lightbend.akka", name: "akka-stream-alpakka-mqtt_2.12", version: "0.9" }
Usage
First we need to define various settings, that are required when connecting to an MQTT server.
- Scala
-
val connectionSettings = MqttConnectionSettings( "tcp://localhost:1883", "test-scala-client", new MemoryPersistence )
- Java
-
final MqttConnectionSettings connectionSettings = MqttConnectionSettings.create( "tcp://localhost:1883", "test-java-client", new MemoryPersistence() );
Here we used MqttConnectionSettings factory to set the address of the server, client ID, which needs to be unique for every client, and client persistence implementation (MemoryPersistence) which allows to control reliability guarantees.
Then let’s create a source that is going to connect to the MQTT server upon materialization and receive messages that are sent to the subscribed topics.
- Scala
-
val settings = MqttSourceSettings( connectionSettings.withClientId("source-spec/source"), Map(topic1 -> MqttQoS.AtLeastOnce, topic2 -> MqttQoS.AtLeastOnce) ) val mqttSource = MqttSource(settings, bufferSize = 8)
- Java
-
final MqttSourceSettings settings = MqttSourceSettings.create( connectionSettings.withClientId("source-test/source") ).withSubscriptions( Pair.create("source-test/topic1", MqttQoS.atMostOnce()), Pair.create("source-test/topic2", MqttQoS.atMostOnce()) ); final Integer bufferSize = 8; final Source<MqttMessage, CompletionStage<Done>> mqttSource = MqttSource.create(settings, bufferSize);
And finally run the source.
- Scala
-
val (subscriptionFuture, result) = mqttSource .map(m => s"${m.topic}_${m.payload.utf8String}") .take(messageCount * 2) .toMat(Sink.seq)(Keep.both) .run()
- Java
-
final Pair<CompletionStage<Done>, CompletionStage<List<String>>> result = mqttSource .map(m -> m.topic() + "-" + m.payload().utf8String()) .take(messageCount * 2) .toMat(Sink.seq(), Keep.both()) .run(materializer);
This source has a materialized value (Future in Scala API and CompletionStage in Java API) which is completed when the subscription to the MQTT broker has been completed.
To publish messages to the MQTT server create a sink and run it.
- Scala
-
Source(messages).runWith(MqttSink(connectionSettings.withClientId("source-spec/sink"), MqttQoS.AtLeastOnce))
- Java
-
Source.from(messages).runWith(MqttSink.create( connectionSettings.withClientId("source-test/sink"), MqttQoS.atLeastOnce()), materializer);
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
- Scala
-
sbt > mqtt/testOnly *.MqttSourceSpec
- Java
-
sbt > mqtt/testOnly *.MqttSourceTest