You can implement a consumer in one of the following ways:
In an event-driven consumer, processing is driven explicitly by external events. The events are received through an event-listener interface, where the listener interface is specific to the particular event source.
Example 8.3 shows the implementation of the
JMXConsumer
class, which is taken from the Fuse Mediation Router JMX component
implementation. The JMXConsumer
class is an example of an event-driven
consumer, which is implemented by inheriting from the
org.apache.camel.impl.DefaultConsumer
class. In the case of the
JMXConsumer
example, events are represented by calls on the
NotificationListener.handleNotification()
method, which is a
standard way of receiving JMX events. In order to receive these JMX events, it is necessary
to implement the NotificationListener
interface and override
the handleNotification()
method, as shown in Example 8.3.
Example 8.3. JMXConsumer Implementation
package org.apache.camel.component.jmx; import javax.management.Notification; import javax.management.NotificationListener; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; public class JMXConsumer extends DefaultConsumer implements NotificationListener {JMXEndpoint jmxEndpoint; public JMXConsumer(JMXEndpoint endpoint, Processor processor) {
super(endpoint, processor); this.jmxEndpoint = endpoint; } public void handleNotification(Notification notification, Object handback) {
try { getProcessor().process(jmxEndpoint.createExchange(notification));
} catch (Throwable e) { handleException(e);
} } }
The | ||||
You must implement at least one constructor that takes a reference to the parent
endpoint, | ||||
The
| ||||
This line of code combines two steps. First, the JMX notification object is converted into an exchange object, which is the generic representation of an event in Fuse Mediation Router. Then the newly created exchange object is passed to the next processor in the route (invoked synchronously). | ||||
The |
In a scheduled poll consumer, polling events are automatically generated by a timer
class, java.util.concurrent.ScheduledExecutorService
. To receive the generated
polling events, you must implement the ScheduledPollConsumer.poll()
method (see
Consumer Patterns and Threading).
Example 8.4 shows how to implement a
consumer that follows the scheduled poll pattern, which is implemented by extending the
ScheduledPollConsumer
class.
Example 8.4. ScheduledPollConsumer Implementation
import java.util.concurrent.ScheduledExecutorService; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.PollingConsumer; import org.apache.camel.Processor; import org.apache.camel.impl.ScheduledPollConsumer; public classCustomConsumer
extends ScheduledPollConsumer {private final
CustomEndpoint
endpoint; publicCustomConsumer
(CustomEndpoint
endpoint, Processor processor) {super(endpoint, processor); this.endpoint = endpoint; } protected void poll() throws Exception {
Exchange exchange = /* Receive exchange object ... */; // Example of a synchronous processor. getProcessor().process(exchange);
} @Override protected void doStart() throws Exception {
// Pre-Start: // Place code here to execute just before start of processing. super.doStart(); // Post-Start: // Place code here to execute just after start of processing. } @Override protected void doStop() throws Exception {
// Pre-Stop: // Place code here to execute just before processing stops. super.doStop(); // Post-Stop: // Place code here to execute just after processing stops. } }
Implement a scheduled poll consumer class,
| |
You must implement at least one constructor that takes a reference to the parent
endpoint, | |
Override the | |
In this example, the event is processed synchronously. If you want to process events
asynchronously, you should use a reference to an asynchronous processor instead, by
calling | |
(Optional) If you want some lines of code to execute as the
consumer is starting up, override the | |
(Optional) If you want some lines of code to execute as the
consumer is stopping, override the |
Example 8.5 outlines how to implement a
consumer that follows the polling pattern, which is implemented by extending the
PollingConsumerSupport
class.
Example 8.5. PollingConsumerSupport Implementation
import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.impl.PollingConsumerSupport; public classCustomConsumer
extends PollingConsumerSupport {private final
CustomEndpoint
endpoint; publicCustomConsumer
(CustomEndpoint
endpoint) {super(endpoint); this.endpoint = endpoint; } public Exchange receiveNoWait() {
Exchange exchange = /* Obtain an exchange object. */; // Further processing ... return exchange; } public Exchange receive() {
// Blocking poll ... } public Exchange receive(long timeout) {
// Poll with timeout ... } protected void doStart() throws Exception {
// Code to execute whilst starting up. } protected void doStop() throws Exception { // Code to execute whilst shutting down. } }
Implement your polling consumer class, | |
You must implement at least one constructor that takes a reference to the parent
endpoint, | |
The | |
The | |
The | |
If you want to insert code that executes while a consumer is starting up or shutting
down, implement the |
If the standard consumer patterns are not suitable for your consumer implementation, you
can implement the Consumer
interface directly and write the threading code
yourself. When writing the threading code, however, it is important that you comply with the
standard Fuse Mediation Router threading model, as described in Threading Model in Implementing Enterprise Integration Patterns.
For example, the SEDA component from camel-core
implements its own consumer
threading, which is consistent with the Fuse Mediation Router threading model. Example 8.6 shows an outline of how the
SedaConsumer
class implements its threading.
Example 8.6. Custom Threading Implementation
package org.apache.camel.component.seda; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.impl.ServiceSupport; import org.apache.camel.util.ServiceHelper; ... import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * A Consumer for the SEDA component. * * @version $Revision: 922485 $ */ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware {private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class); private SedaEndpoint endpoint; private Processor processor; private ExecutorService executor; ... public SedaConsumer(SedaEndpoint endpoint, Processor processor) { this.endpoint = endpoint; this.processor = processor; } ... public void run() {
BlockingQueue<Exchange> queue = endpoint.getQueue(); // Poll the queue and process exchanges ... } ... protected void doStart() throws Exception {
int poolSize = endpoint.getConcurrentConsumers(); executor = endpoint.getCamelContext().getExecutorServiceStrategy() .newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize);
for (int i = 0; i < poolSize; i++) {
executor.execute(this); } endpoint.onStarted(this); } protected void doStop() throws Exception {
endpoint.onStopped(this); // must shutdown executor on stop to avoid overhead of having them running endpoint.getCamelContext().getExecutorServiceStrategy().shutdownNow(executor);
executor = null; if (multicast != null) { ServiceHelper.stopServices(multicast); } } ... //---------- // Implementation of ShutdownAware interface public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { // deny stopping on shutdown as we want seda consumers to run in case some other queues // depend on this consumer to run, so it can complete its exchanges return true; } public int getPendingExchangesSize() { // number of pending messages on the queue return endpoint.getQueue().size(); } }
The | |
Implement the | |
The | |
Instead of creating threads directly, you should create a thread pool using the
For details, see Threading Model in Implementing Enterprise Integration Patterns. | |
Kick off the threads by calling the | |
The | |
Shut down the thread pool, which is represented by the |