The Aggregation API consists of a number of classes:
- The base class AbstractMessageAggregator and its subclass MethodInvokingMessageAggregator
- The CompletionStrategy interface and its default implementation SequenceSizeCompletionStrategy
- The CorrelationStrategy interface and its default implementation HeaderAttributeCorrelationStrategy
The AbstractMessageAggregator is a MessageHandler implementation that encapsulates the common functionality of an Aggregator:
- correlating messages into a group to be aggregated
- maintaining those messages until the group is complete
- deciding when the group is in fact complete
- processing the completed group into a single aggregated message
- recognizing and responding to a timed-out completion attempt
The responsibility of deciding how the messages should be grouped together is delegated to a CorrelationStrategy instance. The responsibility of deciding whether the message group is complete is delegated to a CompletionStrategy instance.
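The division of labor described above can be sketched in plain Java. This is a minimal illustration, not the framework's implementation: SketchAggregator and its member names are hypothetical, the two strategies and the aggregation step are modeled with java.util.function types rather than the real interfaces, and timeout handling is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in, NOT the framework class: it only shows who decides what.
class SketchAggregator<M, R> {

    private final Function<M, Object> correlationStrategy; // how messages are grouped
    private final Predicate<List<M>> completionStrategy;   // when a group is complete
    private final Function<List<M>, R> aggregateMessages;  // how a group is combined
    private final Map<Object, List<M>> groups = new HashMap<>();

    SketchAggregator(Function<M, Object> correlationStrategy,
                     Predicate<List<M>> completionStrategy,
                     Function<List<M>, R> aggregateMessages) {
        this.correlationStrategy = correlationStrategy;
        this.completionStrategy = completionStrategy;
        this.aggregateMessages = aggregateMessages;
    }

    // Returns the aggregated result once the group completes, otherwise null.
    R handle(M message) {
        Object key = correlationStrategy.apply(message);
        List<M> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        if (!completionStrategy.test(group)) {
            return null; // keep holding the messages until the group is complete
        }
        groups.remove(key);
        return aggregateMessages.apply(group);
    }
}
```

With a correlation function that groups strings by their first character and a completion predicate that fires at two messages, the third call in the sequence "a1", "b1", "a2" would release the "a" group while "b1" stays held.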
Here is a brief highlight of the base AbstractMessageAggregator (the responsibility of implementing the aggregateMessages method is left to the developer):

    public abstract class AbstractMessageAggregator extends AbstractMessageBarrierHandler {

        private volatile CompletionStrategy completionStrategy = new SequenceSizeCompletionStrategy();

        ....

        protected abstract Message<?> aggregateMessages(List<Message<?>> messages);

    }

It also inherits the following default CorrelationStrategy:

    private volatile CorrelationStrategy correlationStrategy = new HeaderAttributeCorrelationStrategy(MessageHeaders.CORRELATION_ID);

When appropriate, the simplest option is the DefaultMessageAggregator. It creates a single Message whose payload is a List of the payloads received for a given group. It uses the default CorrelationStrategy and CompletionStrategy as shown above. This works well for simple Scatter Gather implementations with either a Splitter, Publish Subscribe Channel, or Recipient List Router upstream.
Note | |
---|---|
When using a Publish Subscribe Channel or Recipient List Router in this type of scenario, be sure to enable the flag to apply sequence. That will add the necessary headers (correlation id, sequence number and sequence size). That behavior is enabled by default for Splitters in Spring Integration, but it is not enabled for the Publish Subscribe Channel or Recipient List Router because those components may be used in a variety of contexts where those headers are not necessary. |
When implementing a specific aggregator object for an application, a developer can extend AbstractMessageAggregator and implement the aggregateMessages method. However, there are better-suited solutions for implementing the aggregation logic (that is, solutions less coupled to the API), which can be configured easily through either XML or annotations.
In general, any ordinary Java class (i.e. POJO) can implement the aggregation algorithm. To do so, it must provide a method that accepts a single java.util.List as its argument (parameterized lists are supported as well). This method will be invoked to aggregate messages, as follows:
- if the argument is a parameterized java.util.List and the parameter type is assignable to Message, the whole list of messages accumulated for aggregation is passed to the method
- if the argument is a non-parameterized java.util.List, or the parameter type is not assignable to Message, the method receives the payloads of the accumulated messages
- if the return type is not assignable to Message, the return value is treated as the payload of a Message that the framework creates automatically
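As an illustration of these rules, here is a sketch of a POJO aggregator. The class name LowestQuoteAggregator and its method name are hypothetical, and the XML or annotation configuration that would register the method is not shown.

```java
import java.util.List;

// Hypothetical POJO: no framework type appears anywhere in its signature.
class LowestQuoteAggregator {

    // The list's parameter type (Double) is not assignable to Message, so under
    // the rules above this method would receive the payloads of the group.
    // The Double it returns is not a Message either, so it would become the
    // payload of the automatically created aggregated message.
    public Double lowestQuote(List<Double> quotes) {
        double lowest = Double.POSITIVE_INFINITY;
        for (Double quote : quotes) {
            lowest = Math.min(lowest, quote);
        }
        return lowest;
    }
}
```

Because the class depends only on java.util.List, it can be unit tested by calling lowestQuote directly with a list of numbers.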
Note | |
---|---|
In the interest of code simplicity, and promoting best practices such as low coupling, testability, etc., the preferred way of implementing the aggregation logic is through a POJO, and using the XML or annotation support for setting it up in the application. |
The CompletionStrategy interface is defined as follows:

    public interface CompletionStrategy {

        boolean isComplete(List<Message<?>> messages);

    }

In general, any ordinary Java class (i.e. POJO) can implement the completion decision mechanism. To do so, it must provide a method that accepts a single java.util.List as its argument (parameterized lists are supported as well) and returns a boolean value. This method will be invoked after the arrival of each new message to decide whether the group is complete, as follows:
- if the argument is a parameterized java.util.List and the parameter type is assignable to Message, the whole list of messages accumulated in the group is passed to the method
- if the argument is a non-parameterized java.util.List, or the parameter type is not assignable to Message, the method receives the payloads of the accumulated messages
- the method must return true if the message group is complete and ready for aggregation, and false otherwise
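A POJO completion method following these rules might look like the sketch below. The class name, the method name, and the threshold of 100 are made up for illustration; the configuration that would point the aggregator at this method is not shown.

```java
import java.util.List;

// Hypothetical POJO completion check: the group is treated as complete
// once the payload amounts add up to at least 100.
class ThresholdCompletion {

    // Integer is not assignable to Message, so the method would be handed
    // the payloads of the messages accumulated so far.
    public boolean isEnough(List<Integer> amounts) {
        int total = 0;
        for (int amount : amounts) {
            total += amount;
        }
        return total >= 100; // true means "complete and ready for aggregation"
    }
}
```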
Spring Integration provides an out-of-the-box implementation of CompletionStrategy, the SequenceSizeCompletionStrategy. This implementation uses the SEQUENCE_NUMBER and SEQUENCE_SIZE of the arriving messages to decide when a message group is complete and ready to be aggregated. As shown above, it is also the default strategy.
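The idea behind a sequence-based completion check can be sketched as follows. This is not the source of SequenceSizeCompletionStrategy: SequencedMessage is a hypothetical stand-in that carries only the two header values, and the rule used here (as many distinct sequence numbers as the announced sequence size) is an assumption about how the two headers could be combined.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a message, holding only the two headers of interest.
class SequencedMessage {
    final int sequenceNumber;
    final int sequenceSize;

    SequencedMessage(int sequenceNumber, int sequenceSize) {
        this.sequenceNumber = sequenceNumber;
        this.sequenceSize = sequenceSize;
    }
}

class SequenceSizeSketch {

    // Assumption: the group is complete once it contains as many distinct
    // sequence numbers as the sequence size announced by its first message.
    static boolean isComplete(List<SequencedMessage> messages) {
        if (messages.isEmpty()) {
            return false;
        }
        int expected = messages.get(0).sequenceSize;
        Set<Integer> seen = new HashSet<>();
        for (SequencedMessage message : messages) {
            seen.add(message.sequenceNumber);
        }
        return seen.size() >= expected;
    }
}
```

Counting distinct sequence numbers rather than messages means a redelivered duplicate does not complete the group early in this sketch.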
The CorrelationStrategy interface is defined as follows:

    public interface CorrelationStrategy {

        Object getCorrelationKey(Message<?> message);

    }

The method shall return an Object which represents the correlation key used for grouping messages together. The key must satisfy the criteria used for a key in a Map with respect to the implementation of equals() and hashCode().
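When the key is a custom type rather than a String or a number, it needs value-based equals() and hashCode(). The following sketch shows one way to write such a key; OrderKey and its fields are hypothetical names chosen for the example.

```java
import java.util.Objects;

// Hypothetical composite correlation key with value semantics, so that two
// keys built from the same values select the same message group.
final class OrderKey {

    private final String customerId;
    private final int orderNumber;

    OrderKey(String customerId, int orderNumber) {
        this.customerId = Objects.requireNonNull(customerId, "customerId");
        this.orderNumber = orderNumber;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof OrderKey)) {
            return false;
        }
        OrderKey that = (OrderKey) other;
        return orderNumber == that.orderNumber && customerId.equals(that.customerId);
    }

    @Override
    public int hashCode() {
        // Built from the same fields as equals(), as the Map contract requires.
        return Objects.hash(customerId, orderNumber);
    }
}
```

Without these overrides, every new OrderKey instance would be a distinct Map key, and each message would end up in a group of its own.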
In general, any ordinary Java class (i.e. POJO) can implement the correlation decision mechanism, and the rules for mapping a message to a method's argument (or arguments) are the same as for a ServiceActivator (including support for @Header annotations). The method must return a value, and the value must not be null.
Spring Integration provides an out-of-the-box implementation of CorrelationStrategy, the HeaderAttributeCorrelationStrategy. This implementation returns the value of one of the message headers (whose name is specified by a constructor argument) as the correlation key. By default, the correlation strategy is a HeaderAttributeCorrelationStrategy returning the value of the CORRELATION_ID header attribute.
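The behavior just described amounts to a header lookup by name. The sketch below shows the idea under simplifying assumptions: HeaderKeySketch is a hypothetical class, the message headers are modeled as a plain Map, and the header name used in the usage note is illustrative only.

```java
import java.util.Map;

// Hypothetical stand-in, not the framework class: the header name arrives
// through the constructor, and the header's value serves as the key.
class HeaderKeySketch {

    private final String headerName;

    HeaderKeySketch(String headerName) {
        this.headerName = headerName;
    }

    // Headers are modeled as a plain Map here instead of a real message.
    Object getCorrelationKey(Map<String, Object> headers) {
        return headers.get(headerName);
    }
}
```

For example, a HeaderKeySketch built with the name "orderId" would place two messages whose "orderId" headers are equal in the same group.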