JMS Connector

The JMS connector provides Akka Streams sources and sinks to connect to JMS servers.

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-jms" % "0.9"
libraryDependencies += "javax.jms" % "jms" % "1.1"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-jms_2.12</artifactId>
  <version>0.9</version>
</dependency>
<dependency>
  <groupId>javax.jms</groupId>
  <artifactId>jms</artifactId>
  <version>1.1</version>
</dependency>
Gradle
dependencies {
  compile group: "com.lightbend.akka", name: "akka-stream-alpakka-jms_2.12", version: "0.9"
  compile group: 'javax.jms', name: 'jms', version: '1.1'
}

Usage

Sending messages to a JMS server

First define a JMS ConnectionFactory for the implementation you’re using. Here we’re using ActiveMQ.

Scala
val connectionFactory = new ActiveMQConnectionFactory(url)
Java
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ctx.url);

Create a sink that accepts Strings and forwards them to the JMS server.

Scala
val jmsSink: Sink[String, NotUsed] = JmsSink(
  JmsSinkSettings(connectionFactory).withQueue("test")
)
Java
Sink<String, NotUsed> jmsSink = JmsSink.create(
        JmsSinkSettings
                .create(connectionFactory)
                .withQueue("test")
);

JmsSink is a collection of factory methods that facilitate the creation of sinks.

The last step is to materialize and run the sink we have created.

Scala
val in = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
Source(in).runWith(jmsSink)
Java
List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
Source.from(in).runWith(jmsSink, materializer);

Receiving messages from a JMS server

Create a source using the same queue declaration as before.

Scala
val jmsSource: Source[String, NotUsed] = JmsSource.textSource(
  JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("test")
)
Java
Source<String, NotUsed> jmsSource = JmsSource
        .textSource(JmsSourceSettings
                .create(connectionFactory)
                .withQueue("test")
                .withBufferSize(10)
        );

The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.
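
To illustrate what a bounded prefetch buffer means for the producer side, here is a minimal sketch that uses only the Java standard library. It is not the connector's implementation: the class BoundedPrefetchSketch and the helper maxBuffered are hypothetical names, and an ArrayBlockingQueue stands in for the prefetch buffer. The producer blocks whenever the buffer already holds bufferSize messages, which is the backpressure effect described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedPrefetchSketch {

    // Hypothetical helper, not part of the connector: pushes all messages
    // through a buffer of the given capacity, appends them to `received`,
    // and returns the largest number of messages seen in the buffer.
    static int maxBuffered(List<String> messages, int bufferSize, List<String> received) {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(bufferSize);

        Thread producer = new Thread(() -> {
            try {
                for (String message : messages) {
                    buffer.put(message); // blocks while the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int maxSeen = 0;
        try {
            for (int i = 0; i < messages.size(); i++) {
                maxSeen = Math.max(maxSeen, buffer.size());
                received.add(buffer.take()); // frees one slot for the producer
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while consuming", e);
        }
        return maxSeen;
    }

    public static void main(String[] args) {
        List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
        List<String> out = new ArrayList<>();
        int max = maxBuffered(in, 3, out);
        System.out.println(out.equals(in)); // prints true
        System.out.println(max <= 3);       // prints true
    }
}
```

A larger buffer lets the broker run further ahead of a slow consumer at the cost of holding more messages in memory. A smaller one keeps memory use low but may leave the consumer waiting for the next message.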

Run the source and take the same number of messages as we previously sent to it.

Scala
val result = jmsSource.take(in.size).runWith(Sink.seq)
Java
CompletionStage<List<String>> result = jmsSource
        .take(in.size())
        .runWith(Sink.seq(), materializer);
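
In the Java example the result is a CompletionStage, so the received messages are only available once the stream has completed. The following is a minimal sketch of one way to wait for such a result with a timeout, using only the Java standard library. The class AwaitResultSketch and the helper await are hypothetical names, and a CompletableFuture stands in for the value materialized by the stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AwaitResultSketch {

    // Hypothetical helper: blocks until the stage completes or the timeout elapses.
    static List<String> await(CompletionStage<List<String>> stage, long timeoutSeconds) {
        try {
            return stage.toCompletableFuture().get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the result", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Result was not available", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the CompletionStage returned by runWith(Sink.seq(), materializer).
        CompletionStage<List<String>> result =
                CompletableFuture.supplyAsync(() -> Arrays.asList("a", "b", "c"));
        System.out.println(await(result, 3)); // prints [a, b, c]
    }
}
```

Blocking like this is convenient in tests. In application code it is usually preferable to chain further work onto the CompletionStage, for example with thenAccept.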

This is how you send messages to and receive messages from a JMS server using this connector.

Using a topic with a JMS server

You can use a JMS topic in a very similar way.

For the sink:

Scala
val jmsTopicSink: Sink[String, NotUsed] = JmsSink(
  JmsSinkSettings(connectionFactory).withTopic("topic")
)
Java
Sink<String, NotUsed> jmsTopicSink = JmsSink.create(
        JmsSinkSettings
                .create(connectionFactory)
                .withTopic("topic")
);

For the source:

Scala
val jmsTopicSource: Source[String, NotUsed] = JmsSource.textSource(
  JmsSourceSettings(connectionFactory).withBufferSize(10).withTopic("topic")
)
Java
Source<String, NotUsed> jmsTopicSource = JmsSource
        .textSource(JmsSourceSettings
                .create(connectionFactory)
                .withTopic("topic")
                .withBufferSize(10)
        );

Such a sink and source can be started the same way as in the previous example.
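
Although the code looks almost identical, queues and topics deliver messages differently in JMS: a queue hands each message to exactly one consumer, while a topic gives every active subscriber its own copy. The sketch below models that difference with plain Java collections. It does not use the connector or a broker, the class and method names are hypothetical, and the round-robin distribution for the queue is an assumption made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DestinationSemanticsSketch {

    // Queue (point-to-point): each message is handed to exactly one consumer.
    // Round-robin is an assumption of this sketch; real brokers choose the
    // receiving consumer in their own way.
    static List<List<String>> deliverAsQueue(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumers).add(messages.get(i));
        }
        return inboxes;
    }

    // Topic (publish-subscribe): every active subscriber gets its own copy.
    static List<List<String>> deliverAsTopic(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> in = Arrays.asList("a", "b", "c", "d");
        System.out.println(deliverAsQueue(in, 2)); // prints [[a, c], [b, d]]
        System.out.println(deliverAsTopic(in, 2)); // prints [[a, b, c, d], [a, b, c, d]]
    }
}
```

In practice this means that running two sources against the same queue splits the messages between them, whereas two sources on the same topic each see the full stream.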

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> jms/testOnly *.JmsConnectorsSpec
Java
sbt
> jms/testOnly *.JmsConnectorsTest