Fuse Mediation Router provides a simple mechanism for initiating a transaction in a route, by inserting the transacted() command in the Java DSL or by inserting the <transacted/> tag in the XML DSL.
Figure 5.1 shows an example of a route that is made transactional by adding the transacted() DSL command to the route. All of the route nodes following the transacted() node are included in the transaction scope. In this example, the two following nodes access a JDBC resource.
The transacted processor demarcates transactions as follows: when an exchange enters the transacted processor, the transacted processor invokes the default transaction manager to begin a transaction (attaching it to the current thread); when the exchange reaches the end of the remaining route, the transacted processor invokes the transaction manager to commit the current transaction.
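The begin-then-commit sequence described above can be pictured with a small plain-Java model. This is only a sketch under stated assumptions: it does not use the Fuse Mediation Router or Spring APIs, the names TxManager, RecordingTxManager, and runTransacted are hypothetical, and the rollback-on-exception branch reflects typical transaction manager behavior rather than anything spelled out in this paragraph.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of transaction demarcation; not the Camel or Spring API.
class DemarcationSketch {

    /** Minimal stand-in for a transaction manager (hypothetical interface). */
    interface TxManager {
        void begin();
        void commit();
        void rollback();
    }

    /** Records the calls it receives so that their order can be inspected. */
    static class RecordingTxManager implements TxManager {
        final List<String> events = new ArrayList<>();
        @Override public void begin()    { events.add("begin"); }
        @Override public void commit()   { events.add("commit"); }
        @Override public void rollback() { events.add("rollback"); }
    }

    /**
     * Runs the remaining route inside one transaction: begin on entry,
     * commit when the remaining nodes complete, roll back if they throw.
     */
    static void runTransacted(TxManager tm, Runnable remainingRoute) {
        tm.begin();
        try {
            remainingRoute.run();
        } catch (RuntimeException e) {
            tm.rollback(); // assumption: a failure undoes the whole unit of work
            throw e;
        }
        tm.commit();
    }

    public static void main(String[] args) {
        RecordingTxManager tm = new RecordingTxManager();
        runTransacted(tm, () -> tm.events.add("credit+debit"));
        System.out.println(tm.events); // prints [begin, credit+debit, commit]
    }
}
```

In this model the work passed to runTransacted plays the role of the route nodes that follow the transacted processor.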
The following Java DSL example shows how to define a transactional route by
marking the route with the transacted()
DSL command:
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .transacted()
            .beanRef("accountService","credit")
            .beanRef("accountService","debit");
    }
}
In this example, the file endpoint reads some files in XML format that describe a transfer of funds from one account to another. The first beanRef() invocation credits the specified sum of money to the beneficiary's account and then the second beanRef() invocation subtracts the specified sum of money from the sender's account. Both of the beanRef() invocations cause updates to be made to a database resource, which we are assuming is bound to the transaction through the transaction manager (for example, see JDBC Data Source). For a sample implementation of the accountService bean, see Spring JDBC Template.
The beanRef() Java DSL command is available only in the SpringRouteBuilder class. It enables you to reference a bean by specifying the bean's Spring registry ID (for example, accountService). If you do not use the beanRef() command, you can inherit from the org.apache.camel.builder.RouteBuilder class instead.
The preceding route can equivalently be expressed in Spring XML, where the
<transacted/>
tag is used to mark the route as transactional, as
follows:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       ... >

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="file:src/data?noop=true"/>
      <transacted/>
      <bean ref="accountService" method="credit"/>
      <bean ref="accountService" method="debit"/>
    </route>
  </camelContext>

</beans>
To demarcate transactions, the transacted processor must be associated with a particular transaction manager instance. To save you having to specify the transaction manager every time you invoke transacted(), the transacted processor automatically picks a sensible default. For example, if there is only one instance of a transaction manager in your Spring configuration, the transacted processor implicitly picks this transaction manager and uses it to demarcate transactions.
A transacted
processor can also be configured with a transacted
policy, of TransactedPolicy
type, which encapsulates a propagation
policy and a transaction manager (see Propagation Policies
for details). The following rules are used to pick the default transaction manager
or transaction policy:
If there is only one bean of org.apache.camel.spi.TransactedPolicy type, use this bean.
Note: The TransactedPolicy type is a base type of the SpringTransactionPolicy type that is described in Propagation Policies. Hence, the bean referred to here could be a SpringTransactionPolicy bean.
If there is a bean of org.apache.camel.spi.TransactedPolicy type that has the ID PROPAGATION_REQUIRED, use this bean.
If there is only one bean of org.springframework.transaction.PlatformTransactionManager type, use this bean.
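The lookup order in the rules above can be sketched in plain Java. This is an illustrative model, not the actual Fuse Mediation Router implementation: the registry is a simple map from bean ID to bean, the Policy and TxManager classes are hypothetical stand-ins for TransactedPolicy and PlatformTransactionManager, the txManager bean ID is invented for the example, and returning an empty result when no rule matches is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical model of the default lookup order; not the Camel implementation.
class DefaultLookupSketch {

    /** Stand-in for org.apache.camel.spi.TransactedPolicy (hypothetical). */
    static class Policy { }

    /** Stand-in for Spring's PlatformTransactionManager (hypothetical). */
    static class TxManager { }

    /** Returns the IDs of all beans in the registry that have the given type. */
    static List<String> idsOfType(Map<String, Object> registry, Class<?> type) {
        return registry.entrySet().stream()
                .filter(e -> type.isInstance(e.getValue()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    /** Applies the three rules in order and returns the chosen bean ID, if any. */
    static Optional<String> pickDefault(Map<String, Object> registry) {
        List<String> policies = idsOfType(registry, Policy.class);
        if (policies.size() == 1) {
            return Optional.of(policies.get(0));         // rule 1: the only policy bean
        }
        if (policies.contains("PROPAGATION_REQUIRED")) {
            return Optional.of("PROPAGATION_REQUIRED");  // rule 2: policy with this ID
        }
        List<String> managers = idsOfType(registry, TxManager.class);
        if (managers.size() == 1) {
            return Optional.of(managers.get(0));         // rule 3: the only manager bean
        }
        return Optional.empty(); // assumption: no default could be chosen
    }

    public static void main(String[] args) {
        Map<String, Object> registry = new LinkedHashMap<>();
        registry.put("PROPAGATION_REQUIRED", new Policy());
        registry.put("PROPAGATION_NEVER", new Policy());
        registry.put("txManager", new TxManager());
        System.out.println(pickDefault(registry)); // prints Optional[PROPAGATION_REQUIRED]
    }
}
```

With two policy beans in the registry, the first rule does not apply, so the second rule selects the bean whose ID is PROPAGATION_REQUIRED.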
You also have the option of specifying a bean explicitly by providing the bean ID
as an argument to transacted()
—see Sample route with PROPAGATION_NEVER policy in Java DSL.
If you insert a transacted
processor into a route, a new transaction
is created each time an exchange passes through this node and the transaction's
scope is defined as follows:
The transaction is associated with the current thread only.
The transaction scope encompasses all of the route nodes following the
transacted
processor.
In particular, all of the route nodes preceding the
transacted
processor are not included in the transaction (but the
situation is different, if the route begins with a transactional endpoint—see
Demarcation by Transactional Endpoints). For example, the following route
is incorrect, because the transacted()
DSL command mistakenly appears
after the first beanRef()
call (which accesses the database
resource):
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .beanRef("accountService","credit")
            .transacted() // <-- WARNING: Transaction started in the wrong place!
            .beanRef("accountService","debit");
    }
}
It is crucial to understand that a given transaction is associated with the current thread only. It follows that you must not create a thread pool in the middle of a transactional route, because the processing in the new threads will not participate in the current transaction. For example, the following route is bound to cause problems:
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .transacted()
            .threads(3) // WARNING: Subthreads are not in transaction scope!
            .beanRef("accountService","credit")
            .beanRef("accountService","debit");
    }
}
A route like the preceding one can corrupt your database, because the threads() DSL command is incompatible with transacted routes: the work done in the new threads is not covered by the transaction. Even if the threads() call precedes the transacted() call, the route will not behave as expected.
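The reason a thread pool breaks the transaction scope can be illustrated with a plain-Java sketch. It assumes, for illustration only, that the current transaction is tracked in a thread-local variable; the class ThreadBoundTxSketch and the value tx-1 are hypothetical, and no Fuse Mediation Router or Spring API is used.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of a thread-bound transaction; not the Camel or Spring API.
class ThreadBoundTxSketch {

    /** Holds the transaction ID for the current thread only. */
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Synchronous processing: runs in the caller's thread. */
    static String seenInCallingThread() {
        return CURRENT_TX.get();
    }

    /** Processing handed to a thread pool: runs in a different thread. */
    static String seenInPoolThread() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        Callable<String> task = () -> CURRENT_TX.get();
        try {
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        CURRENT_TX.set("tx-1"); // "begin" a transaction on this thread
        try {
            System.out.println("same thread: " + seenInCallingThread()); // same thread: tx-1
            System.out.println("pool thread: " + seenInPoolThread());    // pool thread: null
        } finally {
            CURRENT_TX.remove();
        }
    }
}
```

The pool thread sees no transaction at all, which is why processing that moves to another thread cannot take part in the transaction started by the calling thread.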
If you want to break a route into fragments and have each route fragment
participate in the current transaction, you can use direct:
endpoints.
For example, to send exchanges to separate route fragments, depending on whether the
transfer amount is big (greater than 100) or small (less than or equal to 100), you
can use the choice()
DSL command and direct
endpoints, as
follows:
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .transacted()
            .beanRef("accountService","credit")
            .choice().when(xpath("/transaction/transfer[amount > 100]"))
                .to("direct:txbig")
            .otherwise()
                .to("direct:txsmall");

        from("direct:txbig")
            .beanRef("accountService","debit")
            .beanRef("accountService","dumpTable")
            .to("file:target/messages/big");

        from("direct:txsmall")
            .beanRef("accountService","debit")
            .beanRef("accountService","dumpTable")
            .to("file:target/messages/small");
    }
}
Both the fragment beginning with direct:txbig and the fragment beginning with direct:txsmall participate in the current transaction, because the direct endpoints are synchronous. This means that the fragments execute in the same thread as the first route fragment and, therefore, they are included in the same transaction scope.
![]() | Note |
---|---|
You must not use |
The following Fuse Mediation Router components act as resource endpoints when they appear as the destination of a route (for example, if they appear in the to() DSL command). That is, these endpoints can access a transactional resource, such as a database or a persistent queue. The resource endpoints can participate in the current transaction, as long as they are associated with the same transaction manager as the transacted processor that initiated the current transaction. If you need to access multiple resources, you must deploy your application in a J2EE container, which gives you access to a global transaction manager.
JMS in EIP Component Reference
ActiveMQ in EIP Component Reference
AMQP in EIP Component Reference
JavaSpace in EIP Component Reference
JPA in EIP Component Reference
Hibernate in EIP Component Reference
iBatis in EIP Component Reference
JBI in EIP Component Reference
JCR in EIP Component Reference
JDBC in EIP Component Reference
LDAP in EIP Component Reference
For example, the following route sends the order for a money transfer to two different JMS queues: the credits queue processes the order to credit the receiver's account, and the debits queue processes the order to debit the sender's account. Because a credit must be made only if there is a corresponding debit, it makes sense to enclose the enqueueing operations in a single transaction. If the transaction succeeds, both the credit order and the debit order are enqueued; if an error occurs, neither order is enqueued.
from("file:src/data?noop=true")
    .transacted()
    .to("jmstx:queue:credits")
    .to("jmstx:queue:debits");
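The all-or-nothing behavior of the two enqueueing operations can be modeled with a short plain-Java sketch. It is a hypothetical in-memory model, not the JMS API or the jmstx component: the class TransactedSendSketch and its send, commit, rollback, and contents methods are invented for illustration, and the idea that sends are staged until commit is an assumption about how a transacted send typically behaves.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of transacted sends; not the JMS API.
class TransactedSendSketch {

    private final Map<String, List<String>> queues = new HashMap<>();
    private final List<String[]> pending = new ArrayList<>();

    /** Stages a message; it is not visible on the queue until commit(). */
    void send(String queue, String message) {
        pending.add(new String[] {queue, message});
    }

    /** Publishes every staged message. */
    void commit() {
        for (String[] p : pending) {
            queues.computeIfAbsent(p[0], k -> new ArrayList<>()).add(p[1]);
        }
        pending.clear();
    }

    /** Discards every staged message. */
    void rollback() {
        pending.clear();
    }

    /** Returns the messages that have been published to the given queue. */
    List<String> contents(String queue) {
        return queues.getOrDefault(queue, Collections.emptyList());
    }

    public static void main(String[] args) {
        TransactedSendSketch session = new TransactedSendSketch();

        session.send("credits", "order-1");
        session.send("debits", "order-1");
        session.commit(); // both orders become visible together

        session.send("credits", "order-2");
        session.rollback(); // error before the debit: neither order is enqueued

        System.out.println(session.contents("credits")); // prints [order-1]
        System.out.println(session.contents("debits"));  // prints [order-1]
    }
}
```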