The pattern used to implement the consumer determines the threading model used in processing the incoming exchanges. Consumers can be implemented using one of the following patterns:
Event-driven pattern—The consumer is driven by an external thread.
Scheduled poll pattern—The consumer is driven by a dedicated thread pool.
Polling pattern—The threading model is left undefined.
In the event-driven pattern, the processing of an incoming request is initiated when another part of the application (typically a third-party library) calls a method implemented by the consumer. A good example of an event-driven consumer is the Fuse Mediation Router JMX component, where events are initiated by the JMX library. The JMX library calls the handleNotification() method to initiate request processing; see Example 8.3 for details.
Figure 5.3 shows an outline of the event-driven consumer pattern. In this example, it is assumed that processing is triggered by a call to the notify() method.
The event-driven consumer processes incoming requests as follows:
The consumer must implement a method to receive the incoming event (in Figure 5.3 this is represented by the notify() method). The thread that calls notify() is normally a separate part of the application, so the consumer's threading policy is externally driven.
For example, in the case of the JMX consumer implementation, the consumer implements the NotificationListener.handleNotification() method to receive notifications from JMX. The threads that drive the consumer processing are created within the JMX layer.
In the body of the notify() method, the consumer first converts the incoming event into an exchange object, E, and then calls process() on the next processor in the route, passing the exchange object as its argument.
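The steps above can be sketched in plain Java. This is a minimal illustration rather than the actual JMX component code: Exchange and Processor are hypothetical stand-ins for the router's own types, EventDrivenSketch and deliver() are invented for the demonstration, and only the NotificationListener and Notification classes come from the standard javax.management package.

```java
import java.util.ArrayList;
import java.util.List;
import javax.management.Notification;
import javax.management.NotificationListener;

public class EventDrivenSketch {

    // Hypothetical stand-in for the router's exchange type.
    static final class Exchange {
        private final Object body;
        Exchange(Object body) { this.body = body; }
        Object getBody() { return body; }
    }

    // Hypothetical stand-in for the next processor in the route.
    interface Processor {
        void process(Exchange exchange) throws Exception;
    }

    // Event-driven consumer: it creates no threads of its own. Whichever
    // thread delivers the notification also runs the rest of the route.
    static final class EventDrivenConsumer implements NotificationListener {
        private final Processor next;

        EventDrivenConsumer(Processor next) { this.next = next; }

        @Override
        public void handleNotification(Notification notification, Object handback) {
            // Convert the incoming event into an exchange object.
            Exchange exchange = new Exchange(notification.getMessage());
            try {
                // Hand the exchange to the next processor in the route.
                next.process(exchange);
            } catch (Exception e) {
                throw new RuntimeException("Route processing failed", e);
            }
        }
    }

    // Simulates the external library delivering one event on the caller's thread.
    static List<Object> deliver(String message) {
        List<Object> seen = new ArrayList<>();
        EventDrivenConsumer consumer =
            new EventDrivenConsumer(ex -> seen.add(ex.getBody()));
        consumer.handleNotification(
            new Notification("demo.event", "demo-source", 1L, message), null);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(deliver("hello")); // prints [hello]
    }
}
```

In this sketch the main thread plays the role of the JMX layer: processing happens on whichever thread invokes handleNotification(), which is what makes the threading policy externally driven.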
In the scheduled poll pattern, the consumer retrieves incoming requests by checking at regular time intervals whether or not a request has arrived. Checking for requests is scheduled automatically by a built-in timer class, the scheduled executor service, which is a standard pattern provided by the java.util.concurrent library. The scheduled executor service executes a particular task at timed intervals and it also manages a pool of threads, which are used to run the task instances.
Figure 5.4 shows an outline of the scheduled poll consumer pattern.
The scheduled poll consumer processes incoming requests as follows:
The scheduled executor service has a pool of threads at its disposal that can be used to initiate consumer processing. After each scheduled time interval has elapsed, the scheduled executor service attempts to grab a free thread from its pool (there are five threads in the pool by default). If a free thread is available, it uses that thread to call the poll() method on the consumer.
The consumer's poll() method is intended to trigger processing of an incoming request. In the body of the poll() method, the consumer attempts to retrieve an incoming message. If no request is available, the poll() method returns immediately.
If a request message is available, the consumer inserts it into an exchange object and then calls process() on the next processor in the route, passing the exchange object as its argument.
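The following sketch shows how these steps could fit together using the standard java.util.concurrent scheduled executor service. It is an illustration under stated assumptions, not the router's implementation: Exchange, Processor, ScheduledPollConsumer, and run() are hypothetical names, an in-memory queue stands in for the external message source, and the pool size of five simply mirrors the default mentioned above.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledPollSketch {

    // Hypothetical stand-ins for the router's exchange and processor types.
    static final class Exchange {
        private final Object body;
        Exchange(Object body) { this.body = body; }
        Object getBody() { return body; }
    }

    interface Processor {
        void process(Exchange exchange) throws Exception;
    }

    static final class ScheduledPollConsumer {
        private final Queue<String> source;
        private final Processor next;

        ScheduledPollConsumer(Queue<String> source, Processor next) {
            this.source = source;
            this.next = next;
        }

        // Called by a pool thread each time the scheduled interval elapses.
        void poll() throws Exception {
            String message = source.poll();      // non-blocking retrieval
            if (message == null) {
                return;                          // nothing has arrived yet
            }
            next.process(new Exchange(message)); // wrap the message and pass it on
        }
    }

    // Polls 'source' every 'delayMillis' until 'expected' messages have been
    // processed, or until roughly five seconds have passed.
    static List<Object> run(Queue<String> source, int expected, long delayMillis) {
        List<Object> processed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(expected);
        ScheduledPollConsumer consumer = new ScheduledPollConsumer(source, ex -> {
            processed.add(ex.getBody());
            done.countDown();
        });
        // Pool of five threads; a single fixed-delay task uses one at a time.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                try {
                    consumer.poll();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, 0, delayMillis, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return processed;
    }

    public static void main(String[] args) {
        Queue<String> source = new ConcurrentLinkedQueue<>(List.of("a", "b", "c"));
        System.out.println(run(source, 3, 10)); // prints [a, b, c]
    }
}
```

Because scheduleWithFixedDelay() never overlaps runs of the same task, each tick here handles at most one message, in arrival order. A consumer that drains several messages per poll would be an equally valid design choice.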
In the polling pattern, processing of an incoming request is initiated when a third party calls one of the consumer's polling methods:
receive()
receiveNoWait()
receive(long timeout)
It is up to the component implementation to define the precise mechanism for initiating calls on the polling methods. This mechanism is not specified by the polling pattern.
Figure 5.5 shows an outline of the polling consumer pattern.
The polling consumer processes incoming requests as follows:
Processing of an incoming request is initiated whenever one of the consumer's polling methods is called. The mechanism for calling these polling methods is implementation defined.
In the body of the receive() method, the consumer attempts to retrieve an incoming request message. If no message is currently available, the behavior depends on which receive method was called:
receiveNoWait() returns immediately
receive(long timeout) waits for the specified timeout interval[2] before returning
receive() waits until a message is received
If a request message is available, the consumer inserts it into an exchange object and then calls process() on the next processor in the route, passing the exchange object as its argument.
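The three polling methods map naturally onto the blocking operations of a java.util.concurrent BlockingQueue, as the sketch below suggests. It rests on several assumptions that are not taken from the text: Exchange, Processor, QueuePollingConsumer, and demo() are hypothetical names, the timeout is treated as milliseconds, each receive method returns the exchange it dispatched (or null when nothing arrived), and an in-memory queue stands in for the real message source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollingConsumerSketch {

    // Hypothetical stand-ins for the router's exchange and processor types.
    static final class Exchange {
        private final Object body;
        Exchange(Object body) { this.body = body; }
        Object getBody() { return body; }
    }

    interface Processor {
        void process(Exchange exchange) throws Exception;
    }

    static final class QueuePollingConsumer {
        private final BlockingQueue<String> source;
        private final Processor next;

        QueuePollingConsumer(BlockingQueue<String> source, Processor next) {
            this.source = source;
            this.next = next;
        }

        // Waits until a message is received.
        Exchange receive() throws Exception {
            return dispatch(source.take());
        }

        // Returns immediately, with null if no message is available.
        Exchange receiveNoWait() throws Exception {
            return dispatch(source.poll());
        }

        // Waits up to timeoutMillis (assumed unit: milliseconds) before returning.
        Exchange receive(long timeoutMillis) throws Exception {
            return dispatch(source.poll(timeoutMillis, TimeUnit.MILLISECONDS));
        }

        // Wraps the message in an exchange and passes it to the next processor.
        private Exchange dispatch(String message) throws Exception {
            if (message == null) {
                return null;
            }
            Exchange exchange = new Exchange(message);
            next.process(exchange);
            return exchange;
        }
    }

    // Exercises each polling method on the calling thread and logs the outcome.
    static List<String> demo() {
        BlockingQueue<String> source = new LinkedBlockingQueue<>(List.of("a", "b"));
        List<String> log = new ArrayList<>();
        QueuePollingConsumer consumer = new QueuePollingConsumer(
            source, ex -> log.add("processed " + ex.getBody()));
        try {
            consumer.receive();        // a message is waiting, so no blocking
            consumer.receiveNoWait();  // takes the second message
            log.add("receiveNoWait -> " + consumer.receiveNoWait()); // queue empty
            log.add("receive(50) -> " + consumer.receive(50));       // times out
        } catch (Exception e) {
            log.add("failed: " + e);
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [processed a, processed b, receiveNoWait -> null, receive(50) -> null]
        System.out.println(demo());
    }
}
```

Nothing in the consumer decides which thread calls the receive methods; here it is the main thread, but the pattern leaves that choice to the component implementation.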