
Pipeline Processing

In FUSE Mediation Router, pipelining is the dominant paradigm for connecting nodes in a route definition. The pipeline concept is probably most familiar to users of the UNIX operating system, where it is used to join operating system commands. For example, ls | more pipes a directory listing, ls, to the page-scrolling utility, more. The basic idea of a pipeline is that the output of one command is fed into the input of the next. The natural analogy in the case of a route is for the Out message from one processor to be copied to the In message of the next processor.
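
The same idea can be sketched in plain Java, independently of the router. In the following minimal illustration, the class name and the two stages are invented for the example; it simply chains two functions so that the output of the first becomes the input of the second:

```java
import java.util.function.Function;

// Plain-Java illustration of the pipe idea. None of these names belong to
// FUSE Mediation Router; they are assumptions made for this sketch.
class PipeAnalogy {
    // First stage: trims surrounding whitespace from its input.
    static final Function<String, String> TRIM = String::trim;

    // Second stage: converts its input to upper case.
    static final Function<String, String> UPPER = String::toUpperCase;

    // andThen() feeds the result of TRIM into UPPER, much like "trim | upper".
    static final Function<String, String> PIPELINE = TRIM.andThen(UPPER);

    public static void main(String[] args) {
        System.out.println(PIPELINE.apply("  order-42  "));
    }
}
```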

Every node in a route, except for the initial endpoint, is a processor, in the sense that it implements the org.apache.camel.Processor interface. In other words, processors are the basic building blocks of a DSL route. For example, DSL commands such as filter(), delayer(), setBody(), setHeader(), and to() all represent processors. When considering how processors connect together to build up a route, it is important to distinguish two different processing approaches.

The first approach is where the processor simply modifies the exchange's In message, as shown in Figure 3.1. The exchange's Out message remains null in this case.


The following route shows a setHeader() command that modifies the current In message by adding (or modifying) the BillingSystem header:

from("activemq:orderQueue")
    .setHeader("BillingSystem", xpath("/order/billingSystem"))
    .to("activemq:billingQueue");

The second approach is where the processor creates an Out message to represent the result of the processing, as shown in Figure 3.2.


The following route shows a transform() command that creates an Out message with a message body containing the string, DummyBody:

from("activemq:orderQueue")
    .transform(constant("DummyBody"))
    .to("activemq:billingQueue");

where constant("DummyBody") represents a constant expression. You cannot pass the string, DummyBody, directly, because the argument to transform() must be an expression type.

Figure 3.3 shows an example of a processor pipeline for InOnly exchanges. Processor A acts by modifying the In message, while processors B and C create an Out message. The route builder links the processors together as shown. In particular, processors B and C are linked together in the form of a pipeline: that is, processor B's Out message is moved to the In message before feeding the exchange into processor C, and processor C's Out message is moved to the In message before feeding the exchange into the producer endpoint. Thus the processors' outputs and inputs are joined into a continuous pipeline, as shown in Figure 3.3.
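
The hand-over between processors can be modelled in a few lines of plain Java. The following is a simplified sketch, not the router's implementation: SketchMessage, SketchExchange, SketchProcessor, and SketchPipeline are hypothetical stand-ins for the real exchange and processor types, and the producer endpoint is modelled as one more node that simply records the In body it receives. Headers are not carried over to new Out messages in this model.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are NOT the
// org.apache.camel types, just a model of the In/Out hand-over.
class SketchMessage {
    Object body;
    final Map<String, Object> headers = new HashMap<>();
}

class SketchExchange {
    SketchMessage in = new SketchMessage();
    SketchMessage out; // stays null unless a processor creates it
}

interface SketchProcessor {
    void process(SketchExchange exchange);
}

class SketchPipeline {
    // Runs the nodes in order. Before each node, any Out message left by the
    // previous node is moved to In, so the next node sees it as its input.
    static void run(SketchExchange exchange, List<SketchProcessor> nodes) {
        for (SketchProcessor node : nodes) {
            if (exchange.out != null) {
                exchange.in = exchange.out;
                exchange.out = null;
            }
            node.process(exchange);
        }
    }

    // Helper that builds a fresh message to serve as an Out message.
    static SketchMessage newOut(Object body) {
        SketchMessage message = new SketchMessage();
        message.body = body;
        return message;
    }

    // Builds the A, B, C, producer chain and returns the body that the
    // producer node receives in its In message.
    static Object demo() {
        SketchExchange exchange = new SketchExchange();
        exchange.in.body = "order";
        Object[] delivered = new Object[1];

        // Processor A modifies the In message in place.
        SketchProcessor a = ex -> ex.in.headers.put("seenByA", Boolean.TRUE);
        // Processors B and C each create an Out message.
        SketchProcessor b = ex -> ex.out = newOut(ex.in.body + "-B");
        SketchProcessor c = ex -> ex.out = newOut(ex.in.body + "-C");
        // Stand-in for the producer endpoint: records what it is given.
        SketchProcessor producer = ex -> delivered[0] = ex.in.body;

        run(exchange, Arrays.asList(a, b, c, producer));
        return delivered[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the producer node receives the body built up by B and then C, because each Out message is moved to In before the next node runs.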


FUSE Mediation Router employs the pipeline pattern by default, so you do not need to use any special syntax to create a pipeline in your routes. For example, the following route pulls messages from the userdataQueue queue, pipes each message through a Velocity template (to produce a customer address in text format), and then sends the resulting text address to the envelopeAddresses queue:

from("activemq:userdataQueue")
    .to("velocity:file:AdressTemplate.vm")
    .to("activemq:envelopeAddresses");

Here, the Velocity endpoint, velocity:file:AdressTemplate.vm, specifies the location of a Velocity template file, file:AdressTemplate.vm, in the file system. For more details of the Velocity endpoint, see ????.

Figure 3.4 shows an example of a processor pipeline for InOut exchanges, which you typically use to support remote procedure call (RPC) semantics. Processors A, B, and C are linked together in the form of a pipeline, with the output of each processor being fed into the input of the next. The final Out message produced by the producer endpoint is sent all the way back to the consumer endpoint, where it provides the reply to the original request.


Note that in order to support the InOut exchange pattern, it is essential that the last node in the route (whether it is a producer endpoint or some other kind of processor) creates an Out message. Otherwise, any client that connects to the consumer endpoint would hang and wait indefinitely for a reply message. You should be aware that not all producer endpoints create Out messages.

Consider the following route, which processes payment requests that arrive as HTTP requests:

from("jetty:http://localhost:8080/foo")
    .to("cxf:bean:addAccountDetails")
    .to("cxf:bean:getCreditRating")
    .to("cxf:bean:processTransaction");

Here, the incoming payment request is processed by passing it through a pipeline of Web services: cxf:bean:addAccountDetails, cxf:bean:getCreditRating, and cxf:bean:processTransaction. The final Web service, processTransaction, generates a response (Out message) that is sent back through the Jetty endpoint.

When the pipeline consists of just a sequence of endpoints, it is also possible to use the following alternative syntax:

from("jetty:http://localhost:8080/foo")
    .pipeline("cxf:bean:addAccountDetails", "cxf:bean:getCreditRating", "cxf:bean:processTransaction");

An alternative paradigm for linking together the nodes of a route is interceptor chaining, where a processor in the route processes the exchange both before and after dispatching the exchange to the next processor in the chain. This style of processing is also supported by FUSE Mediation Router, but it is not the usual approach. Figure 3.5 shows an example of an interceptor processor that implements a custom encryption algorithm.


In this example, incoming messages are encrypted in a custom format. The interceptor first decrypts the In message, then dispatches it to the Web services endpoint, cxf:bean:processTxn, and finally, the reply (Out message) is encrypted using the custom format, before being sent back through the consumer endpoint. Using the interceptor chaining approach, therefore, a single interceptor instance can modify both the request and the response.

For example, if you want to define a route with an HTTP port that services incoming requests encoded using custom encryption, you can define a route like the following:

from("jetty:http://localhost:8080/foo")
    .intercept(new MyDecryptEncryptInterceptor())
    .to("cxf:bean:processTxn");

Here, the class, MyDecryptEncryptInterceptor, is implemented by inheriting from the class, org.apache.camel.processor.DelegateProcessor. For details of how to implement this kind of processor, see ????.
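
The before-and-after behaviour of such an interceptor can be sketched without the router's classes. The following is a simplified model under stated assumptions: the class and method names are hypothetical, Base64 encoding stands in for the custom encryption, and a plain function stands in for the next processor in the chain. A real interceptor would extend org.apache.camel.processor.DelegateProcessor instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.function.UnaryOperator;

// Simplified model of interceptor chaining; this is NOT the
// DelegateProcessor API, only an illustration of the wrapping idea.
class InterceptorSketch {
    // Placeholder "cipher": Base64 is an encoding, not encryption. It is
    // used here only so that the example is runnable.
    static String encrypt(String plain) {
        return Base64.getEncoder()
                .encodeToString(plain.getBytes(StandardCharsets.UTF_8));
    }

    static String decrypt(String cipher) {
        return new String(Base64.getDecoder().decode(cipher),
                StandardCharsets.UTF_8);
    }

    // Wraps the next processor: decrypts the request before delegating and
    // encrypts the reply afterwards, all within a single call.
    static UnaryOperator<String> intercept(UnaryOperator<String> next) {
        return request -> encrypt(next.apply(decrypt(request)));
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the Web service endpoint.
        UnaryOperator<String> service = txn -> "processed:" + txn;
        String reply = intercept(service).apply(encrypt("txn-1"));
        System.out.println(decrypt(reply));
    }
}
```

The point of the sketch is that one wrapper sees both directions of the exchange: the request on the way in and the reply on the way out.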

Although it is possible to implement this kind of functionality using an interceptor processor, this is not a very idiomatic way of programming in FUSE Mediation Router. A more typical approach is shown in Figure 3.6.


In this example, the encrypt functionality is implemented in a separate processor from the decrypt functionality. The resulting processor pipeline is semantically equivalent to the original interceptor chain shown in Figure 3.5. One slight complication of this route, however, is that it turns out to be necessary to add a transform processor at the end of the route; the transform processor copies the In message to the Out message, ensuring that the reply message is available to the HTTP consumer endpoint. An alternative solution to this problem is to implement the encrypt processor so that it creates an Out message directly.

To implement the pipeline approach shown in Figure 3.6, you can define a route like the following:

from("jetty:http://localhost:8080/foo")
    .process(new MyDecryptProcessor())
    .to("cxf:bean:processTxn")
    .process(new MyEncryptProcessor())
    .transform(body());

Here, the final processor node, transform(body()), copies the In message to the Out message (the In message body is copied explicitly and the In message headers are copied implicitly).
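
To see why the final copy step matters, consider the following simplified model. It is a sketch under stated assumptions, not the router's implementation: ReplyExchange and ReplySketch are hypothetical names, messages are reduced to a body string (headers are omitted), string reversal stands in for the custom encryption, and the consumer is assumed to read its reply from the Out message.

```java
import java.util.function.Consumer;

// Simplified model, not the Camel API: an exchange reduced to two bodies.
class ReplyExchange {
    String in;
    String out; // null until some step creates an Out message
}

class ReplySketch {
    // Placeholder "cipher": reversing a string twice restores the original.
    static String reverse(String s) {
        return new StringBuilder(s).reverse().toString();
    }

    // Moves a pending Out to In, as the pipeline does between steps, and
    // then runs the given processor.
    static void step(ReplyExchange ex, Consumer<ReplyExchange> processor) {
        if (ex.out != null) {
            ex.in = ex.out;
            ex.out = null;
        }
        processor.accept(ex);
    }

    // Runs decrypt, service, and encrypt, optionally followed by the copy
    // step, and returns the Out body that the consumer would use as reply.
    static String reply(String request, boolean copyInToOut) {
        ReplyExchange ex = new ReplyExchange();
        ex.in = request;
        step(ex, e -> e.in = reverse(e.in));    // decrypt: modifies In
        step(ex, e -> e.out = "ok:" + e.in);    // service: creates Out
        step(ex, e -> e.in = reverse(e.in));    // encrypt: modifies In
        if (copyInToOut) {
            step(ex, e -> e.out = e.in);        // plays the role of transform(body())
        }
        return ex.out;
    }

    public static void main(String[] args) {
        System.out.println(reply("1-nxt", true));
        System.out.println(reply("1-nxt", false));
    }
}
```

In this model, omitting the copy step leaves the Out body null, because the encrypt step only modifies the In message. Having the encrypt step create the Out message itself would remove the need for the extra step.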