The multicast pattern, shown in Figure 7.10, is a variation of the recipient list with a fixed destination pattern, which is compatible with the InOut message exchange pattern. This is in contrast to recipient list, which is only compatible with the InOnly exchange pattern.
Whereas the multicast processor receives multiple Out messages in response to the original request (one from each of the recipients), the original caller is only expecting to receive a single reply. Thus, there is an inherent mismatch on the reply leg of the message exchange, and to overcome this mismatch, you must provide a custom aggregation strategy to the multicast processor. The aggregation strategy class is responsible for aggregating all of the Out messages into a single reply message.
Consider the example of an electronic auction service, where a seller offers an item for sale to a list of buyers. The buyers each put in a bid for the item, and the seller automatically selects the bid with the highest price. You can implement the logic for distributing an offer to a fixed list of buyers using the multicast() DSL command, as follows:
from("cxf:bean:offer")
    .multicast(new HighestBidAggregationStrategy())
    .to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
Where the seller is represented by the endpoint cxf:bean:offer, and the buyers are represented by the endpoints cxf:bean:Buyer1, cxf:bean:Buyer2, and cxf:bean:Buyer3. To consolidate the bids received from the various buyers, the multicast processor uses the aggregation strategy HighestBidAggregationStrategy. You can implement HighestBidAggregationStrategy in Java, as follows:
// Java
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class HighestBidAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // On the first invocation there is no previous reply, so oldExchange is null.
        if (oldExchange == null) {
            return newExchange;
        }
        // Keep whichever reply carries the higher bid.
        float oldBid = oldExchange.getOut().getHeader("Bid", Float.class);
        float newBid = newExchange.getOut().getHeader("Bid", Float.class);
        return (newBid > oldBid) ? newExchange : oldExchange;
    }
}
Where it is assumed that the buyers insert the bid price into a header named Bid. For more details about custom aggregation strategies, see Aggregator.
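The way an aggregation strategy reduces many replies to one can be sketched in plain Java, without any Camel classes. In this minimal sketch, BidReply and HighestBidFold are hypothetical names introduced only for illustration. It assumes that the strategy is applied to the replies one at a time, with a null "old" value on the first call, and that the running winner is carried forward.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for a reply message; this is not a Camel class.
final class BidReply {
    final String buyer;
    final float bid;

    BidReply(String buyer, float bid) {
        this.buyer = buyer;
        this.bid = bid;
    }
}

final class HighestBidFold {

    // Same shape as aggregate(old, new): oldReply is null on the first call.
    static final BinaryOperator<BidReply> HIGHEST_BID =
        (oldReply, newReply) ->
            (oldReply == null || newReply.bid > oldReply.bid) ? newReply : oldReply;

    // Applies the strategy to each reply in turn, carrying the running winner.
    // Returns null when there are no replies at all.
    static BidReply aggregate(List<BidReply> replies, BinaryOperator<BidReply> strategy) {
        BidReply result = null;
        for (BidReply reply : replies) {
            result = strategy.apply(result, reply);
        }
        return result;
    }
}
```

Because the strategy only ever sees two replies at a time, it does not need to know how many recipients are configured.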
By default, the multicast processor invokes each of the recipient endpoints one after another (in the order listed in the to() command). In some cases, this might cause unacceptably long latency. To avoid this, you can enable parallel processing by adding the parallelProcessing() clause. For example, to enable parallel processing in the electronic auction example, define the route as follows:
from("cxf:bean:offer")
    .multicast(new HighestBidAggregationStrategy())
    .parallelProcessing()
    .to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
Where the multicast processor now invokes the buyer endpoints, using a thread pool that has one thread for each of the endpoints.
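To make the latency argument concrete, the following plain-Java sketch models a parallel fan-out with one thread per recipient. It is not Camel code: ParallelFanOut is a hypothetical name, each buyer is modelled as a Callable that returns its bid, and the list of buyers is assumed to be non-empty.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelFanOut {

    // Invokes every buyer concurrently and returns the highest bid.
    // The caller still blocks until all buyers have replied.
    static float highestBid(List<Callable<Float>> buyers) {
        // One thread per recipient (assumes at least one buyer).
        ExecutorService pool = Executors.newFixedThreadPool(buyers.size());
        try {
            float best = Float.NEGATIVE_INFINITY;
            for (Future<Float> reply : pool.invokeAll(buyers)) {
                best = Math.max(best, reply.get());
            }
            return best;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

With this arrangement the total waiting time is roughly that of the slowest buyer, rather than the sum over all buyers.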
If you want to customize the size of the thread pool that invokes the buyer endpoints, you can invoke the executorService() method to specify your own custom executor service. For example:
from("cxf:bean:offer")
    .multicast(new HighestBidAggregationStrategy())
    .executorService(MyExecutor)
    .to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
Where MyExecutor is an instance of type java.util.concurrent.ExecutorService.
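As an illustration of what such an executor might look like, the following sketch builds a fixed-size pool using only the JDK. The class name MulticastExecutors, the method newBoundedPool, and the thread name prefix are hypothetical choices made for this example; any ExecutorService implementation could be used instead.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class MulticastExecutors {

    // Builds a pool with a fixed number of threads and recognisable thread names,
    // which makes the multicast threads easy to spot in logs and thread dumps.
    static ExecutorService newBoundedPool(int size) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "multicast-" + counter.incrementAndGet());
            thread.setDaemon(true); // do not keep the JVM alive on shutdown
            return thread;
        };
        return Executors.newFixedThreadPool(size, factory);
    }
}
```

A pool created this way, for example with newBoundedPool(2), would limit the multicast to two concurrent recipient invocations, however many endpoints are listed.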
When the exchange has an InOut pattern, an aggregation strategy is used to aggregate reply messages. The default aggregation strategy takes the latest reply message and discards earlier replies. For example, in the following route, the custom strategy MyAggregationStrategy is used to aggregate the replies from the endpoints direct:a, direct:b, and direct:c:
from("direct:start")
    .multicast(new MyAggregationStrategy())
    .parallelProcessing()
    .timeout(500)
    .to("direct:a", "direct:b", "direct:c")
    .end()
    .to("mock:result");
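The interaction between a total timeout and a "latest reply wins" strategy can be modelled in plain Java. This is a sketch under stated assumptions, not a description of Camel internals: TimedFanOut is a hypothetical name, recipients are modelled as Callable instances, and replies that miss the deadline are simply skipped.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class TimedFanOut {

    // Returns the last reply (in list order) that arrived within the timeout,
    // or null if no recipient replied in time.
    static String latestReplyWithin(List<Callable<String>> recipients, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(recipients.size());
        try {
            String latest = null;
            // invokeAll with a timeout cancels every task that has not finished in time.
            for (Future<String> reply
                    : pool.invokeAll(recipients, timeoutMillis, TimeUnit.MILLISECONDS)) {
                if (reply.isCancelled()) {
                    continue; // this recipient missed the deadline
                }
                latest = reply.get(); // later replies replace earlier ones
            }
            return latest;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In this model, a slow recipient delays the caller by at most the timeout, and its reply takes no part in the aggregation.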
The following example shows how to configure a similar route in XML, where the route uses a custom aggregation strategy and a custom thread executor:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
       ">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="cxf:bean:offer"/>
      <multicast strategyRef="highestBidAggregationStrategy"
                 parallelProcessing="true"
                 threadPoolRef="myThreadExcutor">
        <to uri="cxf:bean:Buyer1"/>
        <to uri="cxf:bean:Buyer2"/>
        <to uri="cxf:bean:Buyer3"/>
      </multicast>
    </route>
  </camelContext>

  <bean id="highestBidAggregationStrategy" class="com.acme.example.HighestBidAggregationStrategy"/>
  <bean id="myThreadExcutor" class="com.acme.example.MyThreadExcutor"/>

</beans>
Where both the parallelProcessing attribute and the threadPoolRef attribute are optional. It is only necessary to set them if you want to customize the threading behavior of the multicast processor.
Before multicast sends a message to one of the recipient endpoints, it creates a message replica, which is a shallow copy of the original message. If you want to perform some custom processing on each message replica before the replica is sent to its endpoint, you can invoke the onPrepare DSL command in the multicast clause. The onPrepare command inserts a custom processor just after the message has been shallow-copied and just before the message is dispatched to its endpoint. For example, in the following route, the CustomProc processor is invoked on the message sent to direct:a and also on the message sent to direct:b.
from("direct:start")
    .multicast().onPrepare(new CustomProc())
    .to("direct:a").to("direct:b");
A common use case for the onPrepare DSL command is to perform a deep copy of some or all elements of a message. For example, the following CustomProc processor class performs a deep copy of the message body, where the message body is presumed to be of type BodyType, and the deep copy is performed by the deepCopy() method of BodyType:
// Java
import org.apache.camel.*;
...
public class CustomProc implements Processor {

    public void process(Exchange exchange) throws Exception {
        BodyType body = exchange.getIn().getBody(BodyType.class);

        // Make a _deep_ copy of the body object
        BodyType clone = body.deepCopy();
        exchange.getIn().setBody(clone);

        // Headers and attachments have already been
        // shallow-copied. If you need deep copies,
        // add some more code here.
    }
}
![]() | Note |
---|---|
Although the |
Multicast copies the source Exchange and sends each copy to a recipient. However, the copy is a shallow copy, so if the message body is mutable, any change made to it is visible to the other copies. If you want to use a deep copy instead, you need to supply a custom onPrepare processor, which lets you do this through the Processor interface.
Note that onPrepare can be used for any kind of custom logic that you want to execute before the Exchange is multicast.
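The difference between the two kinds of copy can be shown with a few lines of plain Java. In this minimal sketch, MutableBody and Envelope are hypothetical classes that stand in for a mutable message body and for the message that carries it; they are not Camel types.

```java
// Hypothetical mutable payload.
final class MutableBody {
    String name;

    MutableBody(String name) {
        this.name = name;
    }

    // Creates an independent object with the same state.
    MutableBody deepCopy() {
        return new MutableBody(name);
    }
}

// Hypothetical message wrapper that holds a reference to its body.
final class Envelope {
    final MutableBody body;

    Envelope(MutableBody body) {
        this.body = body;
    }

    // Shallow copy: the new envelope points at the SAME body object.
    Envelope shallowCopy() {
        return new Envelope(body);
    }

    // Deep copy: the new envelope gets its own body object.
    Envelope deepCopy() {
        return new Envelope(body.deepCopy());
    }
}
```

Changing the body through a shallow copy also changes what the original envelope sees, because both refer to one object. Changing it through a deep copy leaves the original untouched.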
![]() | Design for immutable |
---|---|
It is best practice to design for immutable objects. |
For example, suppose you have a mutable message body such as this Animal class:
import java.io.Serializable;

public class Animal implements Serializable {

    private int id;
    private String name;

    public Animal() {
    }

    public Animal(int id, String name) {
        this.id = id;
        this.name = name;
    }

    // Returns an independent copy of this Animal.
    public Animal deepClone() {
        Animal clone = new Animal();
        clone.setId(getId());
        clone.setName(getName());
        return clone;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return id + " " + name;
    }
}
Then we can create a deep clone processor which clones the message body:
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class AnimalDeepClonePrepare implements Processor {

    public void process(Exchange exchange) throws Exception {
        Animal body = exchange.getIn().getBody(Animal.class);

        // Deep clone the body so that changes made by one recipient
        // do not affect the copies sent to the other recipients.
        Animal clone = body.deepClone();
        exchange.getIn().setBody(clone);
    }
}
Then we can use the AnimalDeepClonePrepare class in the Multicast route using the onPrepare option as shown:
from("direct:start")
    .multicast().onPrepare(new AnimalDeepClonePrepare())
    .to("direct:a").to("direct:b");
And the same example in XML DSL:
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <!-- use on prepare with multicast -->
    <multicast onPrepareRef="animalDeepClonePrepare">
      <to uri="direct:a"/>
      <to uri="direct:b"/>
    </multicast>
  </route>

  <route>
    <from uri="direct:a"/>
    <process ref="processorA"/>
    <to uri="mock:a"/>
  </route>
  <route>
    <from uri="direct:b"/>
    <process ref="processorB"/>
    <to uri="mock:b"/>
  </route>
</camelContext>

<!-- the on prepare Processor which performs the deep cloning -->
<bean id="animalDeepClonePrepare" class="org.apache.camel.processor.AnimalDeepClonePrepare"/>

<!-- processors used for the last two routes, as part of unit test -->
<bean id="processorA" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorA"/>
<bean id="processorB" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorB"/>
The multicast DSL command supports the following options:
Name | Default Value | Description |
---|---|---|
strategyRef |  | Refers to an AggregationStrategy used to assemble the replies from the recipients into a single outgoing message from the multicast. By default, Camel uses the last reply as the outgoing message. |
parallelProcessing | false | If enabled, messages are sent to the recipients concurrently. Note that the caller thread still waits until all messages have been fully processed before it continues. Only the sending of messages and the processing of replies happen concurrently. |
executorServiceRef |  | Refers to a custom thread pool to be used for parallel processing. If you set this option, parallel processing is implied automatically, and you do not have to enable that option as well. |
stopOnException | false | Camel 2.2: Whether to stop processing immediately when an exception occurs. If disabled, Camel sends the message to all recipients regardless of whether one of them fails. You can deal with exceptions in the AggregationStrategy class, where you have full control over how to handle them. |
streaming | false | If enabled, Camel processes replies out of order, that is, in the order in which they come back. If disabled, Camel processes replies in the same order in which the messages were multicast. |
timeout |  | Camel 2.5: Sets a total timeout specified in milliseconds. If the multicast has not been able to send and process all replies within the given time frame, the timeout triggers and the multicast breaks out and continues. If you provide a TimeoutAwareAggregationStrategy, its timeout method is invoked before breaking out. |
onPrepareRef |  | Camel 2.8: Refers to a custom Processor that prepares the copy of the Exchange each recipient will receive. This allows you to apply any custom logic, such as deep-cloning the message payload if needed. |
shareUnitOfWork | false | Camel 2.8: Whether the unit of work should be shared. See the same option on Splitter for more details. |
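The effect of the streaming option on reply ordering can be modelled with the JDK's concurrency utilities. The sketch below is an analogy in plain Java, not Camel's implementation: ReplyOrdering is a hypothetical name, recipients are modelled as Callable instances, and the list of recipients is assumed to be non-empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ReplyOrdering {

    // Models streaming = false: replies are handled in the order the recipients are listed.
    static List<String> inSubmissionOrder(List<Callable<String>> recipients) {
        ExecutorService pool = Executors.newFixedThreadPool(recipients.size());
        try {
            List<String> replies = new ArrayList<>();
            for (Future<String> reply : pool.invokeAll(recipients)) {
                replies.add(reply.get());
            }
            return replies;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // Models streaming = true: each reply is handled as soon as it completes.
    static List<String> inCompletionOrder(List<Callable<String>> recipients) {
        ExecutorService pool = Executors.newFixedThreadPool(recipients.size());
        try {
            CompletionService<String> completed = new ExecutorCompletionService<>(pool);
            for (Callable<String> recipient : recipients) {
                completed.submit(recipient);
            }
            List<String> replies = new ArrayList<>();
            for (int i = 0; i < recipients.size(); i++) {
                replies.add(completed.take().get()); // blocks until the next reply is ready
            }
            return replies;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Handling replies in completion order lets aggregation start as soon as the fastest recipient answers, at the cost of a non-deterministic order. An aggregation strategy that depends on the order of replies would therefore be better suited to the non-streaming mode.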