A recipient list, shown in Figure 7.3, is a type of router that sends each incoming message to multiple different destinations. In addition, a recipient list typically requires that the list of recipients be calculated at run time.
The simplest kind of recipient list is where the list of destinations is fixed and known
in advance, and the exchange pattern is InOnly. In this case, you can
hardwire the list of destinations into the to()
Java DSL command.
The following example shows how to route an InOnly exchange from a
consumer endpoint, queue:a
, to a fixed list of destinations:
from("seda:a").to("seda:b", "seda:c", "seda:d");
The following example shows how to configure the same route in XML:
<camelContext id="buildStaticRecipientList" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="seda:a"/> <to uri="seda:b"/> <to uri="seda:c"/> <to uri="seda:d"/> </route> </camelContext>
In most cases, when you use the recipient list pattern, the list of recipients should be
calculated at runtime. To do this use the recipientList()
processor, which
takes a list of destinations as its sole argument. Because Fuse Mediation Router applies a type converter
to the list argument, it should be possible to use most standard Java list types (for
example, a collection, a list, or an array). For more details about type converters, see
Built-In Type Converters in Programing EIP Components.
The recipients receive a copy of the same exchange instance and Fuse Mediation Router executes them sequentially.
The following example shows how to extract the list of destinations from a message
header called recipientListHeader
, where the header value is a comma-separated
list of endpoint URIs:
from("direct:a").recipientList(header("recipientListHeader").tokenize(","));
In some cases, if the header value is a list type, you might be able to use it directly
as the argument to recipientList()
. For example:
from("seda:a").recipientList(header("recipientListHeader"));
However, this example is entirely dependent on how the underlying component parses this particular header. If the component parses the header as a simple string, this example will not work. The header must be parsed into some type of Java list.
The following example shows how to configure the preceding route in XML, where the header value is a comma-separated list of endpoint URIs:
<camelContext id="buildDynamicRecipientList" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="seda:a"/> <recipientList delimiter=","> <header>recipientListHeader</header> </recipientList> </route> </camelContext>
Available as of Camel 2.2
The Recipient List supports
parallelProcessing
, which is similar to the corresponding feature in
Splitter. Use the parallel processing feature to
send the exchange to multiple recipients concurrently—for example:
from("direct:a").recipientList(header("myHeader")).parallelProcessing();
In Spring XML, the parallel processing feature is implemented as an attribute on the
recipientList
tag—for example:
<route> <from uri="direct:a"/> <recipientList parallelProcessing="true"> <header>myHeader</header> </recipientList> </route>
Available as of Camel 2.2
The Recipient List supports the
stopOnException
feature, which you can use to stop sending to any
further recipients, if any recipient fails.
from("direct:a").recipientList(header("myHeader")).stopOnException();
And in Spring XML its an attribute on the recipient list tag.
In Spring XML, the stop on exception feature is implemented as an attribute on the
recipientList
tag—for example:
<route> <from uri="direct:a"/> <recipientList stopOnException="true"> <header>myHeader</header> </recipientList> </route>
![]() | Note |
---|---|
You can combine |
Available as of Camel 2.3
The Recipient List supports the
ignoreInvalidEndpoints
option, which enables the recipient list to skip
invalid endpoints (Routing Slip also supports
this option). For example:
from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();
And in Spring XML, you can enable this option by setting the
ignoreInvalidEndpoints
attribute on the recipientList
tag, as
follows
<route> <from uri="direct:a"/> <recipientList ignoreInvalidEndpoints="true"> <header>myHeader</header> </recipientList> </route>
Consider the case where myHeader
contains the two endpoints,
direct:foo,xxx:bar
. The first endpoint is valid and works. The second
is invalid and, therefore, ignored. Fuse Mediation Router logs at INFO
level whenever an
invalid endpoint is encountered.
Available as of Camel 2.2
You can use a custom AggregationStrategy
with the Recipient List, which is useful for aggregating
replies from the recipients in the list. By default, Fuse Mediation Router uses the
UseLatestAggregationStrategy
aggregation strategy, which keeps just the
last received reply. For a more sophisticated aggregation strategy, you can define your own
implementation of the AggregationStrategy
interface—see Aggregator EIP for details. For example, to apply the
custom aggregation strategy, MyOwnAggregationStrategy
, to the reply messages,
you can define a Java DSL route as follows:
from("direct:a") .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy()) .to("direct:b");
In Spring XML, you can specify the custom aggregation strategy as an attribute on the
recipientList
tag, as follows:
<route> <from uri="direct:a"/> <recipientList strategyRef="myStrategy"> <header>myHeader</header> </recipientList> <to uri="direct:b"/> </route> <bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>
Available as of Camel 2.2
This is only needed when you use parallelProcessing
. By default Camel
uses a thread pool with 10 threads. Notice this is subject to change when we overhaul thread
pool management and configuration later (hopefully in Camel 2.2).
You configure this just as you would with the custom aggregation strategy.
You can use a Bean in EIP Component Reference to provide the recipients, for example:
from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");
Where the MessageRouter
bean is defined as follows:
public class MessageRouter { public String routeTo() { String queueName = "activemq:queue:test2"; return queueName; } }
You can make a bean behave as a recipient list by adding the
@RecipientList
annotation to a methods that returns a list of
recipients. For example:
public class MessageRouter { @RecipientList public String routeTo() { String queueList = "activemq:queue:test1,activemq:queue:test2"; return queueList; } }
In this case, do not include the recipientList
DSL
command in the route. Define the route as follows:
from("activemq:queue:test").bean(MessageRouter.class, "routeTo");
Available as of Camel 2.5
If you use parallelProcessing
, you can configure a total
timeout
value in milliseconds. Camel will then process the messages in
parallel until the timeout is hit. This allows you to continue processing if one message is
slow.
In the example below, the recipientlist
header has the value,
direct:a,direct:b,direct:c
, so that the message is sent to three recipients.
We have a timeout of 250 milliseconds, which means only the last two messages can be
completed within the timeframe. The aggregation therefore yields the string result,
BC
.
from("direct:start") .recipientList(header("recipients"), ",") .aggregationStrategy(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) .parallelProcessing().timeout(250) // use end to indicate end of recipientList clause .end() .to("mock:result"); from("direct:a").delay(500).to("mock:A").setBody(constant("A")); from("direct:b").to("mock:B").setBody(constant("B")); from("direct:c").to("mock:C").setBody(constant("C"));
![]() | Timeout in other EIPs |
---|---|
This |
By default if a timeout occurs the AggregationStrategy
is not
invoked. However you can implement a specialized version
// Java public interface TimeoutAwareAggregationStrategy extends AggregationStrategy { /** * A timeout occurred * * @param oldExchange the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange) * @param index the index * @param total the total * @param timeout the timeout value in millis */ void timeout(Exchange oldExchange, int index, int total, long timeout);
This allows you to deal with the timeout in the AggregationStrategy
if you really need to.
![]() | Timeout is total |
---|---|
The timeout is total, which means that after X time, Camel will aggregate the messages
which has completed within the timeframe. The remainders will be cancelled. Camel will
also only invoke the |
Before recipientList
sends a message to one of the recipient endpoints, it
creates a message replica, which is a shallow copy of the original message. If you want to
perform some custom processing on each message replica before the replica is sent to its
endpoint, you can invoke the onPrepare
DSL command in the
recipientList
clause. The onPrepare
command inserts a custom
processor just after the message has been shallow-copied and just
before the message is dispatched to its endpoint. For example, in the
following route, the CustomProc
processor is invoked on the message replica for
each recipient endpoint:
from("direct:start") .recipientList().onPrepare(new CustomProc());
A common use case for the onPrepare
DSL command is to perform a deep copy
of some or all elements of a message. This allows each message replica to be modified
independently of the others. For example, the following CustomProc
processor
class performs a deep copy of the message body, where the message body is presumed to be of
type,
, and the deep copy is performed by
the method, BodyType
.BodyType
.deepCopy()
// Java import org.apache.camel.*; ... public class CustomProc implements Processor { public void process(Exchange exchange) throws Exception {BodyType
body = exchange.getIn().getBody(BodyType
.class); // Make a _deep_ copy of of the body objectBodyType
clone =BodyType
.deepCopy(); exchange.getIn().setBody(clone); // Headers and attachments have already been // shallow-copied. If you need deep copies, // add some more code here. } }
The recipientList
DSL command supports the following options:
Name | Default Value | Description |
---|---|---|
delimiter
|
,
|
Delimiter used if the Expression returned multiple endpoints. |
strategyRef
|
Refers to an AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the Recipient List. By default Camel will use the last reply as the outgoing message. | |
parallelProcessing
|
false
|
Camel 2.2: If enables then sending messages to the recipients occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the recipients which happens concurrently. |
executorServiceRef
|
Camel 2.2: Refers to a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. | |
stopOnException
|
false
|
Camel 2.2: Whether or not to stop continue processing immediately when an exception occurred. If disable, then Camel will send the message to all recipients regardless if one of them failed. You can deal with exceptions in the AggregationStrategy class where you have full control how to handle that. |
ignoreInvalidEndpoints
|
false
|
Camel 2.3: If an endpoint uri could not be resolved, should it be ignored. Otherwise Camel will thrown an exception stating the endpoint uri is not valid. |
streaming
|
false
|
Camel 2.5: If enabled then Camel will process replies out-of-order, eg in the order they come back. If disabled, Camel will process replies in the same order as the Expression specified. |
timeout
|
Camel 2.5: Sets a total timeout specified in millis. If the Recipient List hasn't been able to send and process all replies within the given timeframe, then the timeout triggers and the Recipient List breaks out and continues. Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out. |
|
onPrepareRef
|
Camel 2.8: Refers to a custom Processor to prepare the copy of the Exchange each recipient will receive. This allows you to do any custom logic, such as deep-cloning the message payload if that's needed etc. | |
shareUnitOfWork
|
false
|
Camel 2.8: Whether the unit of work should be shared. See the same option on Splitter for more details. |