In Fuse Mediation Router, pipelining is the dominant paradigm for connecting nodes in a route
definition. The pipeline concept is probably most familiar to users of the UNIX operating
system, where it is used to join operating system commands. For example, ls |
more
is an example of a command that pipes a directory listing, ls
, to
the page-scrolling utility, more
. The basic idea of a pipeline is that the
output of one command is fed into the input of
the next. The natural analogy in the case of a route is for the Out
message from one processor to be copied to the In message of the next
processor.
Every node in a route, except for the initial endpoint, is a
processor, in the sense that they inherit from the
org.apache.camel.Processor
interface. In other words, processors make up the
basic building blocks of a DSL route. For example, DSL commands such as
filter()
, delayer()
, setBody()
,
setHeader()
, and to()
all represent processors. When considering
how processors connect together to build up a route, it is important to distinguish two
different processing approaches.
The first approach is where the processor simply modifies the exchange's
In message, as shown in Figure 2.1. The exchange's
Out message remains null
in this case.
The following route shows a setHeader()
command that modifies the current
In message by adding (or modifying) the BillingSystem
heading:
from("activemq:orderQueue") .setHeader("BillingSystem", xpath("/order/billingSystem")) .to("activemq:billingQueue");
The second approach is where the processor creates an Out message to represent the result of the processing, as shown in Figure 2.2.
The following route shows a transform()
command that creates an
Out message with a message body containing the string,
DummyBody
:
from("activemq:orderQueue") .transform(constant("DummyBody")) .to("activemq:billingQueue");
where constant("DummyBody")
represents a constant expression. You cannot
pass the string, DummyBody
, directly, because the argument to
transform()
must be an expression type.
Figure 2.3 shows an example of a processor pipeline for InOnly exchanges. Processor A acts by modifying the In message, while processors B and C create an Out message. The route builder links the processors together as shown. In particular, processors B and C are linked together in the form of a pipeline: that is, processor B's Out message is moved to the In message before feeding the exchange into processor C, and processor C's Out message is moved to the In message before feeding the exchange into the producer endpoint. Thus the processors' outputs and inputs are joined into a continuous pipeline, as shown in Figure 2.3.
Fuse Mediation Router employs the pipeline pattern by default, so you do not need to use any special
syntax to create a pipeline in your routes. For example, the following route pulls messages
from a userdataQueue
queue, pipes the message through a Velocity template (to
produce a customer address in text format), and then sends the resulting text address to the
queue, envelopeAddressQueue
:
from("activemq:userdataQueue") .to(ExchangePattern.InOut, "velocity:file:AdressTemplate.vm") .to("activemq:envelopeAddresses");
Where the Velocity endpoint, velocity:file:AdressTemplate.vm
, specifies the
location of a Velocity template file, file:AdressTemplate.vm
, in the file
system. The to()
command changes the exchange pattern to
InOut before sending the exchange to the Velocity endpoint and then
changes it back to InOnly afterwards. For more details of the Velocity
endpoint, see Velocity in EIP Component Reference.
Figure 2.4 shows an example of a processor pipeline for InOut exchanges, which you typically use to support remote procedure call (RPC) semantics. Processors A, B, and C are linked together in the form of a pipeline, with the output of each processor being fed into the input of the next. The final Out message produced by the producer endpoint is sent all the way back to the consumer endpoint, where it provides the reply to the original request.
Note that in order to support the InOut exchange pattern, it is essential that the last node in the route (whether it is a producer endpoint or some other kind of processor) creates an Out message. Otherwise, any client that connects to the consumer endpoint would hang and wait indefinitely for a reply message. You should be aware that not all producer endpoints create Out messages.
Consider the following route that processes payment requests, by processing incoming HTTP requests:
from("jetty:http://localhost:8080/foo") .to("cxf:bean:addAccountDetails") .to("cxf:bean:getCreditRating") .to("cxf:bean:processTransaction");
Where the incoming payment request is processed by passing it through a pipeline of Web
services, cxf:bean:addAccountDetails
, cxf:bean:getCreditRating
,
and cxf:bean:processTransaction
. The final Web service,
processTransaction
, generates a response (Out message)
that is sent back through the JETTY endpoint.
When the pipeline consists of just a sequence of endpoints, it is also possible to use the following alternative syntax:
from("jetty:http://localhost:8080/foo") .pipeline("cxf:bean:addAccountDetails", "cxf:bean:getCreditRating", "cxf:bean:processTransaction");
The pipeline for InOptionalOut exchanges is essentially the same as
the pipeline in Figure 2.4. The
difference between InOut and InOptionalOut is that
an exchange with the InOptionalOut exchange pattern is allowed to have
a null Out message as a reply. That is, in the case of an
InOptionalOut exchange, a null
Out message is copied to the In message of the
next node in the pipeline. By contrast, in the case of an InOut
exchange, a null
Out message is discarded and the original In
message from the current node would be copied to the In message of the
next node instead.