JMS Connector
The JMS connector provides Akka Stream sources and sinks to connect to JMS servers.
Artifacts
- sbt
-
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-jms" % "0.9" libraryDependencies += "javax.jms" % "jms" % "1.1"
- Maven
-
<dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-jms_2.12</artifactId> <version>0.9</version> </dependency> <dependency> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> <version>1.1</version> </dependency>
- Gradle
-
dependencies { compile group: "com.lightbend.akka", name: "akka-stream-alpakka-jms_2.12", version: "0.9" compile group: 'javax.jms', name: 'jms', version: '1.1' }
Usage
Sending messages to JMS server
First define a jms ConnectionFactory
depending on the implementation you’re using. Here we’re using Active MQ.
- Scala
-
val connectionFactory = new ActiveMQConnectionFactory(url)
- Java
-
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ctx.url);
Create a sink, that accepts and forwards Strings to the JMS server.
- Scala
-
val jmsSink: Sink[String, NotUsed] = JmsSink( JmsSinkSettings(connectionFactory).withQueue("test") )
- Java
-
Sink<String, NotUsed> jmsSink = JmsSink.create( JmsSinkSettings .create(connectionFactory) .withQueue("test") );
JmsSink is a collection of factory methods that facilitates creation of sinks.
Last step is to materialize and run the sink we have created.
- Scala
-
val in = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k") Source(in).runWith(jmsSink)
- Java
-
List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"); Source.from(in).runWith(jmsSink, materializer);
Receiving messages from JMS server
Create a source using the same queue declaration as before.
- Scala
-
val jmsSource: Source[String, NotUsed] = JmsSource.textSource( JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("test") )
- Java
-
Source<String, NotUsed> jmsSource = JmsSource .textSource(JmsSourceSettings .create(connectionFactory) .withQueue("test") .withBufferSize(10) );
The bufferSize
parameter controls the maximum number of messages to prefetch before applying backpressure.
Run the source and take the same amount of messages as we previously sent to it.
- Scala
-
val result = jmsSource.take(in.size).runWith(Sink.seq)
- Java
-
CompletionStage<List<String>> result = jmsSource .take(in.size()) .runWith(Sink.seq(), materializer);
This is how you send and receive message from JMS server using this connector.
Using Topic with an JMS server
You can use JMS topic in q very similar way.
For the Sink :
- Scala
-
val jmsTopicSink: Sink[String, NotUsed] = JmsSink( JmsSinkSettings(connectionFactory).withTopic("topic") )
- Java
-
Sink<String, NotUsed> jmsTopicSink = JmsSink.create( JmsSinkSettings .create(connectionFactory) .withTopic("topic") );
For the source :
- Scala
-
val jmsTopicSource: Source[String, NotUsed] = JmsSource.textSource( JmsSourceSettings(connectionFactory).withBufferSize(10).withTopic("topic") )
- Java
-
Source<String, NotUsed> jmsTopicSource = JmsSource .textSource(JmsSourceSettings .create(connectionFactory) .withTopic("topic") .withBufferSize(10) );
Such sink and source can be started the same way as in the previous example.
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
- Scala
-
sbt > jms/testOnly *.JmsConnectorsSpec
- Java
-
sbt > jms/testOnly *.JmsConnectorsTest