Connecting to a JMS Message Broker
This tutorial uses an example module to provide a fulcrum
configured to send and receive JMS messages using ActiveMQ 4.1.1
(http://activemq.apache.org/).
It is designed as an example of how to configure
NetKernel for a specific JMS implementation.
Please follow the instructions below to install and deploy it to the front-end fulcrum.
Install
Prior to installing the NetKernel module you will need to have downloaded and installed ActiveMQ 4.1.1 or later and have
a message broker started on localhost using its default settings. For more information visit:
http://activemq.apache.org/
To install the NetKernel ActiveMQ messaging module, use the module management wizard.
- Select "Install new applications and library modules".
- Select "Install new modules from a 1060 Research distribution server".
- Select a mirror site.
- Select "ActiveMQ Test Fulcrum".
- When prompted at step 5, select "NetKernel Frontend Fulcrum". Follow the instructions to complete the download and install.
ClassPath
The JMS transport requires the standard javax.jms.* classes. Copy
all of your JMS broker's jar libraries to the NetKernel [install]/lib/ext/
directory so that they will be added to the NetKernel classpath and made
available to all modules. Restart NetKernel to reconfigure the classpath.
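After the restart, a quick way to confirm that the jars were picked up is to try loading a class from each of them. The sketch below is plain Java and only illustrates the idea: the class name ClasspathCheckSketch and the method isAvailable are hypothetical, and it assumes you run it with the same jars on the classpath that you copied to [install]/lib/ext/.

```java
final class ClasspathCheckSketch {

    /** Returns true if the named class can be located by this class's loader. */
    static boolean isAvailable(String className) {
        try {
            // false = locate the class without running its static initialisers
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The standard JMS API and the ActiveMQ context factory named in JMSConfig.xml
        String[] required = {
            "javax.jms.Message",
            "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
        };
        for (String name : required) {
            System.out.println(name + " available: " + isAvailable(name));
        }
    }
}
```

If either line reports false, the corresponding jar is probably missing from the classpath.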
Testing
Once ActiveMQ is installed, a message broker is running on your local machine, and the NetKernel module is installed,
the test message can be initiated with a web request to http://localhost:8080/activemq/send.
The module contains a NetKernel JMS transport which is
configured to listen to the topic and will log the message and issue a response.
To follow the discussion we recommend that you unzip the tpt-jms-activemq.jar file to examine the configuration
and source code.
Discussion
The NetKernel JMS example module is a fulcrum containing a NetKernel JMS transport configured to work with
the ActiveMQ message broker. The transport is configured with ffcpl:/etc/JMSConfig.xml as follows...
<JMSProducer>
<!---->
<jndiContext>
<java.naming.factory.initial>org.apache.activemq.jndi.ActiveMQInitialContextFactory</java.naming.factory.initial>
<brokerURL>tcp://localhost:61616</brokerURL>
<queue.MyQueue>example.MyQueue</queue.MyQueue>
<topic.MyTopic>example.MyTopic</topic.MyTopic>
</jndiContext>
<!---->
<queueConnectionFactory>QueueConnectionFactory</queueConnectionFactory>
<!---->
<topicConnectionFactory>TopicConnectionFactory</topicConnectionFactory>
<!---->
<!---->
<queue>
<!---->
<name>MyQueue</name>
<!---->
<messageType>BytesMessage</messageType>
<!---->
<ackOnException>false</ackOnException>
</queue>
<!---->
<topic>
<!---->
<name>MyTopic</name>
<messageType>BytesMessage</messageType>
<ackOnException>false</ackOnException>
<!---->
<!---->
</topic>
</JMSProducer>
The jndiContext section identifies the ContextFactory used to connect. The brokerURL provides the host and port of the
ActiveMQ message broker, in this case localhost and the default ActiveMQ port. The queue.XXXXX and topic.YYYYY entries describe logical
JNDI queues and topics. The part after the . (dot) is the logical JNDI name of the queue or topic; the text of the entry is the name that
will be used to identify messages for that queue or topic.
Each logical queue or topic has a section describing the implementation for that logical name. The messageType and exception acknowledgement are configured here.
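To make the naming convention concrete, the sketch below splits an entry name such as queue.MyQueue into its kind and logical JNDI name and pairs it with the destination name taken from the entry text. It is plain Java with no JMS dependency; JndiEntrySketch, Destination and parse are hypothetical names used for illustration and are not part of NetKernel or ActiveMQ.

```java
final class JndiEntrySketch {

    /** One configured destination: kind, logical JNDI name and broker-side name. */
    static final class Destination {
        final String kind;          // "queue" or "topic"
        final String logicalName;   // the part after the dot, e.g. "MyQueue"
        final String physicalName;  // the entry text, e.g. "example.MyQueue"

        Destination(String kind, String logicalName, String physicalName) {
            this.kind = kind;
            this.logicalName = logicalName;
            this.physicalName = physicalName;
        }
    }

    /** Splits an entry name at the first dot and pairs it with the entry text. */
    static Destination parse(String entryName, String entryText) {
        int dot = entryName.indexOf('.');
        if (dot <= 0 || dot == entryName.length() - 1) {
            throw new IllegalArgumentException("expected kind.name but got: " + entryName);
        }
        String kind = entryName.substring(0, dot);
        if (!kind.equals("queue") && !kind.equals("topic")) {
            throw new IllegalArgumentException("unknown destination kind: " + kind);
        }
        return new Destination(kind, entryName.substring(dot + 1), entryText.trim());
    }

    public static void main(String[] args) {
        Destination q = parse("queue.MyQueue", "example.MyQueue");
        Destination t = parse("topic.MyTopic", "example.MyTopic");
        System.out.println(q.kind + " " + q.logicalName + " -> " + q.physicalName);
        System.out.println(t.kind + " " + t.logicalName + " -> " + t.physicalName);
    }
}
```

The logical names produced here (MyQueue, MyTopic) are the ones referenced by the name elements of the queue and topic sections in the configuration.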
Deconstructing the Test Case
When the test message is initiated by making a web request for http://localhost:8080/activemq/send,
a set of interactions is triggered between NetKernel and the JMS broker. Here we'll follow the chain of events.
First examine the module.xml of the module. You will see that it exports a single address path ffcpl:/activemq/.* and
has the following rewrite rule as the first entry in the mapping section.
<mapping>
<rewrite>
<match>ffcpl:(/activemq/.*)</match>
<to>active:mapper+operator@ffcpl:/resources/links.xml+operand@ffcpl:$1</to>
</rewrite>
...
</mapping>
The rule maps any request matching ffcpl:/activemq/.* to the XRL mapper. The module is using the active:mapper
as an indirection layer: the mapper resolves the operand URI using the links provided as the operator resource. The links used by the mapper are
shown below...
<links basepath="ffcpl:/activemq/">
<link>
<ext>/send</ext>
<int>active:beanshell+operator@ffcpl:/resources/send.bs</int>
</link>
<link>
<ext>/topic/example.MyTopic</ext>
<int>active:beanshell+operator@ffcpl:/resources/receive.bs</int>
<args>body,header,properties</args>
</link>
<link>
<ext>/queue/example.MyQueue</ext>
<int>active:beanshell+operator@ffcpl:/resources/receive-reply.bs</int>
<args>body,header,properties</args>
</link>
</links>
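As a rough model of what the mapper does with these links, the sketch below strips the basepath from an operand URI and looks up the remainder in a table of ext to int entries. It assumes exact string matching on ext, which may not reflect how the real mapper matches, and it ignores the args element. LinksSketch and resolve are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LinksSketch {

    static final String BASEPATH = "ffcpl:/activemq/";

    // ext -> int, copied from the links listing above
    static final Map<String, String> LINKS = new LinkedHashMap<>();
    static {
        LINKS.put("/send", "active:beanshell+operator@ffcpl:/resources/send.bs");
        LINKS.put("/topic/example.MyTopic", "active:beanshell+operator@ffcpl:/resources/receive.bs");
        LINKS.put("/queue/example.MyQueue", "active:beanshell+operator@ffcpl:/resources/receive-reply.bs");
    }

    /** Returns the internal URI for an operand, or null when no link applies. */
    static String resolve(String operand) {
        if (!operand.startsWith(BASEPATH)) {
            return null;
        }
        // The basepath ends with "/" and every ext starts with "/", so keep that slash.
        String ext = operand.substring(BASEPATH.length() - 1);
        return LINKS.get(ext);
    }

    public static void main(String[] args) {
        // prints active:beanshell+operator@ffcpl:/resources/send.bs
        System.out.println(resolve("ffcpl:/activemq/send"));
    }
}
```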
So we see that the first rewrite uses the mapper to route our request for http://localhost:8080/activemq/send
to an invocation of the active:beanshell runtime with the script send.bs. That script is shown below...
import com.ten60.netkernel.urii.aspect.*;
/* example topic publishing service */
main()
{
	System.out.println("***************************TEST****************************************");
	// build a sub-request for the JMS send service
	req = context.createSubRequest();
	req.setURI("active:jms-send");
	// publish to the JNDI logical topic MyTopic
	req.addArgument("destination","jms-topic:MyTopic");
	req.addArgument("body",new StringAspect("Mary had a little Laaaaamb"));
	req.addArgument("header",new StringAspect("<nvp><JMSCorrelationID>tony</JMSCorrelationID><JMSReplyTo>MyQueue</JMSReplyTo></nvp>"));
	req.addArgument("properties",new StringAspect("<nvp><magic>pete</magic></nvp>"));
	context.issueSubRequest(req);
	// return a plain-text confirmation to the caller
	resp=context.createResponseFrom(new StringAspect("message sent"));
	resp.setMimeType("text/plain");
	context.setResponse(resp);
}
When executed, send.bs creates a request for active:jms-send. The destination is
set to the URI "jms-topic:MyTopic", which tells the NetKernel JMS message client to send the message to the JNDI logical topic "MyTopic".
It adds a simple string message as the body argument and sets up some ActiveMQ header values (headers vary by JMS vendor implementation; see your vendor's
documentation for details). The request is issued and finally a response is returned indicating the message was sent.
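The header and properties arguments in send.bs are small XML documents with an nvp root and one child element per name. Writing them by hand becomes error-prone once values contain characters such as & or <, so a helper along the following lines may be useful. This is a sketch in plain Java: NvpSketch, escape and toNvp are hypothetical names rather than part of the NetKernel API, and it assumes every name is already a valid XML element name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NvpSketch {

    /** Escapes the characters that are unsafe inside XML element text. */
    static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    /** Renders the pairs, in iteration order, as child elements of an nvp root. */
    static String toNvp(Map<String, String> pairs) {
        StringBuilder sb = new StringBuilder("<nvp>");
        for (Map.Entry<String, String> e : pairs.entrySet()) {
            sb.append('<').append(e.getKey()).append('>')
              .append(escape(e.getValue()))
              .append("</").append(e.getKey()).append('>');
        }
        return sb.append("</nvp>").toString();
    }

    public static void main(String[] args) {
        Map<String, String> header = new LinkedHashMap<>();
        header.put("JMSCorrelationID", "tony");
        header.put("JMSReplyTo", "MyQueue");
        // prints <nvp><JMSCorrelationID>tony</JMSCorrelationID><JMSReplyTo>MyQueue</JMSReplyTo></nvp>
        System.out.println(toNvp(header));
    }
}
```

The resulting string could then be wrapped in a StringAspect in the same way send.bs wraps its literal header.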
The message is dispatched to the ActiveMQ broker. At this point the NetKernel JMS transport comes into play. It is configured in the module's
transport section and uses the configuration in /etc/JMSConfig.xml (discussed earlier). The transport is listening to the MyTopic topic.
It therefore receives from the message broker the message sent from the send.bs script. The JMS transport issues
the message as a NetKernel request into its hosting module. In this case it will issue a request of the form...
jms-topic-transport:example.MyTopic+body@[...]+header@[...]+properties@[...]
This request is issued into the internal address space of the module. Therefore we can now understand the second rewrite rule in the module.xml...
<mapping>
...
<rewrite>
<match>jms-(queue|topic)-transport:(.*)</match>
<to>active:mapper+operator@ffcpl:/resources/links.xml+operand@ffcpl:/activemq/$1/$2</to>
</rewrite>
...
</mapping>
As with the /activemq/send request we see that the messages originating from the JMS transport
are rewritten to the active:mapper for routing to handler services. In this case the request is rewritten to...
active:mapper+operator@ffcpl:/resources/links.xml+operand@ffcpl:/activemq/topic/example.MyTopic+body@[...]+header@[...]+properties@[...]
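The substitution can be reproduced with an ordinary Java regular expression, which uses the same $1 and $2 group references as the rule. This only illustrates the rewrite itself: requiring the pattern to match the whole URI is an assumption about NetKernel's behaviour, and TransportRewriteSketch and rewrite are hypothetical names.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TransportRewriteSketch {

    // The match and to values of the second rewrite rule in module.xml
    static final Pattern MATCH = Pattern.compile("jms-(queue|topic)-transport:(.*)");
    static final String TO =
        "active:mapper+operator@ffcpl:/resources/links.xml+operand@ffcpl:/activemq/$1/$2";

    /** Applies the rule when the whole URI matches, otherwise returns it unchanged. */
    static String rewrite(String uri) {
        Matcher m = MATCH.matcher(uri);
        // $1 becomes "queue" or "topic", $2 the destination name plus any arguments
        return m.matches() ? m.replaceFirst(TO) : uri;
    }

    public static void main(String[] args) {
        System.out.println(rewrite(
            "jms-topic-transport:example.MyTopic+body@[...]+header@[...]+properties@[...]"));
        System.out.println(rewrite("jms-queue-transport:example.MyQueue+body@[...]"));
    }
}
```

The first call yields an operand under /activemq/topic/ and the second an operand under /activemq/queue/, which is how one rule serves both destination types.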
Looking at the links.xml file above we see that path /topic/example.MyTopic is mapped to a request to active:beanshell to
execute the receive.bs together with the body, header and properties arguments from the original transport request. receive.bs
is shown below...
import com.ten60.netkernel.urii.aspect.*;
import com.ten60.netkernel.util.*;
import org.ten60.netkernel.layer1.representation.*;
import org.ten60.netkernel.xml.representation.*;
/* example message handling service */
main()
{
	messageRep = context.source("this:param:body",IAspectString.class);
message = messageRep.getAspect(IAspectString.class).getString();
System.out.println("received message: "+message+" ("+messageRep.getMeta().getMimeType()+")");
header = context.sourceAspect("this:param:header", IAspectXDA.class).getXDA();
SysLogger.log(SysLogger.CONTAINER,this,header.toString());
properties = context.sourceAspect("this:param:properties", IAspectXDA.class).getXDA();
SysLogger.log(SysLogger.CONTAINER,this,properties.toString());
req = context.createSubRequest();
req.setURI("active:jms-send");
req.addArgument("destination","jms-queue:MyQueue");
req.addArgument("body",new StringAspect("hello reply"));
context.issueSubRequest(req);
}
Here we see that the body of the message is sourced as a string aspect (IAspectString) and the message is written to standard out.
The header and properties are also logged. Finally a new active:jms-send request is constructed. This issues a reply
to jms-queue:MyQueue with a simple message in the body. Notice that, to demonstrate both topics and queues, the topic
receiver service sends its response to a queue. This module's JMS transport is also listening to MyQueue. Take a look at the rewrite
rules and links.xml to understand how that message gets received by the JMS transport and is handled by the receive-reply.bs script.
Other Message Brokers
JMS message brokers vary widely and may require customization of the JMS transport. A HowTo is provided for setting
up and using IBM MQ Series.