LibraryLink ToToggle FramesPrintFeedback

Processors

To enable the router to do something more interesting than simply connecting a source endpoint to a target endpoint, you can add processors to your route. A processor is a command you can insert into a routing rule to perform arbitrary processing of messages that flow through the rule. FUSE Mediation Router provides a wide variety of different processors, as follows:

The filter() processor can be used to prevent uninteresting messages from reaching the target endpoint. It takes a single predicate argument: if the predicate is true, the message exchange is allowed through to the target; if the predicate is false, the message exchange is blocked. For example, the following filter blocks a message exchange, unless the incoming message contains a header, foo, with value equal to bar:

from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");

The choice() processor is a conditional statement that is used to route incoming messages to alternative targets. Each alternative target is preceded by a when() method, which takes a predicate argument. If the predicate is true, the following target is selected, otherwise processing proceeds to the next when() method in the rule. For example, the following choice() processor directs incoming messages to either Target1, Target2, or Target3, depending on the values of Predicate1 and Predicate2:

from("SourceURL").choice().when(Predicate1).to("Target1")
                    .when(Predicate2).to("Target2")
                    .otherwise().to("Target3");

The pipeline() processor links a chain of targets together, where the output of one target is fed into the input of the next target in the pipeline (analogous to the UNIX pipe command). The pipeline() method takes an arbitrary number of endpoint arguments, which specify the sequence of endpoints in the pipeline. For example, to pass messages from SourceURL to Target1 to Target2 to Target3 in a pipeline, you can use the following rule:

from("SourceURL").pipeline("Target1","Target2","Target3");

If you want the messages from a source endpoint, SourceURL, to be sent to more than one target, you can use two alternative approaches. The first is to invoke the to() method with multiple target endpoints (static recipient list), for example:

from("SourceURL").to("Target1","Target2","Target3");

The second is to invoke the recipientList() processor, which takes as its argument a list of recipients (dynamic recipient list). The advantage of the recipientList() processor is that the list of recipients can be calculated at runtime. For example, the following rule generates a recipient list by reading the contents of the recipientListHeader from the incoming message:

from("SourceURL").recipientList(header("recipientListHeader").tokenize(","));

The splitter() processor splits a message into parts, which are then processed as separate messages. The splitter() method takes a list argument, where each item in the list represents a message part that is re-sent as a separate message. For example, the following rule splits the body of an incoming message into separate lines and then sends each line to the target in a separate message:

from("SourceURL").splitter(bodyAs(String.class).tokenize("\n")).to("TargetURL");

The aggregator() processor aggregates related incoming messages into a single message. To distinguish which messages are eligible to be aggregated together, you define a correlation key for the aggregator. This key is normally derived from a field in the messag, such as a header field. Messages that have the same correlation key value are eligible to be aggregated together. You can also optionally specify an aggregation algorithm to the aggregator() processor. The default algorithm is to pick the latest message with a given value of the correlation key and to discard the older messages with that correlation key value).

For example, if you are monitoring a data stream that reports stock prices in real time, you might only be interested in the latest price of each stock symbol. In this case, you can configure an aggregator to transmit only the latest price for a given stock and discard the older (out-of-date) price notifications. The following rule implements this functionality, where the correlation key is read from the stockSymbol header and the default aggregator algorithm is used:

from("SourceURL").aggregator(header("stockSymbol")).to("TargetURL");

A resequencer() processor re-orders incoming messages and forwards them to the target. The resequencer() method takes a sequence number as its argument; this number is calculated from the contents of a field in the incoming message. The resequencer() processor waits for a specified amount of time to accumulate messages before reordering and forwarding them. You can specify the wait time in two ways:

The throttler() processor ensures that a target endpoint does not get overloaded. The throttler works by limiting the number of messages that can pass through per second. If the incoming messages exceed the specified rate, the throttler accumulates excess messages in a buffer and transmits them more slowly to the target endpoint. For example, to limit the rate of throughput to 100 messages per second, you can define the following rule:

from("SourceURL").throttler(100).to("TargetURL");

The delayer() processor delays messages for a specified length of time. The delay can either be relative (wait a specified length of time after receipt of the incoming message) or absolute (wait until a specific time). For example, to add a delay of 2 seconds before transmitting received messages, you can use the following rule:

from("SourceURL").delayer(2000).to("TargetURL");

To wait until the absolute time specified in the processAfter header, you can use the following rule:

from("SourceURL").delayer(header("processAfter").to("TargetURL");

The delayer() method is overloaded, such that an integer is interpreted as a relative delay and an expression (for example, a string) is interpreted as an absolute delay.

If none of the standard processors described here provide the functionality you need, you can always define your own custom processor. To create a custom processor, define a class that implements the org.apache.camel.Processor interface and overrides the process() method. The following custom processor, MyProcessor, removes the header named foo from incoming messages:


To insert the custom processor into a router rule, invoke the process() method, which provides a generic mechanism for inserting processors into rules. For example, the following rule invokes the processor defined in Example 2.2:

org.apache.camel.Processor myProc = new MyProcessor();

from("SourceURL").process(myProc).to("TargetURL");