The ActiveMQ component allows messages to be sent to a JMS Queue or Topic, or consumed from a JMS Queue or Topic, using Apache ActiveMQ.
This component is based on the JMS Component and uses Spring's JMS support for declarative transactions, using Spring's JmsTemplate for sending and a MessageListenerContainer for consuming. All the options from the JMS component also apply to this component.
To use this component, make sure you have activemq.jar or activemq-core.jar on your classpath, along with any Fuse Mediation Router dependencies such as camel-core.jar, camel-spring.jar, and camel-jms.jar.
activemq:[queue:|topic:]destinationName
Where destinationName is an ActiveMQ queue or topic name. By default, the destinationName is interpreted as a queue name. For example, to connect to the queue FOO.BAR, use:
activemq:FOO.BAR
You can include the optional queue: prefix, if you prefer:
activemq:queue:FOO.BAR
To connect to a topic, you must include the topic: prefix. For example, to connect to the topic Stocks.Prices, use:
activemq:topic:Stocks.Prices
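The URI rules above (queue by default, optional queue: prefix, mandatory topic: prefix for topics) can be sketched in plain Java. This is only an illustration of the naming convention: the class ActiveMqUriSketch and its parse method are hypothetical names invented for this example and are not part of the component's API, and the sketch simply ignores any ?option=value part.

```java
import java.util.Objects;

/**
 * Illustrative parser for URIs of the form activemq:[queue:|topic:]destinationName.
 * Hypothetical helper for explanation only; it is NOT how the component is implemented.
 */
final class ActiveMqUriSketch {

    enum Kind { QUEUE, TOPIC }

    final Kind kind;
    final String name;

    private ActiveMqUriSketch(Kind kind, String name) {
        this.kind = kind;
        this.name = name;
    }

    static ActiveMqUriSketch parse(String uri) {
        Objects.requireNonNull(uri, "uri");
        String scheme = "activemq:";
        if (!uri.startsWith(scheme)) {
            throw new IllegalArgumentException("Not an activemq URI: " + uri);
        }
        String rest = uri.substring(scheme.length());

        // Drop any ?option=value part; options are out of scope for this sketch.
        int query = rest.indexOf('?');
        if (query >= 0) {
            rest = rest.substring(0, query);
        }

        // A destination is a queue unless the topic: prefix is present.
        Kind kind = Kind.QUEUE;
        if (rest.startsWith("topic:")) {
            kind = Kind.TOPIC;
            rest = rest.substring("topic:".length());
        } else if (rest.startsWith("queue:")) {
            rest = rest.substring("queue:".length());
        }

        if (rest.isEmpty()) {
            throw new IllegalArgumentException("Missing destination name: " + uri);
        }
        return new ActiveMqUriSketch(kind, rest);
    }

    public static void main(String[] args) {
        String[] uris = {
            "activemq:FOO.BAR",
            "activemq:queue:FOO.BAR",
            "activemq:topic:Stocks.Prices"
        };
        for (String uri : uris) {
            ActiveMqUriSketch d = parse(uri);
            System.out.println(uri + " -> " + d.kind + " " + d.name);
        }
    }
}
```

The first two URIs resolve to the same queue, FOO.BAR, while the third resolves to the topic Stocks.Prices.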
See Options on the JMS component, as all these options also apply to this component.
The following test case shows how to add an ActiveMQComponent to the CamelContext using the activeMQComponent() method while specifying the brokerURL used to connect to ActiveMQ.
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
You can configure the ActiveMQ broker URL on the ActiveMQComponent as follows:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
  </camelContext>

  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://somehost:61616"/>
  </bean>

</beans>
When sending to an ActiveMQ broker using Camel, it's recommended to use a pooled connection factory to handle efficient pooling of JMS connections, sessions and producers. This is documented on the ActiveMQ Spring Support page.
You can add the ActiveMQ connection pool (activemq-pool) with Maven:
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-pool</artifactId>
  <version>5.3.2</version>
</dependency>
Then set up the activemq component as follows:
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616" />
</bean>

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
  <property name="maxConnections" value="8" />
  <property name="maximumActive" value="500" />
  <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="pooledConnectionFactory"/>
  <property name="transacted" value="false"/>
  <property name="concurrentConsumers" value="10"/>
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="configuration" ref="jmsConfig"/>
</bean>
The ActiveMQ component also provides a helper Type Converter from a JMS MessageListener to a Processor. This means that the Bean component is capable of invoking any JMS MessageListener bean directly inside any route.
For example, you can create a MessageListener in JMS as follows:
import javax.jms.Message;
import javax.jms.MessageListener;

public class MyListener implements MessageListener {
    public void onMessage(Message jmsMessage) {
        // ...
    }
}
Then use it in your route as follows:
from("file://foo/bar")
    .bean(MyListener.class);
That is, you can reuse any of the Fuse Mediation Router Components and easily integrate them into your JMS MessageListener POJO!
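Conceptually, a type converter from a listener to a processor behaves like an adapter: it wraps the listener so that each message reaching that step of the route is handed to onMessage. The sketch below illustrates only that idea. The Listener and Processor interfaces are simplified stand-ins declared for this example; they are not javax.jms.MessageListener or the router's Processor interface, and the real converter also has to turn the exchange into a JMS Message, which is omitted here.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Conceptual sketch of adapting a listener to a processor.
 * Listener and Processor are hypothetical stand-ins, not the real JMS or router types.
 */
final class ListenerAdapterSketch {

    /** Stand-in for a JMS-style listener that receives one message at a time. */
    interface Listener {
        void onMessage(String message);
    }

    /** Stand-in for a route step that processes a message body. */
    interface Processor {
        void process(String body);
    }

    /** Wraps a listener so it can be used wherever a processor is expected. */
    static Processor toProcessor(Listener listener) {
        return body -> listener.onMessage(body);
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();

        // The listener just records what it receives.
        Processor step = toProcessor(received::add);

        // Each body passed to the processor is forwarded to the listener.
        step.process("first file contents");
        step.process("second file contents");

        System.out.println(received);
    }
}
```

The listener itself never needs to know which endpoint produced the message, which is why the same MessageListener can sit behind a file endpoint or any other consumer.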
ActiveMQ can generate Advisory messages, which are published to topics that you can consume. Such messages can help you send alerts when you detect slow consumers, or build statistics (number of messages produced per day, etc.). The following Spring DSL example shows how to read messages from a topic.
<route>
  <from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" />
  <convertBodyTo type="java.lang.String"/>
  <transform>
    <simple>${in.body} </simple>
  </transform>
  <to uri="file://data/activemq/?fileExist=Append&amp;fileName=advisoryConnection-${date:now:yyyyMMdd}.txt" />
</route>
If you consume a message on a queue, you should see the following files under the data/activemq folder:
advisoryConnection-20100312.txt
advisoryProducer-20100312.txt
Each file contains strings such as:
ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140-1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles-3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50, clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true}, redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles-3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true, droppable = false}
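The date suffix in the file names comes from the yyyyMMdd pattern in the route's file endpoint, so each day's advisories are appended to a separate file. As a rough sketch of how that pattern maps a date to a name, assuming the usual Java meaning of the pattern letters, the following plain Java reproduces the naming. The class AdvisoryFileNameSketch and its fileNameFor method are hypothetical and exist only for this illustration.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

/**
 * Illustrates how a yyyyMMdd date pattern yields one advisory file per day.
 * Hypothetical helper; the route itself does this through its file name expression.
 */
final class AdvisoryFileNameSketch {

    // Four-digit year, two-digit month, two-digit day, with no separators.
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    /** Builds a name such as prefix-20100312.txt for the given day. */
    static String fileNameFor(String prefix, LocalDate day) {
        return prefix + "-" + DAY.format(day) + ".txt";
    }

    public static void main(String[] args) {
        LocalDate day = LocalDate.of(2010, 3, 12);
        System.out.println(fileNameFor("advisoryConnection", day)); // advisoryConnection-20100312.txt
        System.out.println(fileNameFor("advisoryProducer", day));   // advisoryProducer-20100312.txt
    }
}
```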
You must have camel-jms as a dependency, because ActiveMQ is an extension to the JMS component.
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-jms</artifactId>
  <version>1.6.0</version>
</dependency>
The ActiveMQ component is released with the ActiveMQ project itself. Maven 2 users just need to add the following dependency to their project:
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-camel</artifactId>
  <version>5.2.0</version>
</dependency>
For 5.1.0, it's in the activemq-core library:
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>5.1.0</version>
</dependency>
Alternatively you can download the component JAR file directly from the Maven repository:
For this version you must use the JMS component instead. Take care to use a pooling connection factory, as described in JmsTemplate Gotchas.