JMS Connector
The JMS connector provides Akka Stream sources and sinks to connect to JMS providers.
Artifacts
- sbt
-
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-jms" % "0.15"
libraryDependencies += "javax.jms" % "jms" % "1.1"
- Maven
-
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-jms_2.12</artifactId>
  <version>0.15</version>
</dependency>
<dependency>
  <groupId>javax.jms</groupId>
  <artifactId>jms</artifactId>
  <version>1.1</version>
</dependency>
- Gradle
-
dependencies {
  compile group: "com.lightbend.akka", name: "akka-stream-alpakka-jms_2.12", version: "0.15"
  compile group: 'javax.jms', name: 'jms', version: '1.1'
}
Usage
The JMS message model supports several types of message body (see javax.jms.Message) and Alpakka currently supports messages with a body containing a String, a Serializable object, a Map or a byte array.
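As a rough illustration of how a payload's Java type lines up with the four supported body kinds, the following standalone sketch classifies an arbitrary object. The `BodyKinds` class, the `BodyKind` enum and the `classify` method are hypothetical names invented for this example and are not part of Alpakka or JMS; with the connector, the message type is chosen by the sink or source factory method you call.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;

// Hypothetical helper for illustration only; it is not part of Alpakka or JMS.
class BodyKinds {
    enum BodyKind { TEXT, BYTES, MAP, OBJECT }

    static BodyKind classify(Object body) {
        if (body == null) {
            throw new IllegalArgumentException("Body must not be null");
        }
        // String, byte[] and most Map implementations are also Serializable,
        // so the more specific kinds have to be checked first.
        if (body instanceof String) return BodyKind.TEXT;
        if (body instanceof byte[]) return BodyKind.BYTES;
        if (body instanceof Map) return BodyKind.MAP;
        if (body instanceof Serializable) return BodyKind.OBJECT;
        throw new IllegalArgumentException("Unsupported body type: " + body.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(classify("hello"));                         // TEXT
        System.out.println(classify(new byte[] {1, 2}));               // BYTES
        System.out.println(classify(Collections.singletonMap("k", 1))); // MAP
        System.out.println(classify(Integer.valueOf(42)));             // OBJECT
    }
}
```

The order of the checks matters: testing for `Serializable` first would classify every `String` as an object body.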
First, create a javax.jms.ConnectionFactory for the JMS implementation you are using. The examples here use ActiveMQ.
- Scala
-
val connectionFactory = new ActiveMQConnectionFactory(url)
// This is needed to send arbitrary objects; ActiveMQ would otherwise forbid it.
// See http://activemq.apache.org/objectmessage.html
connectionFactory.setTrustAllPackages(true)
- Java
-
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ctx.url);
// This is needed to send arbitrary objects; ActiveMQ would otherwise forbid it.
// See http://activemq.apache.org/objectmessage.html
connectionFactory.setTrustAllPackages(true);
The resulting javax.jms.ConnectionFactory is then used to create the JMS sinks and sources described below.
Sending messages to a JMS provider
Wrap the messages you want to send in a subtype of JmsMessage (a case class in Scala), which also lets you optionally set their properties. JmsSink contains factory methods that create sinks for each message type (see the examples below).
Sending text messages to a JMS provider
Create a sink that accepts and forwards JmsTextMessages to the JMS provider:
- Scala
-
val jmsSink: Sink[String, NotUsed] = JmsSink.textSink( JmsSinkSettings(connectionFactory).withQueue("test") )
- Java
-
Sink<String, NotUsed> jmsSink = JmsSink.textSink( JmsSinkSettings .create(connectionFactory) .withQueue("test") );
The last step is to materialize and run the sink we have created.
- Scala
-
val in = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
Source(in).runWith(jmsSink)
- Java
-
List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"); Source.from(in).runWith(jmsSink, materializer);
Sending byte messages to a JMS provider
Create a sink that accepts and forwards JmsByteMessages to the JMS provider:
- Scala
-
val jmsSink: Sink[Array[Byte], NotUsed] = JmsSink.bytesSink( JmsSinkSettings(connectionFactory).withQueue("test") )
- Java
-
Sink<byte[], NotUsed> jmsSink = JmsSink.bytesSink( JmsSinkSettings .create(connectionFactory) .withQueue("test") );
The last step is to materialize and run the sink we have created.
- Scala
-
val in = "ThisIsATest".getBytes(Charset.forName("UTF-8"))
Source.single(in).runWith(jmsSink)
- Java
-
byte[] in = "ThisIsATest".getBytes(Charset.forName("UTF-8")); Source.single(in).runWith(jmsSink, materializer);
Sending map messages to a JMS provider
Create a sink that accepts and forwards JmsMapMessages to the JMS provider:
- Scala
-
val jmsSink: Sink[Map[String, Any], NotUsed] = JmsSink.mapSink( JmsSinkSettings(connectionFactory).withQueue("test") )
- Java
-
Sink<Map<String, Object>, NotUsed> jmsSink = JmsSink.mapSink( JmsSinkSettings .create(connectionFactory) .withQueue("test") );
The last step is to materialize and run the sink we have created.
- Scala
-
val in = List(
  Map[String, Any](
    "string" -> "value",
    "int value" -> 42,
    "double value" -> 43.toDouble,
    "short value" -> 7.toShort,
    "boolean value" -> true,
    "long value" -> 7.toLong,
    "bytearray" -> "AStringAsByteArray".getBytes(Charset.forName("UTF-8")),
    "byte" -> 1.toByte
  )
)
Source(in).runWith(jmsSink)
- Java
-
Map<String, Object> in = new HashMap<>(); in.put("string value", "value"); in.put("int value", 42); in.put("double value", 43.0); in.put("short value", (short) 7); in.put("boolean value", true); in.put("long value", 7L); in.put("bytearray", "AStringAsByteArray".getBytes(Charset.forName("UTF-8"))); in.put("byte", (byte) 1); Source.single(in).runWith(jmsSink, materializer);
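The values used in the map above are all strings, boxed primitives or byte arrays. In the JMS API, `MapMessage.setObject` accepts only those value types, so it can be useful to check a map before sending it. The sketch below is one possible pre-flight check using only the JDK; `MapValueCheck` and its methods are hypothetical names for this example, and whether the connector performs a similar check internally is not covered here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

// Hypothetical pre-flight check for illustration; not part of Alpakka or JMS.
class MapValueCheck {
    // Value types that javax.jms.MapMessage.setObject accepts:
    // boxed primitives, String and byte[].
    static boolean isSupportedValue(Object value) {
        return value instanceof String
            || value instanceof Boolean
            || value instanceof Byte
            || value instanceof Short
            || value instanceof Character
            || value instanceof Integer
            || value instanceof Long
            || value instanceof Float
            || value instanceof Double
            || value instanceof byte[];
    }

    static boolean allSupported(Map<String, Object> body) {
        return body.values().stream().allMatch(MapValueCheck::isSupportedValue);
    }

    public static void main(String[] args) {
        Map<String, Object> in = new HashMap<>();
        in.put("string value", "value");
        in.put("int value", 42);
        in.put("bytearray", new byte[] {1, 2, 3});
        System.out.println(allSupported(in)); // true

        in.put("list value", new ArrayList<String>());
        System.out.println(allSupported(in)); // false
    }
}
```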
Sending object messages to a JMS provider
Create a sink that accepts and forwards JmsObjectMessages to the JMS provider:
- Scala
-
val jmsSink: Sink[Serializable, NotUsed] = JmsSink.objectSink( JmsSinkSettings(connectionFactory).withQueue("test") )
- Java
-
Sink<java.io.Serializable, NotUsed> jmsSink = JmsSink.objectSink( JmsSinkSettings .create(connectionFactory) .withQueue("test") );
The last step is to materialize and run the sink we have created.
- Scala
-
val in = DummyObject("ThisIsATest")
Source.single(in).runWith(jmsSink)
- Java
-
java.io.Serializable in = new DummyJavaTests("javaTest"); Source.single(in).runWith(jmsSink, materializer);
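An object message carries a Java-serialized payload, so the class you send must implement `java.io.Serializable` and be loadable on the receiving side. The following JDK-only sketch shows the kind of round trip such a payload has to survive; `SerializableRoundTrip` and `Payload` are hypothetical names made up for this example, and the actual wire handling is done by the JMS provider rather than by code like this.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical illustration of Java serialization; not part of Alpakka or JMS.
class SerializableRoundTrip {
    // Stand-in for a user-defined payload class.
    static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String text;

        Payload(String text) {
            this.text = text;
        }
    }

    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deserialize payload", e);
        }
    }

    static Object roundTrip(Serializable value) {
        return deserialize(serialize(value));
    }

    public static void main(String[] args) {
        Payload copy = (Payload) roundTrip(new Payload("ThisIsATest"));
        System.out.println(copy.text); // ThisIsATest
    }
}
```

Deserializing arbitrary classes is a security risk, which is why ActiveMQ requires trusted packages to be configured before it accepts object messages.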
Sending messages with properties to a JMS provider
You can set JMS properties on every JmsMessage.
- Scala
-
val msgsIn = (1 to 10).toList.map { n => JmsTextMessage(n.toString) .withProperty("Number", n) .withProperty("IsOdd", n % 2 == 1) .withProperty("IsEven", n % 2 == 0) }
- Java
-
private List<JmsTextMessage> createTestMessageList() {
  List<Integer> intsIn = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  List<JmsTextMessage> msgsIn = new ArrayList<>();
  for (Integer n : intsIn) {
    Map<String, Object> properties = new HashMap<>();
    properties.put("Number", n);
    properties.put("IsOdd", n % 2 == 1);
    properties.put("IsEven", n % 2 == 0);
    msgsIn.add(JmsTextMessage.create(n.toString(), properties));
  }
  return msgsIn;
}

List<JmsTextMessage> msgsIn = createTestMessageList();

// Separate snippet: the same messages with JMS headers set as well
List<JmsTextMessage> msgsIn = createTestMessageList().stream()
  .map(jmsTextMessage -> jmsTextMessage.withHeader(JmsType.create("type")))
  .map(jmsTextMessage -> jmsTextMessage.withHeader(JmsCorrelationId.create("correlationId")))
  .map(jmsTextMessage -> jmsTextMessage.withHeader(JmsReplyTo.queue("test-reply")))
  .collect(Collectors.toList());
Receiving messages from a JMS provider
JmsSource contains factory methods that create sources for each message type (see the examples below).
Receiving String messages from a JMS provider
Create a source:
- Scala
-
val jmsSource: Source[String, NotUsed] = JmsSource.textSource( JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("test") )
- Java
-
Source<String, NotUsed> jmsSource = JmsSource .textSource(JmsSourceSettings .create(connectionFactory) .withQueue("test") .withBufferSize(10) );
The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.
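To make the idea of a bounded prefetch buffer concrete, the sketch below models it with a plain `ArrayBlockingQueue`: messages are accepted until the buffer holds `bufferSize` elements, after which further prefetching is refused until downstream pulls one out. This is only a conceptual model under that assumption; `PrefetchBuffer` and its methods are hypothetical names, and the connector's internal implementation is not described by this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of a prefetch buffer; not the connector's implementation.
class PrefetchBuffer {
    private final BlockingQueue<String> buffer;

    PrefetchBuffer(int bufferSize) {
        this.buffer = new ArrayBlockingQueue<>(bufferSize);
    }

    // Provider side: returns false when the buffer is full, which is the
    // point at which delivery has to pause (backpressure).
    boolean tryPrefetch(String message) {
        return buffer.offer(message);
    }

    // Stream side: called when downstream demands one element.
    // Returns null if nothing has been prefetched yet.
    String pull() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        PrefetchBuffer buffer = new PrefetchBuffer(2);
        System.out.println(buffer.tryPrefetch("a")); // true
        System.out.println(buffer.tryPrefetch("b")); // true
        System.out.println(buffer.tryPrefetch("c")); // false, buffer is full
        System.out.println(buffer.pull());           // a
        System.out.println(buffer.tryPrefetch("c")); // true, one slot was freed
    }
}
```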
Run the source and take the same number of messages as we previously sent to it.
- Scala
-
val result = jmsSource.take(in.size).runWith(Sink.seq)
- Java
-
CompletionStage<List<String>> result = jmsSource .take(in.size()) .runWith(Sink.seq(), materializer);
Receiving byte array messages from a JMS provider
Create a source:
- Scala
-
val jmsSource: Source[Array[Byte], NotUsed] = JmsSource.bytesSource( JmsSourceSettings(connectionFactory).withQueue("test") )
- Java
-
Source<byte[], NotUsed> jmsSource = JmsSource .bytesSource(JmsSourceSettings .create(connectionFactory) .withQueue("test") );
The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.
Run the source and take the same number of messages as we previously sent to it.
- Scala
-
val result = jmsSource.take(1).runWith(Sink.head)
- Java
-
CompletionStage<byte[]> result = jmsSource .take(1) .runWith(Sink.head(), materializer);
Receiving Serializable object messages from a JMS provider
Create a source:
- Scala
-
val jmsSource: Source[java.io.Serializable, NotUsed] = JmsSource.objectSource( JmsSourceSettings(connectionFactory).withQueue("test") )
- Java
-
Source<java.io.Serializable, NotUsed> jmsSource = JmsSource .objectSource(JmsSourceSettings .create(connectionFactory) .withQueue("test") );
The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.
Run the source and take the same number of messages as we previously sent to it.
- Scala
-
val result = jmsSource.take(1).runWith(Sink.head)
- Java
-
CompletionStage<java.io.Serializable> result = jmsSource .take(1) .runWith(Sink.head(), materializer);
Receiving Map messages from a JMS provider
Create a source:
- Scala
-
val jmsSource: Source[Map[String, Any], NotUsed] = JmsSource.mapSource( JmsSourceSettings(connectionFactory).withQueue("test") )
- Java
-
Source<Map<String, Object>, NotUsed> jmsSource = JmsSource .mapSource(JmsSourceSettings .create(connectionFactory) .withQueue("test") );
The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.
Run the source and take the same number of messages as we previously sent to it.
- Scala
-
val result = jmsSource.take(1).runWith(Sink.seq)
- Java
-
CompletionStage<Map<String, Object>> resultStage = jmsSource .take(1) .runWith(Sink.head(), materializer);
Receiving javax.jms.Messages from a JMS provider
Create a source:
- Scala
-
val jmsSource: Source[Message, NotUsed] = JmsSource( JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("numbers") )
- Java
-
Source<Message, NotUsed> jmsSource = JmsSource.create(JmsSourceSettings .create(connectionFactory) .withQueue("test") .withBufferSize(10) );
The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.
Run the source and specify the number of messages to take:
- Scala
-
val result: Future[Seq[Message]] = jmsSource.take(msgsIn.size).runWith(Sink.seq)
- Java
-
CompletionStage<List<Message>> result = jmsSource .take(msgsIn.size()) .runWith(Sink.seq(), materializer);
Receiving javax.jms.Messages from a JMS provider with client acknowledgement
Create a javax.jms.Message source:
- Scala
-
val jmsSource: Source[Message, NotUsed] = JmsSource( JmsSourceSettings(connectionFactory) .withQueue("numbers") .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge) )
- Java
-
Source<Message, NotUsed> jmsSource = JmsSource.create(JmsSourceSettings .create(connectionFactory) .withQueue("test") .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge()) );
The acknowledgeMode parameter (AcknowledgeMode) controls the JMS acknowledge mode; see javax.jms.Connection.createSession.
Run the source, take as many messages as we previously sent to it, and acknowledge each one.
- Scala
-
val result = jmsSource
  .take(msgsIn.size)
  .map {
    case textMessage: TextMessage =>
      val text = textMessage.getText
      textMessage.acknowledge()
      text
  }
  .runWith(Sink.seq)
- Java
-
CompletionStage<List<String>> result = jmsSource .take(msgsIn.size()) .map(message -> { String text = ((ActiveMQTextMessage)message).getText(); message.acknowledge(); return text; }) .runWith(Sink.seq(), materializer);
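In JMS client acknowledge mode, calling `acknowledge()` on a message confirms every message the session has delivered so far, and `Session.recover()` causes delivered but unacknowledged messages to be redelivered. The sketch below models that behaviour in memory with plain JDK collections to show why each message is acknowledged in the snippets above. `ClientAckModel` and its methods are hypothetical names; this is a simplified model, not JMS or Alpakka code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of a client-acknowledge session; not JMS or Alpakka code.
class ClientAckModel {
    private final Deque<String> pending = new ArrayDeque<>();      // not yet delivered
    private final List<String> unacknowledged = new ArrayList<>(); // delivered, not yet acknowledged

    ClientAckModel(List<String> messages) {
        pending.addAll(messages);
    }

    // Delivers the next message, or null if none is left.
    // The message stays unacknowledged until acknowledge() is called.
    String receive() {
        String next = pending.poll();
        if (next != null) {
            unacknowledged.add(next);
        }
        return next;
    }

    // Models Message.acknowledge(): confirms every message delivered so far.
    void acknowledge() {
        unacknowledged.clear();
    }

    // Models Session.recover(): unacknowledged messages are queued for
    // redelivery, oldest first.
    void recover() {
        for (int i = unacknowledged.size() - 1; i >= 0; i--) {
            pending.addFirst(unacknowledged.get(i));
        }
        unacknowledged.clear();
    }

    public static void main(String[] args) {
        ClientAckModel session = new ClientAckModel(Arrays.asList("1", "2", "3"));
        System.out.println(session.receive()); // 1
        session.acknowledge();
        System.out.println(session.receive()); // 2
        session.recover();                     // "2" was never acknowledged
        System.out.println(session.receive()); // 2
        System.out.println(session.receive()); // 3
    }
}
```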
Receiving javax.jms.Messages from a JMS provider with a selector
Create a javax.jms.Message source specifying a JMS selector expression:
- Scala
-
val jmsSource = JmsSource( JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("numbers").withSelector("IsOdd = TRUE") )
- Java
-
Source<Message, NotUsed> jmsSource = JmsSource.create(JmsSourceSettings .create(connectionFactory) .withQueue("test") .withBufferSize(10) .withSelector("IsOdd = TRUE") );
Verify that we are only receiving messages according to the selector:
- Scala
-
val oddMsgsIn = msgsIn.filter(msg => msg.body.toInt % 2 == 1)
val result = jmsSource.take(oddMsgsIn.size).runWith(Sink.seq)
// We should have only received the odd numbers in the list
result.futureValue.zip(oddMsgsIn).foreach {
  case (out, in) =>
    out.getIntProperty("Number") shouldEqual in.properties("Number")
    out.getBooleanProperty("IsOdd") shouldEqual in.properties("IsOdd")
    out.getBooleanProperty("IsEven") shouldEqual in.properties("IsEven")
    // Make sure we are only receiving odd numbers
    out.getIntProperty("Number") % 2 shouldEqual 1
}
- Java
-
List<JmsTextMessage> oddMsgsIn = msgsIn.stream()
  .filter(msg -> Integer.valueOf(msg.body()) % 2 == 1)
  .collect(Collectors.toList());
assertEquals(5, oddMsgsIn.size());

CompletionStage<List<Message>> result = jmsSource
  .take(oddMsgsIn.size())
  .runWith(Sink.seq(), materializer);

List<Message> outMessages = result.toCompletableFuture().get(4, TimeUnit.SECONDS);
int msgIdx = 0;
for (Message outMsg : outMessages) {
  assertEquals(outMsg.getIntProperty("Number"), oddMsgsIn.get(msgIdx).properties().get("Number").get());
  assertEquals(outMsg.getBooleanProperty("IsOdd"), oddMsgsIn.get(msgIdx).properties().get("IsOdd").get());
  assertEquals(outMsg.getBooleanProperty("IsEven"), (oddMsgsIn.get(msgIdx).properties().get("IsEven").get()));
  assertEquals(1, outMsg.getIntProperty("Number") % 2);
  msgIdx++;
}
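The selector is evaluated by the JMS provider against each message's properties, so only matching messages reach the source. As a mental model, the selector `IsOdd = TRUE` behaves like the predicate in the JDK-only sketch below, applied to the property maps built in the earlier examples. `SelectorSketch` and its members are hypothetical names for this illustration; real selectors use the SQL-like JMS selector syntax and are not written as Java predicates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for the selector "IsOdd = TRUE"; not JMS or Alpakka code.
class SelectorSketch {
    // Same properties as in the "messages with properties" examples.
    static Map<String, Object> propertiesFor(int n) {
        Map<String, Object> properties = new HashMap<>();
        properties.put("Number", n);
        properties.put("IsOdd", n % 2 == 1);
        properties.put("IsEven", n % 2 == 0);
        return properties;
    }

    // A message without the property does not match, mirroring how a JMS
    // selector does not select messages for which the expression is unknown.
    static final Predicate<Map<String, Object>> IS_ODD_SELECTOR =
        properties -> Boolean.TRUE.equals(properties.get("IsOdd"));

    static List<Integer> selectedNumbers(int count) {
        List<Integer> selected = new ArrayList<>();
        for (int n = 1; n <= count; n++) {
            Map<String, Object> properties = propertiesFor(n);
            if (IS_ODD_SELECTOR.test(properties)) {
                selected.add((Integer) properties.get("Number"));
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        System.out.println(selectedNumbers(10)); // [1, 3, 5, 7, 9]
    }
}
```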
Using topics with a JMS provider
You can use JMS topics in much the same way.
For the sink:
- Scala
-
val jmsTopicSink: Sink[String, NotUsed] = JmsSink.textSink( JmsSinkSettings(connectionFactory).withTopic("topic") )
- Java
-
Sink<String, NotUsed> jmsTopicSink = JmsSink.textSink( JmsSinkSettings .create(connectionFactory) .withTopic("topic") );
For the source:
- Scala
-
val jmsTopicSource: Source[String, NotUsed] = JmsSource.textSource( JmsSourceSettings(connectionFactory).withBufferSize(10).withTopic("topic") )
- Java
-
Source<String, NotUsed> jmsTopicSource = JmsSource .textSource(JmsSourceSettings .create(connectionFactory) .withTopic("topic") .withBufferSize(10) );
Such a sink and source can be run in the same way as in the previous examples.
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
- Scala
-
sbt
> jms/testOnly *.JmsConnectorsSpec
- Java
-
sbt
> jms/testOnly *.JmsConnectorsTest
Using IBM MQ
You can use IBM MQ like any other JMS provider by creating a QueueConnectionFactory or a TopicConnectionFactory and creating a JmsSourceSettings or JmsSinkSettings from it. The snippets below have been tested with a default IBM MQ docker image, which contains queues and topics for testing. The following command starts MQ 9 using docker:
docker run --env LICENSE=accept --env MQ_QMGR_NAME=QM1 --publish 1414:1414 --publish 9443:9443 ibmcom/mq:9
MQ settings for this image are shown here: https://github.com/ibm-messaging/mq-docker#mq-developer-defaults
Create a JmsSource to an IBM MQ Queue
The MQQueueConnectionFactory needs a queue manager name and a channel name. The docker command used in the previous section sets up a QM1 queue manager and a DEV.APP.SVRCONN channel. The IBM MQ client can connect to the MQ server over TCP/IP or natively through JNI (when the client and server run on the same machine). The examples below use TCP/IP, which is selected by setting the transport type to CommonConstants.WMQ_CM_CLIENT.
- Scala
-
import com.ibm.mq.jms.MQQueueConnectionFactory
import com.ibm.msg.client.wmq.common.CommonConstants

val QueueManagerName = "QM1"
val TestChannelName = "DEV.APP.SVRCONN"

// Create the IBM MQ QueueConnectionFactory
val queueConnectionFactory = new MQQueueConnectionFactory()
queueConnectionFactory.setQueueManager(QueueManagerName)
queueConnectionFactory.setChannel(TestChannelName)

// Connect to IBM MQ over TCP/IP
queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT)

val TestQueueName = "DEV.QUEUE.1"
val jmsSource: Source[String, NotUsed] = JmsSource.textSource(
  JmsSourceSettings(queueConnectionFactory).withQueue(TestQueueName)
)
- Java
-
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.common.CommonConstants;

String queueManagerName = "QM1";
String testChannelName = "DEV.APP.SVRCONN";

// Create the IBM MQ QueueConnectionFactory
MQQueueConnectionFactory queueConnectionFactory = new MQQueueConnectionFactory();
queueConnectionFactory.setQueueManager(queueManagerName);
queueConnectionFactory.setChannel(testChannelName);

// Connect to IBM MQ over TCP/IP
queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);

String testQueueName = "DEV.QUEUE.1";
Source<String, NotUsed> jmsSource = JmsSource.textSource(
  JmsSourceSettings
    .create(queueConnectionFactory)
    .withQueue(testQueueName)
);
Create a JmsSink to an IBM MQ Topic
The IBM MQ docker container sets up a dev/ topic, which is used in the example below.
- Scala
-
import com.ibm.mq.jms.MQTopicConnectionFactory
import com.ibm.msg.client.wmq.common.CommonConstants

val QueueManagerName = "QM1"
val TestChannelName = "DEV.APP.SVRCONN"

// Create the IBM MQ TopicConnectionFactory
val topicConnectionFactory = new MQTopicConnectionFactory()
topicConnectionFactory.setQueueManager(QueueManagerName)
topicConnectionFactory.setChannel(TestChannelName)

// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT)

val TestTopicName = "dev/"
// textSink matches the declared Sink[String, NotUsed] type and the Java example
val jmsTopicSink: Sink[String, NotUsed] = JmsSink.textSink(
  JmsSinkSettings(topicConnectionFactory).withTopic(TestTopicName)
)
- Java
-
import com.ibm.mq.jms.MQTopicConnectionFactory;
import com.ibm.msg.client.wmq.common.CommonConstants;

String queueManagerName = "QM1";
String testChannelName = "DEV.APP.SVRCONN";

// Create the IBM MQ TopicConnectionFactory
MQTopicConnectionFactory topicConnectionFactory = new MQTopicConnectionFactory();
topicConnectionFactory.setQueueManager(queueManagerName);
topicConnectionFactory.setChannel(testChannelName);

// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);

String testTopicName = "dev/";
Sink<String, NotUsed> jmsTopicSink = JmsSink.textSink(
  JmsSinkSettings
    .create(topicConnectionFactory)
    .withTopic(testTopicName)
);