4.3 Polling Consumer

Spring Integration also provides a PollingConsumer, and it can be instantiated in the same way except that the channel must implement PollableChannel:

PollableChannel channel = (PollableChannel) context.getBean("pollableChannel");

PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);

There are many other configuration options for the Polling Consumer. For example, the trigger is a required property:

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setTrigger(new IntervalTrigger(30, TimeUnit.SECONDS));

Spring Integration currently provides two implementations of the Trigger interface: IntervalTrigger and CronTrigger. The IntervalTrigger is typically defined with a simple interval (in milliseconds), but also supports an 'initialDelay' property and a boolean 'fixedRate' property (the default is false, i.e. fixed delay):

IntervalTrigger trigger = new IntervalTrigger(1000);
trigger.setInitialDelay(5000);
trigger.setFixedRate(true);

The CronTrigger simply requires a valid cron expression (see the Javadoc for details):

CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");

In addition to the trigger, several other polling-related configuration properties may be specified:

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setMaxMessagesPerPoll(10);

consumer.setReceiveTimeout(5000);

The 'maxMessagesPerPoll' property specifies the maximum number of messages to receive within a given poll operation. This means that the poller will continue calling receive() without waiting until either null is returned or that max is reached. For example, if a poller has a 10 second interval trigger and a 'maxMessagesPerPoll' setting of 25, and it is polling a channel that has 100 messages in its queue, all 100 messages can be retrieved within 40 seconds. It grabs 25, waits 10 seconds, grabs the next 25, and so on.

The 'receiveTimeout' property specifies the amount of time the poller should wait if no messages are available when it invokes the receive operation. For example, consider two options that seem similar on the surface but are actually quite different: the first has an interval trigger of 5 seconds and a receive timeout of 50 milliseconds while the second has an interval trigger of 50 milliseconds and a receive timeout of 5 seconds. The first one may receive a message up to 4950 milliseconds later than it arrived on the channel (if that message arrived immediately after one of its poll calls returned). On the other hand, the second configuration will never miss a message by more than 50 milliseconds. The difference is that the second option requires a thread to wait, but as a result it is able to respond much more quickly to arriving messages. This technique, known as "long polling", can be used to emulate event-driven behavior on a polled source.

A Polling Consumer may also delegate to a Spring TaskExecutor, and it can be configured to participate in Spring-managed transactions. The following example shows the configuration of both:

PollingConsumer consumer = new PollingConsumer(channel, handler);

TaskExecutor taskExecutor = (TaskExecutor) context.getBean("exampleExecutor");
consumer.setTaskExecutor(taskExecutor);

PlatformTransactionManager txManager = (PlatformTransationManager) context.getBean("exampleTxManager");
consumer.setTransactionManager(txManager);

The examples above show dependency lookups, but keep in mind that these consumers will most often be configured as Spring bean definitions. In fact, Spring Integration also provides a FactoryBean that creates the appropriate consumer type based on the type of channel, and there is full XML namespace support to even further hide those details. The namespace-based configuration will be featured as each component type is introduced.

[Note]Note
Many of the MessageHandler implementations are also capable of generating reply Messages. As mentioned above, sending Messages is trivial when compared to the Message reception. Nevertheless, when and how many reply Messages are sent depends on the handler type. For example, an Aggregator waits for a number of Messages to arrive and is often configured as a downstream consumer for a Splitter which may generate multiple replies for each Message it handles. When using the namespace configuration, you do not strictly need to know all of the details, but it still might be worth knowing that several of these components share a common base class, the AbstractReplyProducingMessageHandler, and it provides a setOutputChannel(..) method.