
Transactional Client

The transactional client pattern refers to messaging endpoints that can participate in a transaction. FUSE Mediation Router supports transactions using Spring transaction management.


Not all FUSE Mediation Router endpoints support transactions. Those that do are called transaction-oriented endpoints (or TOEs). For example, both the JMS component and the ActiveMQ component support transactions.

To enable transactions on a component, you must perform the appropriate initialization before adding the component to the CamelContext. For this reason, you need to write some code that initializes your transactional components explicitly.

For example, consider a JMS component that is layered over ActiveMQ. To initialize this as a transactional component, you need to define an instance of JmsTransactionManager and an instance of ActiveMQConnectionFactory, using the following Spring XML configuration:

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
  
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

You can then initialize the JMS/ActiveMQ component using the following code:

// Java
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spring.SpringRouteBuilder;
import org.apache.camel.spring.SpringCamelContext;
import org.apache.camel.component.jms.JmsComponent;

import javax.jms.ConnectionFactory;

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.context.ApplicationContext;
import org.springframework.transaction.PlatformTransactionManager;
...
ApplicationContext spring = new ClassPathXmlApplicationContext("org/apache/camel/transaction/spring.xml");
CamelContext camelContext = SpringCamelContext.springCamelContext(spring);
 
PlatformTransactionManager transactionManager = (PlatformTransactionManager) spring.getBean("jmsTransactionManager");
ConnectionFactory connectionFactory = (ConnectionFactory) spring.getBean("jmsConnectionFactory");
JmsComponent component = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager);
component.getConfiguration().setConcurrentConsumers(1);
camelContext.addComponent("activemq", component);

Outbound endpoints automatically enlist in the current transaction context. If you do not want an outbound endpoint to enlist in the same transaction as the inbound endpoint, add a transaction policy to the processing route. First, define the transaction policies in your XML configuration. For example, you can define the transaction policies PROPAGATION_REQUIRED, PROPAGATION_NOT_SUPPORTED, and PROPAGATION_REQUIRES_NEW as follows (the PROPAGATION_REQUIRED bean sets no propagationBehaviorName property, because PROPAGATION_REQUIRED is the default behavior of a TransactionTemplate):

  <bean id="PROPAGATION_REQUIRED" class="org.springframework.transaction.support.TransactionTemplate">
    <property name="transactionManager" ref="jmsTransactionManager"/>
  </bean>
  
  <bean id="PROPAGATION_NOT_SUPPORTED" class="org.springframework.transaction.support.TransactionTemplate">
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="propagationBehaviorName" value="PROPAGATION_NOT_SUPPORTED"/>
  </bean>

  <bean id="PROPAGATION_REQUIRES_NEW" class="org.springframework.transaction.support.TransactionTemplate">
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
  </bean>

In your SpringRouteBuilder class, create a SpringTransactionPolicy object for each of the templates. For example:

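// Java
import org.apache.camel.spi.Policy;
import org.apache.camel.spring.SpringRouteBuilder;
import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.springframework.transaction.support.TransactionTemplate;

public class MyRouteBuilder extends SpringRouteBuilder {
  public void configure() {
    ...
    // Wrap each TransactionTemplate bean from the Spring configuration in a policy
    Policy required = new SpringTransactionPolicy(bean(TransactionTemplate.class, "PROPAGATION_REQUIRED"));
    Policy notsupported = new SpringTransactionPolicy(bean(TransactionTemplate.class, "PROPAGATION_NOT_SUPPORTED"));
    Policy requirenew = new SpringTransactionPolicy(bean(TransactionTemplate.class, "PROPAGATION_REQUIRES_NEW"));
    ...
  }
}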

Note

The org.apache.camel.spring.SpringRouteBuilder class is a special implementation of the RouteBuilder class provided by the FUSE Mediation Router Spring component. It is required for any routes that use Spring transactions. The SpringRouteBuilder.bean() method provides a shortcut for looking up bean references in the Spring configuration file.

You can then use the transaction policy objects in your route definitions, as follows:

// Send to bar in a new transaction
from("activemq:queue:foo").policy(requirenew).to("activemq:queue:bar");

// Send to bar without a transaction
from("activemq:queue:foo").policy(notsupported).to("activemq:queue:bar");
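
The effect of the three policies on the outbound endpoint can be illustrated with a small toy model in plain Java. This is only a sketch of the propagation semantics, not Spring or FUSE Mediation Router code: the class PropagationSketch, the enum Propagation, and the methods resolve() and newTx() are hypothetical names invented for this illustration, and a transaction is represented by nothing more than a string label.

```java
// Toy model only: illustrates propagation semantics, not the Spring or Camel API.
// All names here (PropagationSketch, Propagation, resolve, newTx) are hypothetical.
final class PropagationSketch {

    /** The three propagation behaviours used by the policies in this section. */
    enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED }

    private static int counter = 0;

    /** Creates a fresh transaction label, for example "tx-1". */
    static String newTx() {
        counter++;
        return "tx-" + counter;
    }

    /**
     * Returns the label of the transaction in which the outbound step runs,
     * or null if it runs without a transaction.
     *
     * @param current the inbound transaction, or null if there is none
     */
    static String resolve(String current, Propagation propagation) {
        switch (propagation) {
            case REQUIRED:
                // Join the current transaction; start one only if none exists.
                return current != null ? current : newTx();
            case REQUIRES_NEW:
                // Always start a new transaction; the current one is suspended.
                return newTx();
            case NOT_SUPPORTED:
                // Run without a transaction; the current one is suspended.
                return null;
            default:
                throw new IllegalArgumentException("Unknown propagation: " + propagation);
        }
    }

    public static void main(String[] args) {
        String inbound = newTx();
        for (Propagation p : Propagation.values()) {
            System.out.println(p + ": inbound=" + inbound + ", outbound=" + resolve(inbound, p));
        }
    }
}
```

In this model, REQUIRED corresponds to the default behavior described earlier (the outbound endpoint enlists in the inbound transaction), whereas REQUIRES_NEW and NOT_SUPPORTED correspond to the two routes above that send to bar in a new transaction and without a transaction, respectively.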