11.3 Programming model

The Aggregation API consists of a number of classes:

AbstractMessageAggregator

The AbstractMessageAggregator is a MessageHandler implementation that encapsulates the common functionality of an Aggregator:

  • correlating messages into a group to be aggregated

  • maintaining those messages until the group is complete

  • deciding when the group is in fact complete

  • processing the completed group into a single aggregated message

  • recognizing and responding to a timed-out completion attempt

The responsibility of deciding how the messages should be grouped together is delegated to a CorrelationStrategy instance. The responsibility of deciding whether the message group is complete is delegated to a CompletionStrategy instance.

The following excerpt shows the relevant parts of the base AbstractMessageAggregator (implementing the aggregateMessages method is left to the developer):

public abstract class AbstractMessageAggregator 
              extends AbstractMessageBarrierHandler {

  private volatile CompletionStrategy completionStrategy
                            = new SequenceSizeCompletionStrategy();
  ....

  protected abstract Message<?> aggregateMessages(List<Message<?>> messages);

}

It also inherits the following default CorrelationStrategy:

private volatile CorrelationStrategy correlationStrategy =
          new HeaderAttributeCorrelationStrategy(MessageHeaders.CORRELATION_ID);

When appropriate, the simplest option is the DefaultMessageAggregator. It creates a single Message whose payload is a List of the payloads received for a given group. It uses the default CorrelationStrategy and CompletionStrategy as shown above. This works well for simple Scatter-Gather implementations with a Splitter, Publish Subscribe Channel, or Recipient List Router upstream.

Note

When using a Publish Subscribe Channel or Recipient List Router in this type of scenario, be sure to enable the apply-sequence flag. Doing so adds the necessary headers (correlation id, sequence number and sequence size). That behavior is enabled by default for Splitters in Spring Integration, but it is not enabled for the Publish Subscribe Channel or Recipient List Router, because those components may be used in a variety of contexts where those headers are not necessary.

When implementing a specific aggregator object for an application, a developer can extend AbstractMessageAggregator and implement the aggregateMessages method. However, there are better-suited solutions (that is, solutions less coupled to the API) for implementing the aggregation logic, and they can be configured easily through either XML or annotations.

In general, any ordinary Java class (i.e. POJO) can implement the aggregation algorithm. To do so, it must provide a method that accepts a single java.util.List as its argument (parametrized lists are supported as well). This method is invoked to aggregate messages, as follows:

  • if the argument is a parametrized java.util.List, and the parameter type is assignable to Message, then the whole list of messages accumulated for aggregation will be sent to the aggregator

  • if the argument is a non-parametrized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages

  • if the return type is not assignable to Message, then it will be treated as the payload for a Message that will be created automatically by the framework.
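The rules above can be illustrated with a minimal sketch of a POJO aggregator. The class name PayloadSumAggregator and the method name sum are hypothetical, chosen only for this example; the class references no Spring Integration types, and wiring it up through XML or annotations is not shown here.

```java
import java.util.List;

// Hypothetical POJO aggregator: names are illustrative, not part of the framework.
class PayloadSumAggregator {

    // The list's parameter type (Long) is not assignable to Message, so
    // according to the rules above this method would receive the payloads
    // of the accumulated messages rather than the messages themselves.
    // The return type is not a Message either, so the result would be used
    // as the payload of a Message created by the framework.
    public Long sum(List<Long> payloads) {
        long total = 0;
        for (Long payload : payloads) {
            total += payload;
        }
        return total;
    }
}
```

Because the class has no dependency on the messaging API, it can be unit tested by calling sum directly with a plain list.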

Note

In the interest of code simplicity and of best practices such as low coupling and testability, the preferred way of implementing the aggregation logic is through a POJO, using the XML or annotation support to set it up in the application.

CompletionStrategy

The CompletionStrategy interface is defined as follows:

public interface CompletionStrategy {

  boolean isComplete(List<Message<?>> messages);

}

In general, any ordinary Java class (i.e. POJO) can implement the completion decision mechanism. To do so, it must provide a method that accepts a single java.util.List as its argument (parametrized lists are supported as well) and returns a boolean value. This method is invoked after the arrival of each new message to decide whether the group is complete, as follows:

  • if the argument is a parametrized java.util.List, and the parameter type is assignable to Message, then the whole list of messages accumulated in the group will be sent to the method

  • if the argument is a non-parametrized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages

  • the method must return true if the message group is complete and ready for aggregation, and false otherwise.
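As a sketch of such a POJO, the class below declares a group complete once the sum of its payloads reaches a threshold. The names ThresholdCompletionChecker and isThresholdReached, and the threshold rule itself, are assumptions made for illustration; any rule that returns a boolean from the accumulated list would follow the same shape.

```java
import java.util.List;

// Hypothetical POJO completion check: names and rule are illustrative only.
class ThresholdCompletionChecker {

    private final long threshold;

    ThresholdCompletionChecker(long threshold) {
        this.threshold = threshold;
    }

    // The list's parameter type is not assignable to Message, so according
    // to the rules above the method would receive the accumulated payloads.
    // Returns true when the group is ready for aggregation, false otherwise.
    public boolean isThresholdReached(List<Long> payloads) {
        long total = 0;
        for (Long payload : payloads) {
            total += payload;
        }
        return total >= threshold;
    }
}
```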

Spring Integration provides an out-of-the-box implementation of CompletionStrategy, the SequenceSizeCompletionStrategy. This implementation uses the SEQUENCE_NUMBER and SEQUENCE_SIZE of the arriving messages to decide when a message group is complete and ready to be aggregated. As shown above, it is also the default strategy.
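The idea behind a sequence-size check can be sketched without the framework. The code below is not the SequenceSizeCompletionStrategy source: it stands in for each message with a plain Map of headers, uses a hypothetical header key, and looks only at the sequence size (it ignores the sequence number), so treat it as an approximation of the concept.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Sketch only: each message is represented by a Map of its headers.
class SequenceSizeSketch {

    // Hypothetical header key, chosen for this example only.
    static final String SEQUENCE_SIZE = "sequenceSize";

    // Treats the group as complete once it holds as many messages as the
    // sequence size announced by the first message in the group.
    static boolean isComplete(List<Map<String, Object>> groupHeaders) {
        if (groupHeaders == null || groupHeaders.isEmpty()) {
            return false;
        }
        Object size = groupHeaders.get(0).get(SEQUENCE_SIZE);
        return size instanceof Integer && ((Integer) size) == groupHeaders.size();
    }

    // Helper that builds the stand-in headers for one message.
    static Map<String, Object> headersWithSize(int sequenceSize) {
        return Collections.<String, Object>singletonMap(SEQUENCE_SIZE, sequenceSize);
    }
}
```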

CorrelationStrategy

The CorrelationStrategy interface is defined as follows:

public interface CorrelationStrategy {

  Object getCorrelationKey(Message<?> message);

}

The method shall return an Object which represents the correlation key used for grouping messages together. The key must satisfy the criteria used for a key in a Map with respect to the implementation of equals() and hashCode().
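The equals() and hashCode() requirement matters because two messages belong to the same group only if their keys compare as equal. The sketch below uses a hypothetical composite key, OrderKey, and a small HashMap-based grouping helper, KeyGroupingDemo, to show the effect; neither class is part of Spring Integration, and the framework's own storage of groups is not shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite correlation key with value-based equality.
final class OrderKey {
    private final String customerId;
    private final int batch;

    OrderKey(String customerId, int batch) {
        this.customerId = customerId;
        this.batch = batch;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof OrderKey)) {
            return false;
        }
        OrderKey that = (OrderKey) other;
        return batch == that.batch && Objects.equals(customerId, that.customerId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(customerId, batch);
    }
}

// Groups payloads by key, the way a Map-backed store would.
class KeyGroupingDemo {
    private final Map<OrderKey, List<String>> groups = new HashMap<>();

    void add(OrderKey key, String payload) {
        groups.computeIfAbsent(key, k -> new ArrayList<>()).add(payload);
    }

    int groupCount() {
        return groups.size();
    }

    List<String> group(OrderKey key) {
        return groups.get(key);
    }
}
```

Without the equals() and hashCode() overrides, each new OrderKey instance would be treated as a distinct key, and every message would end up in its own group.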

In general, any ordinary Java class (i.e. POJO) can implement the correlation decision mechanism, and the rules for mapping a message to a method's argument (or arguments) are the same as for a ServiceActivator (including support for @Header annotations). The method must return a value, and the value must not be null.
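A POJO correlation method might look like the following sketch. Invoice and InvoiceCorrelation are hypothetical names, and the example assumes the message payload is an Invoice that gets passed as the method argument; the explicit check reflects the rule that the returned key must not be null.

```java
// Hypothetical payload type used only for this example.
class Invoice {
    private final String customerId;
    private final double amount;

    Invoice(String customerId, double amount) {
        this.customerId = customerId;
        this.amount = amount;
    }

    String getCustomerId() {
        return customerId;
    }

    double getAmount() {
        return amount;
    }
}

// Hypothetical POJO correlation logic: groups invoices by customer.
class InvoiceCorrelation {

    // Returns the correlation key for the given payload. A null key is
    // rejected here because the correlation method must not return null.
    public String correlateByCustomer(Invoice invoice) {
        String key = invoice.getCustomerId();
        if (key == null) {
            throw new IllegalStateException("Invoice has no customer id to correlate on");
        }
        return key;
    }
}
```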

Spring Integration provides an out-of-the-box implementation of CorrelationStrategy, the HeaderAttributeCorrelationStrategy. This implementation returns the value of one of the message headers (whose name is specified by a constructor argument) as the correlation key. By default, the correlation strategy is a HeaderAttributeCorrelationStrategy that returns the value of the CORRELATION_ID header attribute.