In FUSE Mediation Router, pipelining is the dominant paradigm for connecting nodes in a route definition. The pipeline concept is probably most familiar to users of the UNIX operating system, where it is used to join operating system commands. For example, ls | more is a command that pipes a directory listing, ls, to the page-scrolling utility, more. The basic idea of a pipeline is that the output of one command is fed into the input of the next. The natural analogy in the case of a route is for the Out message from one processor to be copied to the In message of the next processor.
Every node in a route, except for the initial endpoint, is a processor, in the sense that it implements the org.apache.camel.Processor interface. In other words, processors make up the basic building blocks of a DSL route. For example, DSL commands such as filter(), delayer(), setBody(), setHeader(), and to() all represent processors. When considering how processors connect together to build up a route, it is important to distinguish two different processing approaches.
The first approach is where the processor simply modifies the exchange's In message, as shown in Figure 3.1. The exchange's Out message remains null in this case.
The following route shows a setHeader() command that modifies the current In message by adding (or modifying) the BillingSystem header:
from("activemq:orderQueue")
    .setHeader("BillingSystem", xpath("/order/billingSystem"))
    .to("activemq:billingQueue");
The second approach is where the processor creates an Out message to represent the result of the processing, as shown in Figure 3.2.
The following route shows a transform() command that creates an Out message with a message body containing the string, DummyBody:
from("activemq:orderQueue")
    .transform(constant("DummyBody"))
    .to("activemq:billingQueue");
Here, constant("DummyBody") represents a constant expression. You cannot pass the string, DummyBody, directly, because the argument to transform() must be an expression type.
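To illustrate why a transform step takes an expression rather than a plain value, the following is a minimal sketch in plain Java. It is a toy model, not the FUSE Mediation Router API: the Expression interface, the upperCaseBody() helper, and this version of transform() are hypothetical names introduced only for illustration. The point is that an expression is evaluated against each message, so a constant has to be wrapped before it can be used where an expression is expected.

```java
// Toy model only: imitates the idea of an expression argument and is
// NOT the FUSE Mediation Router API. All names here are hypothetical.
final class ExpressionSketch {
    // An expression is evaluated against a message body and yields a value.
    interface Expression<T> {
        T evaluate(String inBody);
    }

    // Wraps a fixed value so it can be passed where an expression is required.
    static <T> Expression<T> constant(T value) {
        return inBody -> value;
    }

    // An expression whose result depends on the message it is given.
    static Expression<String> upperCaseBody() {
        return inBody -> inBody.toUpperCase();
    }

    // Stand-in for a transform step: the result is whatever the expression yields.
    static String transform(String inBody, Expression<String> expression) {
        return expression.evaluate(inBody);
    }

    public static void main(String[] args) {
        System.out.println(transform("order", constant("DummyBody"))); // prints "DummyBody"
        System.out.println(transform("order", upperCaseBody()));       // prints "ORDER"
    }
}
```

Under this model, a constant is simply the degenerate case of an expression that ignores the message it is evaluated against.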
Figure 3.3 shows an example of a processor pipeline for InOnly exchanges. Processor A acts by modifying the In message, while processors B and C each create an Out message. The route builder links the processors together as shown. In particular, processors B and C are linked together in the form of a pipeline: processor B's Out message is moved to the In message before the exchange is fed into processor C, and processor C's Out message is moved to the In message before the exchange is fed into the producer endpoint. Thus the processors' outputs and inputs are joined into a continuous pipeline.
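The linking rule described for processors A, B, and C can be sketched in plain Java as follows. This is a toy model under stated assumptions, not the router's implementation: ToyMessage, ToyExchange, ToyProcessor, and ToyPipeline are hypothetical classes that only imitate the In/Out message idea, and the header value is made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these classes imitate the In/Out message idea and are
// NOT the org.apache.camel API.
final class ToyMessage {
    final Map<String, Object> headers = new LinkedHashMap<>();
    Object body;
    ToyMessage(Object body) { this.body = body; }
}

final class ToyExchange {
    ToyMessage in;
    ToyMessage out; // stays null unless a processor creates a result
    ToyExchange(ToyMessage in) { this.in = in; }
}

interface ToyProcessor {
    void process(ToyExchange exchange);
}

final class ToyPipeline {
    private final List<ToyProcessor> steps = new ArrayList<>();

    ToyPipeline then(ToyProcessor step) {
        steps.add(step);
        return this;
    }

    ToyExchange run(ToyExchange exchange) {
        for (ToyProcessor step : steps) {
            // Before each step, move any pending Out message to In.
            if (exchange.out != null) {
                exchange.in = exchange.out;
                exchange.out = null;
            }
            step.process(exchange);
        }
        return exchange;
    }
}

final class InOnlyPipelineSketch {
    static ToyExchange demo() {
        ToyPipeline pipeline = new ToyPipeline()
            // Processor A: modifies the In message in place (Out stays null).
            .then(ex -> ex.in.headers.put("BillingSystem", "legacy"))
            // Processor B: creates an Out message.
            .then(ex -> ex.out = new ToyMessage(ex.in.body + "->B"))
            // Processor C: sees B's Out as its In and creates a new Out.
            .then(ex -> ex.out = new ToyMessage(ex.in.body + "->C"));
        return pipeline.run(new ToyExchange(new ToyMessage("order")));
    }

    public static void main(String[] args) {
        System.out.println(demo().out.body); // prints "order->B->C"
    }
}
```

In this sketch, a producer endpoint at the end of the route would be one more step in the list, so it would receive processor C's Out message as its In message.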
FUSE Mediation Router employs the pipeline pattern by default, so you do not need to use any special syntax to create a pipeline in your routes. For example, the following route pulls messages from the userdataQueue queue, pipes each message through a Velocity template (to produce a customer address in text format), and then sends the resulting text address to the queue, envelopeAddresses:
from("activemq:userdataQueue")
    .to("velocity:file:AdressTemplate.vm")
    .to("activemq:envelopeAddresses");
In this route, the Velocity endpoint, velocity:file:AdressTemplate.vm, specifies the location of a Velocity template file, file:AdressTemplate.vm, in the file system. For more details of the Velocity endpoint, see ????.
Figure 3.4 shows an example of a processor pipeline for InOut exchanges, which you typically use to support remote procedure call (RPC) semantics. Processors A, B, and C are linked together in the form of a pipeline, with the output of each processor being fed into the input of the next. The final Out message produced by the producer endpoint is sent all the way back to the consumer endpoint, where it provides the reply to the original request.
Note that in order to support the InOut exchange pattern, it is essential that the last node in the route (whether it is a producer endpoint or some other kind of processor) creates an Out message. Otherwise, any client that connects to the consumer endpoint would hang and wait indefinitely for a reply message. You should be aware that not all producer endpoints create Out messages.
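The requirement on the last node can be illustrated with a small plain-Java sketch. It is a toy model of the behaviour described above, not the router's API: the Exchange and Node types and the two sample nodes are hypothetical. An empty Optional stands in for the case where the real client would be left waiting for a reply.

```java
import java.util.Optional;

// Toy model only: NOT the FUSE Mediation Router API. All names are hypothetical.
final class InOutReplySketch {
    static final class Exchange {
        String in;
        String out; // null until some node creates a result
        Exchange(String in) { this.in = in; }
    }

    interface Node {
        void process(Exchange exchange);
    }

    // Runs the nodes as a pipeline and returns the reply, if the last node made one.
    static Optional<String> requestReply(String request, Node... nodes) {
        Exchange exchange = new Exchange(request);
        for (Node node : nodes) {
            // Pipeline linking: a pending Out message becomes the next In message.
            if (exchange.out != null) {
                exchange.in = exchange.out;
                exchange.out = null;
            }
            node.process(exchange);
        }
        // Only an Out message left by the last node can serve as the reply.
        return Optional.ofNullable(exchange.out);
    }

    // A node that creates an Out message.
    static final Node ADD_DETAILS = ex -> ex.out = ex.in + "+details";
    // A node that only modifies the In message and leaves Out null.
    static final Node TAG_ONLY = ex -> ex.in = ex.in + "+tagged";

    public static void main(String[] args) {
        System.out.println(requestReply("pay", ADD_DETAILS));           // prints "Optional[pay+details]"
        System.out.println(requestReply("pay", ADD_DETAILS, TAG_ONLY)); // prints "Optional.empty"
    }
}
```

The second call shows the problem case: the route ends with a node that does not create an Out message, so there is nothing to send back to the caller.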
Consider the following route, which processes payment requests that arrive as incoming HTTP requests:
from("jetty:http://localhost:8080/foo")
    .to("cxf:bean:addAccountDetails")
    .to("cxf:bean:getCreditRating")
    .to("cxf:bean:processTransaction");
In this route, the incoming payment request is processed by passing it through a pipeline of Web services, cxf:bean:addAccountDetails, cxf:bean:getCreditRating, and cxf:bean:processTransaction. The final Web service, processTransaction, generates a response (Out message) that is sent back through the Jetty endpoint.
When the pipeline consists of just a sequence of endpoints, it is also possible to use the following alternative syntax:
from("jetty:http://localhost:8080/foo")
    .pipeline("cxf:bean:addAccountDetails",
              "cxf:bean:getCreditRating",
              "cxf:bean:processTransaction");
An alternative paradigm for linking together the nodes of a route is interceptor chaining, where a processor in the route processes the exchange both before and after dispatching the exchange to the next processor in the chain. This style of processing is also supported by FUSE Mediation Router, but it is not the usual approach. Figure 3.5 shows an example of an interceptor processor that implements a custom encryption algorithm.
In this example, incoming messages are encrypted in a custom format. The interceptor first decrypts the In message, then dispatches it to the Web services endpoint, cxf:bean:processTxn, and finally encrypts the reply (Out message) using the custom format before it is sent back through the consumer endpoint. Using the interceptor chaining approach, therefore, a single interceptor instance can modify both the request and the response.
For example, if you want to define a route with an HTTP port that services incoming requests encoded using custom encryption, you can define a route like the following:
from("jetty:http://localhost:8080/foo")
    .intercept(new MyDecryptEncryptInterceptor())
    .to("cxf:bean:processTxn");
In this route, the class, MyDecryptEncryptInterceptor, is implemented by inheriting from the class, org.apache.camel.processor.DelegateProcessor. For details of how to implement this kind of processor, see ????.
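The before-and-after behaviour of an interceptor can be sketched in plain Java without any router classes. This is a toy illustration, not an implementation of DelegateProcessor: the wrapper function, the string-reversal "cipher" (a stand-in that is not real encryption), and the processTxn() method are all hypothetical.

```java
import java.util.function.UnaryOperator;

// Toy model only: NOT the FUSE Mediation Router API. All names are hypothetical.
final class InterceptorSketch {
    // Stand-in "cipher": reversing the string. This is not real encryption.
    static String decrypt(String s) { return new StringBuilder(s).reverse().toString(); }
    static String encrypt(String s) { return new StringBuilder(s).reverse().toString(); }

    // Wraps the next node and acts both before and after delegating to it.
    static UnaryOperator<String> decryptEncryptAround(UnaryOperator<String> next) {
        return request -> {
            String plainRequest = decrypt(request);        // before delegation
            String plainReply = next.apply(plainRequest);  // dispatch to the next node
            return encrypt(plainReply);                    // after delegation
        };
    }

    // Hypothetical service standing in for the Web services endpoint.
    static String processTxn(String request) { return "OK:" + request; }

    public static void main(String[] args) {
        UnaryOperator<String> route = decryptEncryptAround(InterceptorSketch::processTxn);
        String encryptedReply = route.apply(encrypt("pay 10"));
        System.out.println(decrypt(encryptedReply)); // prints "OK:pay 10"
    }
}
```

The single wrapper sees both the request and the reply, which is what distinguishes interceptor chaining from a pipeline, where each node sees the exchange only once.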
Although it is possible to implement this kind of functionality using an interceptor processor, this is not a very idiomatic way of programming in FUSE Mediation Router. A more typical approach is shown in Figure 3.6.
In this example, the encrypt functionality is implemented in a separate processor from the decrypt functionality. The resulting processor pipeline is semantically equivalent to the original interceptor chain shown in Figure 3.5. One slight complication of this route, however, is that it is necessary to add a transform processor at the end of the route; the transform processor copies the In message to the Out message, ensuring that the reply message is available to the HTTP consumer endpoint. An alternative solution to this problem is to implement the encrypt processor so that it creates an Out message directly.
To implement the pipeline approach shown in Figure 3.6, you can define a route like the following:
from("jetty:http://localhost:8080/foo")
    .process(new MyDecryptProcessor())
    .to("cxf:bean:processTxn")
    .process(new MyEncryptProcessor())
    .transform(body());
In this route, the final processor node, transform(body()), has the effect of copying the In message to the Out message (the In message body is copied explicitly and the In message headers are copied implicitly).
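The role of the final copy step can be seen in a small plain-Java sketch of the four-node pipeline. It is a toy model, not the router's API: the Exchange and Step types are hypothetical, string reversal stands in for the custom cipher (it is not real encryption), and the decrypt and encrypt steps are assumed to modify the In message in place.

```java
// Toy model only: NOT the FUSE Mediation Router API. All names are hypothetical.
final class PipelineEquivalentSketch {
    static final class Exchange {
        String in;
        String out; // null until some step creates a result
        Exchange(String in) { this.in = in; }
    }

    interface Step {
        void process(Exchange exchange);
    }

    // Stand-in "cipher": reversing the string. This is not real encryption.
    static String reverse(String s) { return new StringBuilder(s).reverse().toString(); }

    static Exchange run(String request, Step... steps) {
        Exchange exchange = new Exchange(request);
        for (Step step : steps) {
            // Pipeline linking: a pending Out message becomes the next In message.
            if (exchange.out != null) {
                exchange.in = exchange.out;
                exchange.out = null;
            }
            step.process(exchange);
        }
        return exchange;
    }

    static final Step DECRYPT = ex -> ex.in = reverse(ex.in);  // modifies In only
    static final Step PROCESS_TXN = ex -> ex.out = "OK:" + ex.in; // creates Out
    static final Step ENCRYPT = ex -> ex.in = reverse(ex.in);  // modifies In only
    static final Step COPY_IN_TO_OUT = ex -> ex.out = ex.in;   // the final copy step

    public static void main(String[] args) {
        String request = reverse("pay 10");
        Exchange withCopy = run(request, DECRYPT, PROCESS_TXN, ENCRYPT, COPY_IN_TO_OUT);
        Exchange withoutCopy = run(request, DECRYPT, PROCESS_TXN, ENCRYPT);
        System.out.println(reverse(withCopy.out)); // prints "OK:pay 10"
        System.out.println(withoutCopy.out);       // prints "null"
    }
}
```

Without the copy step, the encrypted reply sits in the In message and the Out message is null, which is why the route ends with a node that creates an Out message.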