LibraryLink ToToggle FramesPrintFeedback

Processors

To enable the router to something more interesting than simply connecting a source endpoint to a target endpoint, you can add processors to your route. A processor is a command you can insert into a routing rule in order to perform arbitrary processing of the messages that flow through the rule. FUSE Mediation Router provides a wide variety of different processors, as follows:

The filter processor can be used to prevent uninteresting messages from reaching the target endpoint. It takes a single predicate argument — if the predicate is true, the message exchange is allowed through to the target; if the predicate is false, the message exchange is blocked. In the following example, the filter blocks a message exchange, unless the incoming message contains a header, foo, with value equal to bar:

<camelContext id="filterRoute" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <filter>
      <simple>header.foo = 'bar'</simple>
      <to uri="TargetURL"/>
    </filter>
  </route>
</camelContext>

The choice processor is a conditional statement that routes incoming messages to alternative targets. Each target is enclosed in a when element, which takes a predicate argument. If the predicate is true, the current target is selected; if false, processing proceeds to the next when element in the rule. In the following example, the choice() processor directs incoming messages to either Target1, Target2, or Target3, depending on the values of the predicates:

<camelContext id="buildSimpleRouteWithChoice" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <choice>
      <when>
        <!-- First predicate -->
        <simple>header.foo = 'bar'</simple>
        <to uri="Target1"/>
      </when>
      <when>
        <!-- Second predicate -->
        <simple>header.foo = 'manchu'</simple>
        <to uri="Target2"/>
      </when>
      <otherwise>
        <to uri="Target3"/>
      </otherwise>
    </choice>
  </route>
</camelContext>

If you want the messages from a source endpoint, SourceURL, to be sent to more than one target, there are two alternative approaches. The first is to include multiple to elements in the route:

<camelContext id="staticRecipientList" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <to uri="Target1"/>
    <to uri="Target2"/>
    <to uri="Target3"/>
  </route>
</camelContext>

The second is to add a recipientList element, which takes a list of recipients as its argument (dynamic recipient list). The advantage of using the recipientList element is that the list of recipients can be calculated at runtime. For example, the following rule generates a recipient list by reading the contents of the recipientListHeader from the incoming message:

<camelContext id="dynamicRecipientList" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <recipientList>
      <!-- Requires XPath 2.0 -->
      <xpath>tokenize(/headers/recipientListHeader,"\s+")</xpath>
    </recipientList>
  </route>
</camelContext>

The splitter processor splits a message into parts, which are then processed as separate messages. The splitter element must contain an expression that returns a list, where each item in the list represents a message part that is to be re-sent as a separate message. The following example splits the body of an incoming message into separate sections (represented by a top-level section element) and then sends each section to the target in a separate message:

<camelContext id="splitterRoute" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="seda:a"/>
    <splitter>
      <xpath>/section</xpath>
      <to uri="seda:b"/>
    </splitter>
  </route>
</camelContext>

The aggregator processor aggregates related incoming messages into a single message. In order to distinguish which messages are eligible to be aggregated together, you must define a correlation key for the aggregator. The correlation key is normally derived from a field in the message (for example, a header field). Messages that have the same correlation key value are eligible to be aggregated together. You can also optionally specify an aggregation algorithm to the aggregator processor. The default algorithm is to pck the latest message with a given value of the correlation key and to discard the older messages with that correlation key value.

For example, if you are monitoring a data stream that reports stock prices in real time, you might only be interested in the latest price of each stock symbol. In this case, you could configure an aggregator to transmit only the latest price for a given stock and discard the older (out-of-date) price notifications. The following rule implements this functionality, where the correlation key is read from the stockSymbol header and the default aggregator algorithm is used:

<camelContext id="aggregatorRoute" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <aggregator>
      <simple>header.stockSymbol</simple>
      <to uri="TargetURL"/>
    </aggregator>
  </route>
</camelContext>

A resequencer processor reorders incoming messages and forwards them to the target. The resequencer element must be provided with a sequence number calculated from the contents of a field in the incoming message. Before you can start re-ordering messages, you must wait until a certain number of messages have been received from the source. There are two ways to specify how long the resequencer processor waits before attempting to re-order the accumulated messages and forward them to the target. They are:

The throttler processor ensures that a target endpoint does not get overloaded. The throttler works by limiting the number of messages that can pass through per second. If the incoming messages exceed the specified rate, the throttler accumulates excess messages in a buffer and transmits them more slowly to the target endpoint. To limit the rate of throughput to 100 messages per second, you can define the following rule:

<camelContext id="throttlerRoute" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <throttler maximumRequestsPerPeriod="100" timePeriodMillis="1000">
      <to uri="TargetURL"/>
    </throttler>
  </route>
</camelContext>

The delayer processor delays messages for a specified length of time. The delay can either be relative (waits a specified length of time after receipt of the incoming message) or absolute (waits until a specific time). For example, to add a delay of 2 seconds before transmitting received messages, you can use the following rule:

<camelContext id="delayerRelative" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <delayer>
      <delay>2000</delay>
      <to uri="TargetURL"/>
    </delayer>
  </route>
</camelContext>

To wait until the absolute time specified in the processAfter header, you can use the following rule:

<camelContext id="delayerRelative" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <delayer>
      <simple>header.processAfter</simple>
      <to uri="TargetURL"/>
    </delayer>
  </route>
</camelContext>