To enable the router to do something more interesting than simply connecting a source endpoint to a target endpoint, you can add processors to your route. A processor is a command you can insert into a routing rule to perform arbitrary processing of messages that flow through the rule. FUSE Mediation Router provides a wide variety of different processors, as follows:
The filter() processor can be used to prevent uninteresting messages from reaching the target endpoint. It takes a single predicate argument: if the predicate is true, the message exchange is allowed through to the target; if the predicate is false, the message exchange is blocked. For example, the following filter blocks a message exchange, unless the incoming message contains a header, foo, with value equal to bar:
from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");
The choice() processor is a conditional statement that is used to route incoming messages to alternative targets. Each alternative target is preceded by a when() method, which takes a predicate argument. If the predicate is true, the following target is selected, otherwise processing proceeds to the next when() method in the rule. For example, the following choice() processor directs incoming messages to either Target1, Target2, or Target3, depending on the values of Predicate1 and Predicate2:
from("SourceURL").choice().when(Predicate1).to("Target1")
    .when(Predicate2).to("Target2")
    .otherwise().to("Target3");
The pipeline() processor links a chain of targets together, where the output of one target is fed into the input of the next target in the pipeline (analogous to the UNIX pipe command). The pipeline() method takes an arbitrary number of endpoint arguments, which specify the sequence of endpoints in the pipeline. For example, to pass messages from SourceURL to Target1 to Target2 to Target3 in a pipeline, you can use the following rule:
from("SourceURL").pipeline("Target1", "Target2", "Target3");
If you want the messages from a source endpoint, SourceURL, to be sent to more than one target, you can use two alternative approaches. The first is to invoke the to() method with multiple target endpoints (static recipient list), for example:
from("SourceURL").to("Target1", "Target2", "Target3");
The second is to invoke the recipientList() processor, which takes as its argument a list of recipients (dynamic recipient list). The advantage of the recipientList() processor is that the list of recipients can be calculated at runtime. For example, the following rule generates a recipient list by reading the contents of the recipientListHeader header from the incoming message:
from("SourceURL").recipientList(header("recipientListHeader").tokenize(","));
The splitter() processor splits a message into parts, which are then processed as separate messages. The splitter() method takes a list argument, where each item in the list represents a message part that is re-sent as a separate message. For example, the following rule splits the body of an incoming message into separate lines and then sends each line to the target in a separate message:
from("SourceURL").splitter(bodyAs(String.class).tokenize("\n")).to("TargetURL");
The aggregator() processor aggregates related incoming messages into a single message. To distinguish which messages are eligible to be aggregated together, you define a correlation key for the aggregator. This key is normally derived from a field in the message, such as a header field. Messages that have the same correlation key value are eligible to be aggregated together. You can also optionally specify an aggregation algorithm to the aggregator() processor. The default algorithm is to pick the latest message with a given value of the correlation key and to discard the older messages with that correlation key value.
For example, if you are monitoring a data stream that reports stock prices in real time, you might only be interested in the latest price of each stock symbol. In this case, you can configure an aggregator to transmit only the latest price for a given stock and discard the older (out-of-date) price notifications. The following rule implements this functionality, where the correlation key is read from the stockSymbol header and the default aggregator algorithm is used:
from("SourceURL").aggregator(header("stockSymbol")).to("TargetURL");
The resequencer() processor reorders incoming messages and forwards them to the target. The resequencer() method takes a sequence number as its argument; this number is calculated from the contents of a field in the incoming message. The resequencer() processor waits for a specified amount of time to accumulate messages before reordering and forwarding them. You can specify the wait time in two ways:
Batch resequencing: Wait until a specified number of messages have accumulated before reordering and forwarding them. This is the default processing option. You specify this option by invoking resequencer().batch(). For example, the following resequencing rule reorders messages based on the timeOfDay header, waiting until at least 300 messages have accumulated or until 4000 ms have elapsed since the last message was received:
from("SourceURL").resequencer(header("timeOfDay")).batch(new BatchResequencerConfig(300, 4000L)).to("TargetURL");
Stream resequencing: Transmits messages as soon as they arrive, unless the resequencer detects a gap in the incoming message stream (missing sequence numbers). If a gap occurs, the resequencer waits until the missing messages arrive and then forwards them in the correct order. To avoid the resequencer blocking forever, you can specify a timeout (the default is 1000 ms), after which time the message sequence is transmitted with unresolved gaps. For example, the following resequencing rule detects gaps in the message stream by monitoring the value of the sequenceNumber header, where the maximum buffer size is limited to 5000 and the timeout is specified to be 4000 ms:
from("SourceURL").resequencer(header("sequenceNumber")).stream(new StreamResequencerConfig(5000, 4000L)).to("TargetURL");
The throttler() processor ensures that a target endpoint does not get overloaded. The throttler works by limiting the number of messages that can pass through per second. If the incoming messages exceed the specified rate, the throttler accumulates excess messages in a buffer and transmits them more slowly to the target endpoint. For example, to limit the rate of throughput to 100 messages per second, you can define the following rule:
from("SourceURL").throttler(100).to("TargetURL");
The delayer() processor delays messages for a specified length of time. The delay can either be relative (wait a specified length of time after receipt of the incoming message) or absolute (wait until a specific time). For example, to add a delay of 2 seconds before transmitting received messages, you can use the following rule:
from("SourceURL").delayer(2000).to("TargetURL");
To wait until the absolute time specified in the processAfter header, you can use the following rule:
from("SourceURL").delayer(header("processAfter")).to("TargetURL");
The delayer() method is overloaded, such that an integer is interpreted as a relative delay and an expression (for example, a string) is interpreted as an absolute delay.
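The difference between the two kinds of delay comes down to how the release time is computed. The following plain-Java sketch makes that explicit. DelayerSketch and its method names are hypothetical, and releasing immediately when the absolute time has already passed is an assumption of this sketch:

```java
// Illustration only: not part of the router API.
class DelayerSketch {
    // Relative delay: release the message a fixed time after it was received.
    static long releaseAfter(long receivedAtMillis, long delayMillis) {
        return receivedAtMillis + delayMillis;
    }

    // Absolute delay: release at the requested time, or immediately if that
    // time has already passed (assumption of this sketch).
    static long releaseAt(long receivedAtMillis, long processAfterMillis) {
        return Math.max(receivedAtMillis, processAfterMillis);
    }
}
```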
If none of the standard processors described here provide the functionality you need, you can always define your own custom processor. To create a custom processor, define a class that implements the org.apache.camel.Processor interface and implements its process() method. The following custom processor, MyProcessor, removes the header named foo from incoming messages:
Example 2.2. Implementing a Custom Processor Class
public class MyProcessor implements org.apache.camel.Processor {
    public void process(org.apache.camel.Exchange exchange) {
        // Get the incoming message from the exchange.
        org.apache.camel.Message inMessage = exchange.getIn();
        if (inMessage != null) {
            inMessage.removeHeader("foo");
        }
    }
}
To insert the custom processor into a router rule, invoke the process() method, which provides a generic mechanism for inserting processors into rules. For example, the following rule invokes the processor defined in Example 2.2:
org.apache.camel.Processor myProc = new MyProcessor();
from("SourceURL").process(myProc).to("TargetURL");