To enable the router to do something more interesting than simply connecting a source endpoint to a target endpoint, you can add processors to your route. A processor is a command you can insert into a routing rule to perform arbitrary processing of the messages that flow through the rule. FUSE Mediation Router provides a wide variety of processors, as follows:
The filter processor can be used to prevent uninteresting messages from reaching the
target endpoint. It takes a single predicate argument: if the predicate is true, the
message exchange is allowed through to the target; if the predicate is false, the message
exchange is blocked. In the following example, the filter blocks a message exchange unless
the incoming message contains a header, foo, with value equal to bar:
<camelContext id="filterRoute" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <filter>
      <simple>header.foo = 'bar'</simple>
      <to uri="TargetURL"/>
    </filter>
  </route>
</camelContext>
The choice processor is a conditional statement that routes incoming messages to
alternative targets. Each target is enclosed in a when element, which takes a predicate
argument. If the predicate is true, the current target is selected; if false, processing
proceeds to the next when element in the rule. An optional otherwise element specifies the
target used when none of the predicates is true. In the following example, the choice
processor directs incoming messages to either Target1, Target2, or Target3, depending on
the values of the predicates:
<camelContext id="buildSimpleRouteWithChoice" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <choice>
      <when>
        <!-- First predicate -->
        <simple>header.foo = 'bar'</simple>
        <to uri="Target1"/>
      </when>
      <when>
        <!-- Second predicate -->
        <simple>header.foo = 'manchu'</simple>
        <to uri="Target2"/>
      </when>
      <otherwise>
        <to uri="Target3"/>
      </otherwise>
    </choice>
  </route>
</camelContext>
If you want the messages from a source endpoint, SourceURL, to be sent to more than one
target, there are two alternative approaches. The first is to include multiple to elements
in the route:
<camelContext id="staticRecipientList" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <to uri="Target1"/>
    <to uri="Target2"/>
    <to uri="Target3"/>
  </route>
</camelContext>
The second is to add a recipientList element, which takes a list of recipients as its
argument (dynamic recipient list). The advantage of using the recipientList element is that
the list of recipients can be calculated at runtime. For example, the following rule
generates a recipient list by reading the contents of the recipientListHeader header from
the incoming message:
<camelContext id="dynamicRecipientList" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <recipientList>
      <!-- Requires XPath 2.0 -->
      <xpath>tokenize(/headers/recipientListHeader,"\s+")</xpath>
    </recipientList>
  </route>
</camelContext>
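To make the runtime calculation concrete, the following plain-Python sketch models what the tokenize expression does with the header value: it splits the value on runs of whitespace and treats each token as a recipient. This is an illustration only, not Mediation Router code; the function names, the dictionary of headers, and the send callback are hypothetical.

```python
import re


def recipients_from_header(headers, header_name="recipientListHeader"):
    """Split a whitespace-separated header value into a list of recipients."""
    # Stripping first is a simplification of this sketch, so that leading or
    # trailing whitespace does not produce empty recipients.
    value = headers.get(header_name, "").strip()
    if not value:
        return []
    return re.split(r"\s+", value)


def deliver(body, headers, send):
    """Send the same message body to every recipient named in the header."""
    for uri in recipients_from_header(headers):
        send(uri, body)


sent = []
deliver("hello", {"recipientListHeader": "seda:a  seda:b\tseda:c"},
        lambda uri, body: sent.append((uri, body)))
print(sent)
```

Each message can therefore carry its own list of targets, which is what distinguishes the dynamic recipient list from a fixed series of to elements.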
The splitter processor splits a message into parts, which are then processed as separate
messages. The splitter element must contain an expression that returns a list, where each
item in the list represents a message part that is to be re-sent as a separate message. The
following example splits the body of an incoming message into separate sections
(represented by a top-level section element) and then sends each section to the target in a
separate message:
<camelContext id="splitterRoute" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="seda:a"/>
    <splitter>
      <xpath>/section</xpath>
      <to uri="seda:b"/>
    </splitter>
  </route>
</camelContext>
The aggregator processor aggregates related incoming messages into a single message. In
order to distinguish which messages are eligible to be aggregated together, you must define
a correlation key for the aggregator. The correlation key is normally derived from a field
in the message (for example, a header field). Messages that have the same correlation key
value are eligible to be aggregated together. You can also optionally specify an
aggregation algorithm for the aggregator processor. The default algorithm is to pick the
latest message with a given value of the correlation key and to discard the older messages
with that correlation key value.
For example, if you are monitoring a data stream that reports stock prices in real time,
you might only be interested in the latest price of each stock symbol. In this case, you
could configure an aggregator to transmit only the latest price for a given stock and
discard the older (out-of-date) price notifications. The following rule implements this
functionality, where the correlation key is read from the stockSymbol header and the
default aggregator algorithm is used:
<camelContext id="aggregatorRoute" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <aggregator>
      <simple>header.stockSymbol</simple>
      <to uri="TargetURL"/>
    </aggregator>
  </route>
</camelContext>
The resequencer processor reorders incoming messages and forwards them to the target. The
resequencer element must be provided with a sequence number calculated from the contents of
a field in the incoming message. Before it can start reordering messages, the resequencer
must wait until a certain number of messages have been received from the source. There are
two ways to specify how long the resequencer processor waits before attempting to reorder
the accumulated messages and forward them to the target:
Batch resequencing: Waits until a specified number of messages have accumulated before
starting to reorder and forward messages. This is the default. For example, the following
resequencing rule reorders messages based on the timeOfDay header, waiting until either 300
messages have accumulated or 4000 ms have elapsed since the last message was received:
<camelContext id="batchResequencer" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <resequencer>
      <!-- Sequence ordering based on timeOfDay header -->
      <simple>header.timeOfDay</simple>
      <to uri="TargetURL"/>
      <!-- batch-config can be omitted for default (batch) resequencer settings -->
      <batch-config batchSize="300" batchTimeout="4000"/>
    </resequencer>
  </route>
</camelContext>
Stream resequencing: Transmits messages as soon as they arrive unless the resequencer
detects a gap in the incoming message stream (missing sequence numbers). If there is a gap,
the resequencer waits until the missing messages arrive and then forwards the messages in
the correct order. To avoid the resequencer blocking forever, you specify a timeout (the
default is 1000 ms), after which time the message sequence is transmitted with unresolved
gaps. For example, the following resequencing rule detects gaps in the message stream by
monitoring the value of the sequenceNumber header, where the maximum buffer size is limited
to 5000, and the timeout is specified to be 4000 ms:
<camelContext id="streamResequencer" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <resequencer>
      <simple>header.sequenceNumber</simple>
      <to uri="TargetURL"/>
      <stream-config capacity="5000" timeout="4000"/>
    </resequencer>
  </route>
</camelContext>
The throttler processor ensures that a target endpoint does not get overloaded. The
throttler works by limiting the number of messages that can pass through per second. If the
incoming messages exceed the specified rate, the throttler accumulates excess messages in a
buffer and transmits them more slowly to the target endpoint. To limit the rate of
throughput to 100 messages per second, you can define the following rule:
<camelContext id="throttlerRoute" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <throttler maximumRequestsPerPeriod="100" timePeriodMillis="1000">
      <to uri="TargetURL"/>
    </throttler>
  </route>
</camelContext>
The delayer processor delays messages for a specified length of time. The delay can either
be relative (waits a specified length of time after receipt of the incoming message) or
absolute (waits until a specific time). For example, to add a delay of 2 seconds before
transmitting received messages, you can use the following rule:
<camelContext id="delayerRelative" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <delayer>
      <delay>2000</delay>
      <to uri="TargetURL"/>
    </delayer>
  </route>
</camelContext>
To wait until the absolute time specified in the processAfter header, you can use the
following rule:
<camelContext id="delayerRelative" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <delayer>
      <simple>header.processAfter</simple>
      <to uri="TargetURL"/>
    </delayer>
  </route>
</camelContext>