The JMS component allows messages to be sent to (or consumed from) a JMS Queue or Topic. The implementation of
the JMS Component uses Spring's JMS support for declarative transactions, using Spring's
JmsTemplate
for sending and a MessageListenerContainer
for consuming.
For users with Fuse Mediation Router 1.6.1 or older | |
---|---|
JMS consumers have a bad default in Fuse Mediation Router 1.6.1 or older. The
<from uri="jms:queue:foo"/> By adding the <from uri="jms:queue:foo&axMessagesPerTask=-1"/> This has been fixed in Fuse Mediation Router 1.6.2/2.0. |
Using ActiveMQ | |
---|---|
If you are using Apache ActiveMQ, you should prefer the ActiveMQ component as it has been particularly optimized for ActiveMQ. All of the options and samples on this page are also valid for the ActiveMQ component. |
Using JMS API 1.0.2 | |
---|---|
The old JMS API 1.0.2 has been @deprecated in Camel 2.1 and will be removed in Camel 2.2 release. Its no longer provided in Spring 3.0 which we want to be able to support out of the box in Camel 2.2+ releases. |
jms:[queue:|topic:]destinationName[?options]
Where destinationName
is a JMS queue or topic name. By default, the
destinationName
is interpreted as a queue name. For example, to connect
to the queue, FOO.BAR, use:
jms:FOO.BAR
You can include the optional queue:
prefix, if you prefer:
jms:queue:FOO.BAR
To connect to a topic, you must include the topic:
prefix. For example, to connect to the topic, Stocks.Prices
, use:
jms:topic:Stocks.Prices
You can append query options to the URI in the following format,
?option=value&option=value&...
If you are using ActiveMQ | |
---|---|
Note that the JMS component reuses Spring 2's
|
If you wish to use durable topic subscriptions, you need to specify both clientId and durableSubscriptionName. Note that the value of the clientId
must be unique and can only be used by a single JMS connection instance in your entire
network. You may prefer to use Virtual Topics instead
to avoid this limitation. More background on durable messaging here.
When using message headers, the JMS specification states that header names must be valid Java identifiers. So, by default, Fuse Mediation Router ignores any headers that do not match this rule. So try to name your headers as if they are valid Java identifiers. One benefit of doing this is that you can then use your headers inside a JMS Selector (whose SQL92 syntax mandates Java identifier syntax for headers).
From Fuse Mediation Router 1.4 onwards, a simple strategy for mapping header names is used by default. The strategy is to replace any dots in the header name with the underscore character and to reverse the replacement when the header name is restored from a JMS message sent over the wire. What does this mean? No more losing method names to invoke on a bean component, no more losing the filename header for the File Component, and so on.
The current header name strategy for accepting header names in Fuse Mediation Router is as follows:
Replace all dots with underscores (for example,
org.apache.camel.MethodName
becomesorg_apache_camel_MethodName
).Test if the name is a valid java identifier using the JDK core classes.
If the test success, the header is added and sent over the wire; otherwise it is dropped (and logged at
DEBUG
level).
In Fuse Mediation Router 2.0 the strategy for mapping header names has been changed to use the following replacement strategy:
Dots are replaced by
_DOT_
and the replacement is reversed when Fuse Mediation Router consume the messageHyphen is replaced by
_HYPHEN_
and the replacement is reversed when Fuse Mediation Router consumes the message
You can configure many different properties on the JMS endpoint which map to properties on the JMSConfiguration POJO. Note: Many of these properties map to properties on Spring JMS, which Fuse Mediation Router uses for sending and receiving messages. So you can get more information about these properties by consulting the relevant Spring documentation.
Option | Default Value | Description |
---|---|---|
acceptMessagesWhileStopping
|
false
|
Specifies whether the consumer accept messages while it is stopping. |
acknowledgementModeName
|
AUTO_ACKNOWLEDGE
|
The JMS acknowledgement name, which is one of: TRANSACTED ,
CLIENT_ACKNOWLEDGE , AUTO_ACKNOWLEDGE ,
DUPS_OK_ACKNOWLEDGE
|
acknowledgementMode
|
-1
|
The JMS acknowledgement mode defined as an Integer. Allows you to set vendor-specific
extensions to the acknowledgment mode. For the regular modes, it is preferable to use the
acknowledgementModeName instead. |
alwaysCopyMessage
|
false
|
If true , Fuse Mediation Router will always make a JMS message copy of the message
when it is passed to the producer for sending. Copying the message is needed in some
situations, such as when a replyToDestinationSelectorName is set
(incidentally, Fuse Mediation Router will set the alwaysCopyMessage option to
true , if a replyToDestinationSelectorName is set)
|
autoStartup
|
true
|
Specifies whether the consumer container should auto-startup. |
cacheLevelName
|
- | Sets the cache level by name for the underlying JMS resources.
Possible values are:
CACHE_AUTO ,
CACHE_CONNECTION ,
CACHE_CONSUMER ,
CACHE_NONE , and
CACHE_SESSION . See the
Spring documentation
and see the warning above. The default setting is
CACHE_CONSUMER for Camel 2.7.1 or older.
Camel 2.8 uses CACHE_AUTO as default value. |
cacheLevel
|
- | Sets the cache level by ID for the underlying JMS resources.
See cacheLevelName option for more details. |
clientId
|
null
|
Sets the JMS client ID to use. Note that this value, if specified, must be unique and can only be used by a single JMS connection instance. It is typically only required for durable topic subscriptions. You may prefer to use Virtual Topics instead. |
consumerType
|
Default
|
The consumer type to use, which can be one of: Simple ,
Default or ServerSessionPool . The consumer type
determines which Spring JMS listener to use. Default will use
org.springframework.jms.listener.DefaultMessageListenerContainer ,
Simple will use
org.springframework.jms.listener.SimpleMessageListenerContainer , and
ServerSessionPool will use
org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer .
If the option, useVersion102=true , Camel will use the JMS 1.0.2 Spring
classes instead. ServerSessionPool is @deprecated and will be removed in Camel 2.0. This option has been removed
from Camel 2.7 onwards. |
concurrentConsumers
|
1
|
Specifies the default number of concurrent consumers. |
connectionFactory
|
null
|
The default JMS connection factory to use for the
listenerConnectionFactory and
templateConnectionFactory , if neither is specified. |
deliveryPersistent
|
true
|
Specifies whether persistent delivery is used by default. |
destination
|
null
|
Fuse Mediation Router 2.0: Specifies the JMS Destination object to use on this endpoint. |
destinationName
|
null
|
Fuse Mediation Router 2.0: Specifies the JMS destination name to use on this endpoint. |
destinationResolver
|
null
|
A pluggable
org.springframework.jms.support.destination.DestinationResolver that
allows you to use your own resolver (for example, to lookup the real destination in a JNDI
registry). |
disableReplyTo
|
false
|
If true , ignore the JMSReplyTo header and so
treat messages as InOnly by default and do not send a reply back.
|
durableSubscriptionName
|
null
|
The durable subscriber name for specifying durable topic subscriptions. The
clientId option must be configured
as well. |
disableTimeToLive
|
false
|
Camel 2.8: Use this option to force disabling time to live. For example when you do request/reply over JMS, then Camel will by default use the requestTimeout value as time to live on the message being send. The problem is that the sender and receiver systems have to have their clocks synchronized, so they are in sync. This is not always so easy to archive. So you can use disableTimeToLive=true to not set a time to live value on the send message. Then the message will not expire on the receiver system. See below in section About time to live for more details. |
eagerLoadingOfProperties
|
false
|
Enables eager loading of JMS properties as soon as a message is received, which is generally inefficient, because the JMS properties might not be required. But this feature can sometimes catch early any issues with the underlying JMS provider and the use of JMS properties. This feature can also be used for testing purposes, to ensure JMS properties can be understood and handled correctly. |
forceSendOriginalMessage
|
false
|
Camel 2.7: When using mapJmsMessage=false Camel will create a new JMS message to send to a new JMS destination if you touch the headers (get or set) during the route. Set this option to true to force Camel to send the original JMS message that was received. |
exceptionListener
|
null
|
Specifies the JMS Exception Listener that is to be notified of any underlying JMS exceptions. |
explicitQosEnabled
|
false
|
Set if the deliveryMode , priority or
timeToLive qualities of service should be used when sending messages.
This option is based on Spring's JmsTemplate . The
deliveryMode , priority and
timeToLive options are applied to the current endpoint. This
contrasts with the preserveMessageQos option, which operates at message
granularity, reading QoS properties exclusively from the Fuse Mediation Router In message headers.
|
exposeListenerSession
|
true
|
Specifies whether the listener session should be exposed when consuming messages. |
idleTaskExecutionLimit
|
1
|
Specifies the limit for idle executions of a receive task, not having received any
message within its execution. If this limit is reached, the task will shut down and leave
receiving to other executing tasks (in the case of dynamic scheduling; see the
maxConcurrentConsumers setting). |
jmsMessageType
|
null
|
Fuse Mediation Router 2.0: Allows you to force the use of a specific
javax.jms.Message implementation for sending JMS messages. Possible
values are: Bytes , Map , Object ,
Stream , Text . By default, Fuse Mediation Router would determine
which JMS message type to use from the In body type. This option allows you to specify it.
|
jmsKeyFormatStrategy
|
default
|
Fuse Mediation Router 2.0: Pluggable strategy for encoding and
decoding JMS keys so they can be compliant with the JMS specification. Fuse Mediation Router provides
two implementations out of the box: default and
passthrough . The default strategy will safely
marshal dots and hyphens (. and -). The passthrough
strategy leaves the key as is. Can be used for JMS brokers which do not care whether JMS
header keys contain illegal characters. You can provide your own implementation of the
org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it
using the # notation. |
jmsOperations
|
null
|
Allows you to use your own implementation of the
org.springframework.jms.core.JmsOperations interface. Fuse Mediation Router uses
JmsTemplate as default. Can be used for testing purpose, but not used
much as stated in the spring API docs. |
lazyCreateTransactionManager
|
true
|
Fuse Mediation Router 2.0: If true , Fuse Mediation Router will
create a JmsTransactionManager , if there is no
transactionManager injected when option
transacted=true . |
listenerConnectionFactory
|
null
|
The JMS connection factory used for consuming messages. |
mapJmsMessage
|
true
|
Fuse Mediation Router 1.6.2/2.0: Specifies whether Fuse Mediation Router should
auto map the received JMS message to an appropiate payload type, such as
javax.jms.TextMessage to a String etc. See section
about how mapping works below for more details. |
maxConcurrentConsumers
|
1
|
Specifies the maximum number of concurrent consumers. |
maxMessagesPerTask
|
-1
|
The number of messages per task. -1 is unlimited. |
messageConverter
|
null
|
Fuse Mediation Router 1.6.2/2.0: To use a custom Spring
org.springframework.jms.support.converter.MessageConverter so you can
be 100% in control how to map to/from a javax.jms.Message . |
messageIdEnabled
|
true
|
When sending, specifies whether message IDs should be added. The message IDs are
generated using the UUID generator registered with the CamelContext —for
details, see Built-In UUID Generators in Programing EIP Components. |
messageTimestampEnabled
|
true
|
Specifies whether timestamps should be enabled by default on sending messages. |
password
|
null
|
The password for the connector factory. |
priority
|
4
|
Values greater than 1 specify the message priority when sending (where 0 is the lowest
priority and 9 is the highest). The explicitQosEnabled option must also be enabled in order for this option to have any effect.
|
pubSubNoLocal
|
false
|
Specifies whether to inhibit the delivery of messages published by its own connection. |
receiveTimeout
|
None | The timeout for receiving messages (in milliseconds). |
recoveryInterval
|
5000
|
Specifies the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds. |
preserveMessageQos
|
false
|
Camel 2.0: Set to true , if you want
to send message using the QoS settings specified on the message, instead of the QoS
settings on the JMS endpoint. The following three headers are considered
JMSPriority , JMSDeliveryMode , and
JMSExpiration . You can provide all or only some of them. If not
provided, Camel will fall back to use the values from the endpoint instead. So, when using
this option, the headers override the values from the endpoint. The
explicitQosEnabled option, by contrast, will only use options set on
the endpoint, and not values from the message header. |
replyTo
|
null
|
Provides an explicit ReplyTo destination, which overrides any incoming value of
Message.getJMSReplyTo() . If you do
Request Reply
over JMS then read the section further below for more details. |
replyToDestinationSelectorName
|
null
|
Sets the JMS Selector using the fixed name to be used so you can filter out your own replies from the others when using a shared queue (that is, if you are not using a temporary reply queue). |
replyToDeliveryPersistent
|
true
|
Specifies whether to use persistent delivery by default for replies. |
requestTimeout
|
20000
|
The timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds). The default is 20 seconds. See below in section About time to live for more details. |
selector
|
null
|
Sets the JMS Selector, which is an SQL 92 predicate that is used to filter messages within the broker. You may have to encode special characters such as = as %3D |
subscriptionDurable
|
false
|
@deprecated: Enabled by default, if you specify a
durableSubscriberName and a clientId . |
taskExecutor
|
null
|
Allows you to specify a custom task executor for consuming messages. |
taskExecutorSpring2
|
null
|
To use when using Spring 2.x with Camel. Allows you to specify a custom task executor for consuming messages. |
templateConnectionFactory
|
null
|
The JMS connection factory used for sending messages. |
timeToLive
|
null
|
When sending messages, specifies the time-to-live of the message (in milliseconds). The
explicitQosEnabled option must also
be enabled in order for this option to have any effect. |
transacted
|
false
|
Specifies whether to use transacted mode for sending/receiving messages using the InOnly Exchange Pattern. See the section Enabling Transacted Consumption for more details. |
transactedInOut
|
false
|
@deprecated: Specifies whether to use transacted mode for sending messages using the InOut Exchange Pattern. Applies only to producer endpoints. See section Enabling Transacted Consumption for more details. |
transactionManager
|
null
|
The Spring transaction manager to use. |
transactionName
|
null
|
The name of the transaction to use. |
transactionTimeout
|
null
|
The timeout value of the transaction, if using transacted mode. |
transferException
|
false
|
Camel 2.0: If enabled and you are using Request
Reply messaging (InOut) and an Exchange failed
on the consumer side, then the caused Exception will be send back in
response as a javax.jms.ObjectMessage . If the client is Camel, the
returned Exception is rethrown. This allows you to use Camel JMS as a bridge in your routing - for example, using
persistent queues to enable robust routing. Notice that if you also have transferExchange enabled, this option takes precedence. The
caught exception is required to be serializable. The original Exception
on the consumer side can be wrapped in an outer exception such as
org.apache.camel.RuntimeCamelException when returned to the producer.
|
transferExchange
|
false
|
Fuse Mediation Router 2.0: You can transfer the exchange over the
wire instead of just the body and headers. The following fields are transferred: In body,
Out body, Fault body, In headers, Out headers, Fault headers, exchange properties,
exchange exception. This requires that the objects are serializable. Fuse Mediation Router will exclude
any non-serializable objects and log it at WARN level. |
username
|
null
|
The username for the connector factory. |
useMessageIDAsCorrelationID
|
false
|
Specifies whether JMSMessageID should always be used as
JMSCorrelationID for InOut messages.
|
useVersion102
|
false
|
@deprecated (removed from Camel 2.5 onwards): Specifies whether the old JMS API should be used. |
Fuse Mediation Router automatically maps messages between javax.jms.Message
and
org.apache.camel.Message
.
When sending a JMS message, Fuse Mediation Router converts the message body to the following JMS message types:
Body Type | JMS Message | Comment |
---|---|---|
String
|
javax.jms.TextMessage
|
|
org.w3c.dom.Node
|
javax.jms.TextMessage
|
The DOM will be converted to String . |
Map
|
javax.jms.MapMessage
|
|
java.io.Serializable
|
javax.jms.ObjectMessage
|
|
byte[]
|
javax.jms.BytesMessage
|
|
java.io.File
|
javax.jms.BytesMessage
|
|
java.io.Reader
|
javax.jms.BytesMessage
|
|
java.io.InputStream
|
javax.jms.BytesMessage
|
|
java.nio.ByteBuffer
|
javax.jms.BytesMessage
|
When receiving a JMS message, Fuse Mediation Router converts the JMS message to the following body type:
JMS Message | Body Type |
---|---|
javax.jms.TextMessage
|
String
|
javax.jms.BytesMessage
|
byte[]
|
javax.jms.MapMessage
|
Map<String, Object>
|
javax.jms.ObjectMessage
|
Object
|
Available as of Fuse Mediation Router 1.6.2/2.0
You can use the mapJmsMessage
option to disable the auto-mapping above.
If disabled, Fuse Mediation Router will not try to map the received JMS message, but instead uses it
directly as the payload. This allows you to avoid the overhead of mapping and let Fuse Mediation Router
just pass through the JMS message. For instance, it even allows you to route
javax.jms.ObjectMessage
JMS messages with classes you do not have on the classpath.
Available as of Fuse Mediation Router 1.6.2/2.0
You can use the messageConverter
option to do the mapping yourself in a
Spring org.springframework.jms.support.converter.MessageConverter
class.
For example, in the route below we use a custom message converter when sending a message to the JMS order queue:
from("file://inbox/order").to("jms:queue:order?messageConverter=#myMessageConverter");
You can also use a custom message converter when consuming from a JMS destination.
Available as of Fuse Mediation Router 2.0
You can use the jmsMessageType option on the endpoint URL
to force a specific message type for all messages. In the route below, we poll files from a
folder and send them as javax.jms.TextMessage
as we have forced the JMS
producer endpoint to use text messages:
from("file://inbox/order").to("jms:queue:order?jmsMessageType=Text");
You can also specify the message type to use for each messabe by setting the header with
the key CamelJmsMessageType
. For example:
from("file://inbox/order").setHeader("CamelJmsMessageType", JmsMessageType.Text).to("jms:queue:order");
The possible values are defined in the enum
class,
org.apache.camel.jms.JmsMessageType
.
The exchange that is sent over the JMS wire must conform to the JMS Message spec.
For the exchange.in.header
the following rules apply for the header
keys:
Keys starting with
JMS
orJMSX
are reserved.exchange.in.headers
keys must be literals and all be valid Java identifiers (do not use dots in the key name).From Fuse Mediation Router 1.4 until Fuse Mediation Router 1.6.x, Fuse Mediation Router automatically replaces all dots with underscores in key names. This replacement is reversed when Fuse Mediation Router consumes JMS messages.
From Fuse Mediation Router 2.0 onwards, Fuse Mediation Router replaces dots & hyphens and the reverse when when consuming JMS messages:
.
is replaced by_DOT_
and the reverse replacement when Fuse Mediation Router consumes the message.-
is replaced by_HYPHEN_
and the reverse replacement when Fuse Mediation Router consumes the message.See also the option
jmsKeyFormatStrategy
introduced in Fuse Mediation Router 2.0, which allows you to use your own custom strategy for formatting keys.
For the exchange.in.header
, the following rules apply for the header
values:
The values must be primitives or their counter objects (such as
Integer
,Long
,Character
). The types,String
,CharSequence
,Date
,BigDecimal
andBigInteger
are all converted to theirtoString()
representation. All other types are dropped.
Fuse Mediation Router will log with category
org.apache.camel.component.jms.JmsBinding
at DEBUG level if it drops a given header value. For example:
2008-07-09 06:43:04,046 [main ] DEBUG JmsBinding - Ignoring non primitive header: order of class: org.apache.camel.component.jms.issues.DummyOrder with value: DummyOrder{orderId=333, itemId=4444, quantity=2}
Fuse Mediation Router adds the following properties to the Exchange
when it receives
a message:
Property | Type | Description |
---|---|---|
org.apache.camel.jms.replyDestination
|
javax.jms.Destination
|
The reply destination. |
Fuse Mediation Router adds the following JMS properties to the In message headers when it receives a JMS message:
Header | Type | Description |
---|---|---|
JMSCorrelationID
|
String
|
The JMS correlation ID. |
JMSDeliveryMode
|
int
|
The JMS delivery mode. |
JMSDestination
|
javax.jms.Destination
|
The JMS destination. |
JMSExpiration
|
long
|
The JMS expiration. |
JMSMessageID
|
String
|
The JMS unique message ID. |
JMSPriority
|
int
|
The JMS priority (with 0 as the lowest priority and 9 as the highest). |
JMSRedelivered
|
boolean
|
Is the JMS message redelivered. |
JMSReplyTo
|
javax.jms.Destination
|
The JMS reply-to destination. |
JMSTimestamp
|
long
|
The JMS timestamp. |
JMSType
|
String
|
The JMS type. |
JMSXGroupID
|
String
|
The JMS group ID. |
As all the above information is standard JMS you can check the JMS documentation for further details.
The JMS component is complex and you have to pay close attention to how it works in some cases. So this is a short summary of some of the areas/pitfalls to look for.
When Fuse Mediation Router sends a message using its JMSProducer
, it checks the
following conditions:
The message exchange pattern,
Whether a
JMSReplyTo
was set in the endpoint or in the message headers,Whether any of the following options have been set on the JMS endpoint:
disableReplyTo
,preserveMessageQos
,explicitQosEnabled
.
All this can be a tad complex to understand and configure to support your use case.
The JmsProducer
behaves as follows, depending on configuration:
Exchange Pattern | Other options | Description |
---|---|---|
InOut | Fuse Mediation Router will expect a reply, set a temporary JMSReplyTo , and after
sending the message, it will start to listen for the reply message on the temporary queue.
|
|
InOut | JMSReplyTo is set |
Fuse Mediation Router will expect a reply and, after sending the message, it will start to listen
for the reply message on the specified JMSReplyTo queue. |
InOnly | Fuse Mediation Router will send the message and not expect a reply. | |
InOnly | JMSReplyTo is set |
By default, Fuse Mediation Router suppresses the If you want to leave the |
The JmsConsumer
behaves as follows, depending on configuration:
Exchange Pattern | Other options | Description |
---|---|---|
InOut | Fuse Mediation Router will send the reply back to the JMSReplyTo queue. |
|
InOnly | Fuse Mediation Router will not send a reply back, as the pattern is InOnly. | |
disableReplyTo=true
|
This option suppresses replies. |
So pay attention to the message exchange pattern set on your exchanges.
If you send a message to a JMS destination in the middle of your route you can specify the
exchange pattern to use, see more at Request Reply. This is useful if you
want to send an InOnly
message to a JMS topic:
from("activemq:queue:in") .to("bean:validateOrder") .to(ExchangePattern.InOnly, "activemq:topic:order") .to("bean:handleOrder");
Available as of Fuse Mediation Router 1.6.2/2.0 If you need to send messages to a lot of different JMS destinations, it makes sense to reuse a JMS endpoint and specify the real destination in a message header. This allows Fuse Mediation Router to reuse the same endpoint, but send to different destinations. This greatly reduces the number of endpoints created and economizes on memory and thread resources.
You can specify the destination in the following headers:
Header | Type | Description |
---|---|---|
CamelJmsDestination
|
javax.jms.Destination
|
Fuse Mediation Router 2.0: A destination object. |
CamelJmsDestinationName
|
String
|
Fuse Mediation Router 1.6.2/2.0: The destination name. |
For example, the following route shows how you can compute a destination at run time and use it to override the destination appearing in the JMS URL:
from("file://inbox") .to("bean:computeDestination") .to("activemq:queue:dummy");
The queue name, dummy
, is just a placeholder. It must be provided as
part of the JMS endpoint URL, but it will be ignored in this example.
In the computeDestination
bean, specify the real destination by setting
the CamelJmsDestinationName
header as follows:
public void setJmsHeader(Exchange exchange) { String id = .... exchange.getIn().setHeader("CamelJmsDestinationName", "order:" + id"); }
Then Fuse Mediation Router will read this header and use it as the destination instead of the one
configured on the endpoint. So, in this example Fuse Mediation Router sends the message to
activemq:queue:order:2
, assuming the id
value was
2.
If both the CamelJmsDestination
and the
CamelJmsDestinationName
headers are set,
CamelJmsDestination
takes priority.
You can configure your JMS provider in Spring XML as follows:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <jmxAgent id="agent" disabled="true"/> </camelContext> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="vm://localhost?broker.persistent=false&roker.useJmx=false"/> </bean> </property> </bean>
Basically, you can configure as many JMS component instances as you wish and give them a
unique name using the id attribute. The preceding example
configures an activemq
component. You could do the same to configure
MQSeries, TibCo, BEA, Sonic and so on.
Once you have a named JMS component, you can then refer to endpoints within that component
using URIs. For example for the component name, activemq
, you can then
refer to destinations using the URI format,
activemq:[queue:|topic:]destinationName
. You can use the same approach
for all other JMS providers.
This works by the SpringCamelContext lazily fetching components from the spring context for the scheme name you use for Endpoint URIs and having the Component resolve the endpoint URIs.
If you are using a J2EE container, you might need to look up JNDI to find the JMS
ConnectionFactory
rather than use the usual <bean>
mechanism in Spring. You can do this using Spring's factory bean or the new Spring XML
namespace. For example:
<bean id="weblogic" class="org.apache.camel.component.jms.JmsComponent"> <property name="connectionFactory" ref="myConnectionFactory"/> </bean> <jee:jndi-lookup id="myConnectionFactory" jndi-name="jms/connectionFactory"/>
See The jee schema in the Spring reference documentation for more details about JNDI lookup.
You need to use the destinationResolver
option to use the Spring JNDI
resolver that can lookup in the JNDI, or use your own custom implementation.
See this nabble post for more details: http://www.nabble.com/JMS-queue---JNDI-instead-of-physical-name-td24484994.html
See this link at nabble for details of how a Fuse Mediation Router user configured JMS to connect to remote WebSphere MQ brokers.
A common requirement with JMS is to consume messages concurrently in multiple threads in
order to make an application more responsive. You can set the
concurrentConsumers
option to specify the number of threads servicing the
JMS endpoint, as follows:
from("jms:SomeQueue?concurrentConsumers=20"). bean(MyClass.class);
You can configure this option in one of the following ways:
On the
JmsComponent
,On the endpoint URI or,
By invoking
setConcurrentConsumers()
directly on theJmsEndpoint
.
Camel supports Request Reply over JMS. In essence the MEP of the Exchange should be InOut
when you send a message to a JMS queue.
The JmsProducer
detects the InOut
and provides a JMSReplyTo
header with the reply destination to be used. By default Camel uses a temporary queue, but you can use the replyTo
option on the endpoint to specify a fixed reply queue (see more below about fixed reply queue).
Camel will automatic setup a consumer which listen on the reply queue, so you should not do anything.
This consumer is a Spring DefaultMessageListenerContainer
which listen for replies. However it's fixed to 1 concurrent consumer.
That means replies will be processed in sequence as there are only 1 thread to process the replies. If you want to process replies faster, then we need to use concurrency. But not using the concurrentConsumer
option. We should use the threads
from the Camel DSL instead, as shown in the route below:
from(xxx) .inOut().to("activemq:queue:foo") .threads(5) .to(yyy) .to(zzz);
In this route we instruct Camel to route replies asynchronously using a thread pool with 5 threads.
If you use a fixed reply queue when doing Request Reply over JMS as shown in the example below, then pay attention.
from(xxx) .inOut().to("activemq:queue:foo?replyTo=bar") .to(yyy)
In this example the fixed reply queue named bar is used. When using fixed reply queues, then Camel assumes that it may not be an exclusive consumer on the queue, and therefore it uses a JMSSelector
to only pickup the expected reply messages (eg based on the JMSCorrelationID
). That means its not as fast as temporary queues. You can speedup how often Camel will pull for reply messages using the receiveTimeout
option. By default its 1000 millis. So to make it faster you can set it to 250 millis to pull 4 times per second as shown:
from(xxx) .inOut().to("activemq:queue:foo?replyTo=bar?receiveTimeout=250") .to(yyy)
Notice this will cause the Camel to send pull requests to the message broker more frequent, and thus require more network traffic. It is generally recommended to use temporary queues if possible.
When doing messaging between systems, its desirable that the systems have synchronized clocks. For example when sending a JMS message, then you can set a time to live value on the message. Then the receiver can inspect this value, and determine if the message is already expired, and thus drop the message instead of consume and process it. However this requires that both sender and receiver have synchronized clocks. If you are using ActiveMQ then you can use the timestamp plugin to synchronize clocks.
Read first above about synchronized clocks.
When you do request/reply (InOut) over JMS with Camel then Camel uses a timeout on the sender side, which is default 20 seconds from the requestTimeout
option. You can control this by setting a higher/lower value. However the time to live value is still set on the JMS message being send. So that requires the clocks to be synchronized between the systems. If they are not, then you may want to disable the time to live value being set. This is now possible using the disableTimeToLive
option from Camel 2.8 onwards. So if you set this option to disableTimeToLive=true
, then Camel does not set any time to live value when sending JMS messages. But the request timeout is still active. So for example if you do request/reply over JMS and have disabled time to live, then Camel will still use a timeout by 20 seconds (the requestTimeout
option). That option can of course also be configured. So the two options requestTimeout
and disableTimeToLive
gives you fine grained control when doing request/reply.
When you do fire and forget (InOut) over JMS with Camel then Camel by default does not set any time to live value on the message. You can configure a value by using the timeToLive
option. For example to indicate a 5 sec., you set timeToLive=5000
. The option disableTimeToLive
can be used to force disabling the time to live, also for InOnly messaging. The requestTimeout
option is not being used for InOnly messaging.
A common requirement is to consume from a queue in a transaction and then process the message using the Fuse Mediation Router route. To do this, just ensure that you set the following properties on the component/endpoint:
transacted
= truetransactionManager
= a Transsaction Manager - typically theJmsTransactionManager
See also the Transactional Client EIP pattern for further details.
Avaiable as of Fuse Mediation Router 2.0
When using Fuse Mediation Router as a JMS listener, it sets an Exchange property with the value of the
ReplyTo javax.jms.Destination
object, having the key
ReplyTo
. You can obtain this Destination
as
follows:
Destination replyDestination = exchange.getIn().getHeader(JmsConstants.JMS_REPLY_DESTINATION, Destination.class);
And then later use it to send a reply using regular JMS or Fuse Mediation Router.
// we need to pass in the JMS component, and in this sample we use ActiveMQ JmsEndpoint endpoint = JmsEndpoint.newInstance(replyDestination, activeMQComponent); // now we have the endpoint we can use regular Fuse Mediation Router API to send a message to it template.sendBody(endpoint, "Here is the late reply.");
A different solution to sending a reply is to provide the
replyDestination
object in the same Exchange property when sending.
Fuse Mediation Router will then pick up this property and use it for the real destination. The endpoint URI
must include a dummy destination, however. For example:
// we pretend to send it to some non existing dummy queue template.send("activemq:queue:dummy, new Processor() { public void process(Exchange exchange) throws Exception { // and here we override the destination with the ReplyTo destination object so the message is sent to there instead of dummy exchange.getIn().setHeader(JmsConstants.JMS_DESTINATION, replyDestination); exchange.getIn().setBody("Here is the late reply."); } }
In the sample below we send a Request Reply style message Exchange (we use the requestBody
method =
InOut
) to the slow queue for further processing in Camel and we wait for
a return reply:
// send a in-out with a timeout for 5 sec Object out = template.requestBody("activemq:queue:slow?requestTimeout=5000", "Hello World");
JMS is used in many examples for other components as well. But we provide a few samples below to get started.
In the following sample we configure a route that receives JMS messages and routes the message to a POJO:
from("jms:queue:foo"). to("bean:myBusinessLogic");
You can of course use any of the EIP patterns so the route can be context based. For example, here's how to filter an order topic for the big spenders:
from("jms:topic:OrdersTopic"). filter().method("myBean", "isGoldCustomer"). to("jms:queue:BigSpendersQueue");
In the sample below we poll a file folder and send the file content to a JMS topic. As we
want the content of the file as a TextMessage
instead of a
BytesMessage
, we need to convert the body to a
String
:
from("file://orders"). convertBodyTo(String.class). to("jms:topic:OrdersTopic");
Using Annotations
Fuse Mediation Router also has annotations so you can use POJO Consuming and POJO Producing.
The preceding examples use the Java DSL. Fuse Mediation Router also supports Spring XML DSL. Here is the big spender sample using Spring DSL:
<route> <from uri="jms:topic:OrdersTopic"/> <filter> <method bean="myBean" method="isGoldCustomer"/> <to uri="jms:queue:BigSpendersQueue"/> </filter> </route>
JMS appears in many of the examples for other components and EIP patterns, as well in this Fuse Mediation Router documentation. So feel free to browse the documentation. If you have time, check out the this tutorial that uses JMS but focuses on how well Spring Remoting and Fuse Mediation Router works together Tutorial-JmsRemoting.
Available as of Camel 2.0 Normally, when using JMS as the transport, it only transfers the body and headers as
the payload. If you want to use JMS with a Dead
Letter Channel, using a JMS queue as the Dead Letter Queue, then normally the caused
Exception is not stored in the JMS message. You can, however, use the transferExchange option on the JMS dead letter queue to instruct Camel to store
the entire Exchange in the queue as a
javax.jms.ObjectMessage
that holds a
org.apache.camel.impl.DefaultExchangeHolder
. This allows you to consume
from the Dead Letter Queue and retrieve the caused exception from the Exchange property with
the key Exchange.EXCEPTION_CAUGHT
. The demo below illustrates this:
// setup error handler to use JMS as queue and store the entire Exchange errorHandler(deadLetterChannel("jms:queue:dead?transferExchange=true"));
Then you can consume from the JMS queue and analyze the problem:
from("jms:queue:dead").to("bean:myErrorAnalyzer"); // and in our bean String body = exchange.getIn().getBody(); Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); // the cause message is String problem = cause.getMessage();
You can use JMS to store the cause error message or to store a custom body, which you can initialize yourself. The following example uses the Message Translator EIP to do a transformation on the failed exchange before it is moved to the JMS dead letter queue:
// we sent it to a seda dead queue first errorHandler(deadLetterChannel("seda:dead")); // and on the seda dead queue we can do the custom transformation before its sent to the JMS queue from("seda:dead").transform(exceptionMessage()).to("jms:queue:dead");
Here we only store the original cause error message in the transform. You can, however, use any Expression to send whatever you like. For example, you can invoke a method on a Bean or use a custom processor.
Transactional Client
When sending to a JMS destination using camel-jms the producer will use the MEP to detect if its InOnly or InOut messaging. However there can be times where you want to send an InOnly message but keeping the JMSReplyTo header. To do so you have to instruct Camel to keep it, otherwise the JMSReplyTo header will be dropped.
For example to send an InOnly message to the foo queue, but with a JMSReplyTo with bar queue you can do as follows:
template.send("activemq:queue:foo?preserveMessageQos=true", new Processor() { public void process(Exchange exchange) throws Exception { exchange.getIn().setBody("World"); exchange.getIn().setHeader("JMSReplyTo", "bar"); } });
Notice we use preserveMessageQos=true
to instruct Camel to keep the
JMSReplyTo header.
Some JMS providers, like IBM's WebSphere MQ, require options to be set on the JMS
destination. For example, you may need to specify the targetClient
option.
Because targetClient
is a WebSphere MQ option and not a Camel URI option, you
need to set the option on the JMS destination name, as follows:
... .setHeader("CamelJmsDestinationName", constant("queue:///MY_QUEUE?targetClient=1")) .to("wmq:queue:MY_QUEUE?useMessageIDAsCorrelationID=true");
Some versions of WMQ won't accept this option on the destination name and you will get an exception like:
com.ibm.msg.client.jms.DetailedJMSException: JMSCC0005: The specified value 'MY_QUEUE?targetClient=1' is not allowed for 'XMSC_DESTINATION_NAME'
A workaround is to use a custom DestinationResolver
:
JmsComponent wmq = new JmsComponent(connectionFactory); wmq.setDestinationResolver(new DestinationResolver(){ public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException { MQQueueSession wmqSession = (MQQueueSession) session; return wmqSession.createQueue("queue:///" + destinationName + "?targetClient=1"); } });