The nmr component is an adapter to the Normalized Message Router (NMR) in ServiceMix, which is intended for use by Camel applications deployed directly into the OSGi container. You can exchange objects with NMR and not only XML like this is the case with the JBI specification. The interest of this component is that you can interconnect camel routes deployed in different OSGI bundles.
By contrast, the JBI component is intended for use by Camel applications deployed into the ServiceMix JBI container.
The NMR component is provided with Apache ServiceMix. It is not distributed with Fuse Mediation Router. To install the NMR component in ServiceMix, enter the following command in the ServiceMix console window:
features install nmr
You also need to instantiate the NMR component. You can do this by editing your Spring
configuration file, META-INF/spring/*.xml
, and adding the following
bean
instance:
<beans xmlns:osgi="http://www.springframework.org/schema/osgi" ... > ... <bean id="nmr" class="org.apache.servicemix.camel.nmr.ServiceMixComponent"> <property name="nmr"> <osgi:reference interface="org.apache.servicemix.nmr.api.NMR" /> </property> </bean> ... </beans>
The following code:
from("nmr:MyServiceEndpoint")
Automatically exposes a new endpoint to the bus with endpoint name
MyServiceEndpoint
(see #URI-format).
When an NMR endpoint appears at the end of a route, for example:
to("nmr:MyServiceEndpoint")
The messages sent by this producer endpoint are sent to the already deployed JBI endpoint.
An NMR endpoint supports the following options:
Option | Default Value | Description |
---|---|---|
synchronous
|
false
|
When this is set to true on
a consumer endpoint, an incoming, synchronous NMR Exchange will be handled on
the sender's thread instead of being handled on a new thread of the NMR
endpoint's thread pool |
runAsSubject
|
false
|
When this is set to true on a consumer endpoint, the endpoint will
be invoked on behalf of the Subject that is set on the
Exchange (i.e. the call to
Subject.getSubject(AccessControlContext) returns the
Subject instance) |
timeout
|
0
|
When this is set to a value greater than 0 , the producer endpoint
will time out if it doesn't receive a response from the NMR within the given
timeout period (in milliseconds). Configuring a timeout value will switch to
using synchronous interactions with the NMR instead of the usual asynchronous
messaging. |
Consumer:
// consume nmr exchanges asynchronously from("nmr:MyServiceEndpoint") // consume nmr exchanges synchronously and use the same thread as defined by NMR ThreadPool from("nmr:MyServiceEndpoint?synchronous=true").to()
Producer:
// produce nmr exchanges asynchronously from()...to("nmr:MyServiceEndpoint") // produce nmr exchanges synchronously and wait till 10s to receive response from()...to("nmr:MyServiceEndpoint?timeout=10000")
If you are using a stream type as the message body, you should be aware that a stream
is only capable of being read once. So if you enable DEBUG
logging,
the body is usually logged and thus read. To deal with this, Camel has a
streamCaching
option that can cache the stream, enabling you to
read it multiple times.
from("nmr:MyEndpoint").streamCaching().to("xslt:transform.xsl", "bean:doSomething");
From Camel 1.5 onwards, the stream caching is default
enabled, so it is not necessary to set the streamCaching()
option. In
Camel 2.0 we store big input streams (by default,
over 64K) in a temp
file using CachedOutputStream
.
When you close the input stream, the temp file will be deleted.
NMR camel routes can be tested using the Fuse Mediation Router unit test approach even if they will
be deployed next in different bundles in an OSGi runtime. With this aim in view, you
will extend the ServiceMixNMR
Mock class,
org.apache.servicemix.camel.nmr.AbstractComponentTest
, which will
create an NMR bus, register the Fuse Mediation Router NMR Component and the endpoints defined into
the Fuse Mediation Router routes.
public class ExchangeUsingNMRTest extends AbstractComponentTest { @Test public void testProcessing() throws InterruptedException { MockEndpoint mock = getMockEndpoint("mock:simple"); mock.expectedBodiesReceived("Simple message body"); template.sendBody("direct:simple", "Simple message body"); assertMockEndpointsSatisfied(); } @Override protected RouteBuilder createRouteBuilder() throws Exception { returnnew RouteBuilder() { @Override public void configure() throws Exception { from("direct:simple").to("nmr:simple"); from("nmr:simple?synchronous=true").to("mock:simple"); } }; } }