LibraryLink ToToggle FramesPrintFeedback

Implementing the Consumer Interface

You can implement a consumer in one of the following ways:

In an event-driven consumer, processing is driven explicitly by external events. The events are received through an event-listener interface, where the listener interface is specific to the particular event source.

Example 7.2 shows the implementation of the JMXConsumer class, which is taken from the FUSE Mediation Router JMX component implementation. The JMXConsumer class is an example of an event-driven consumer, which is implemented by inheriting from the org.apache.camel.impl.DefaultConsumer class. In the case of the JMXConsumer example, events are represented by calls on the NotificationListener.handleNotification() method, which is a standard way of receiving JMX events. In order to receive these JMX events, it is necessary to implement the NotificationListener interface and override the handleNotification() method, as shown in Example 7.2.

Example 7.2. JMXConsumer Implementation

package org.apache.camel.component.jmx;

import javax.management.Notification;
import javax.management.NotificationListener;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;

public class JMXConsumer extends DefaultConsumer implements NotificationListener { 1

    JMXEndpoint jmxEndpoint;

    public JMXConsumer(JMXEndpoint endpoint, Processor processor) { 2
        super(endpoint, processor);
        this.jmxEndpoint = endpoint;
    }

    public void handleNotification(Notification notification, Object handback) { 3
        try {
            getProcessor().process(jmxEndpoint.createExchange(notification)); 4
        } catch (Throwable e) {
            handleException(e); 5
        }
    }
}

1

The JMXConsumer pattern follows the usual pattern for event-driven consumers by extending the DefaultConsumer class. Additionally, because this consumer is designed to receive events from JMX (which are represented by JMX notifications), it is necessary to implement the NotificationListener interface.

2

You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, and a reference to the next processor in the chain, processor, as arguments.

3

The handleNotification() method (which is defined in NotificationListener) is automatically invoked by JMX whenever a JMX notification arrives. The body of this method should contain the code that performs the consumer's event processing. Because the handleNotification() call originates from the JMX layer, the consumer's threading model is implicitly controlled by the JMX layer, not by the JMXConsumer class.

[Note]Note

The handleNotification() method is specific to the JMX example. When implementing your own event-driven consumer, you must identify an analogous event listener method to implement in your custom consumer.

4

This line of code combines two steps. First, the JMX notification object is converted into an exchange object, which is the generic representation of an event in FUSE Mediation Router. The, the newly created exchange object is passed to the next processor in the route (invoked synchronously).

5

The handleException() method is implemented by the DefaultConsumer base class. By default, it handles exceptions using the org.apache.camel.impl.LoggingExceptionHandler class.

In a scheduled poll consumer, polling events are automatically generated by a timer class, java.util.concurrent.ScheduledExecutorService. To receive the generated polling events, you must implement the ScheduledPollConsumer.poll() method (see Consumer Patterns and Threading).

Example 7.3 shows how to implement a consumer that follows the scheduled poll pattern, which is implemented by extending the ScheduledPollConsumer class.

Example 7.3. ScheduledPollConsumer Implementation

import java.util.concurrent.ScheduledExecutorService;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;

import org.apache.camel.impl.ScheduledPollConsumer;

public class CustomConsumer<E extends Exchange> extends ScheduledPollConsumer<E> { 1
    private final CustomEndpoint endpoint;

    public CustomConsumer(CustomEndpoint endpoint, Processor processor) { 2
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    protected void poll() throws Exception { 3
        E exchange = /* Receive exchange object ... */;

        // Example of a synchronous processor. 
        getProcessor().process(exchange); 4
    }

    @Override
    protected void doStart() throws Exception { 5
        // Pre-Start:
        // Place code here to execute just before start of processing.
        super.doStart();
        // Post-Start:
        // Place code here to execute just after start of processing.
    }

    @Override
    protected void doStop() throws Exception { 6
        // Pre-Stop:
        // Place code here to execute just before processing stops.
        super.doStop();
        // Post-Stop:
        // Place code here to execute just after processing stops.
    }
}

1

Implement a scheduled poll consumer class, CustomConsumer, by extending the org.apache.camel.impl.ScheduledPollConsumer class.

2

You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, and a reference to the next processor in the chain, processor, as arguments.

3

Override the poll() method to receive the scheduled polling events. This is where you should put the code that retrieves and processes incoming events (represented by exchange objects).

4

In this example, the event is processed synchronously. If you want to process events asynchronously, you should use a reference to an asynchronous processor instead, by calling getAsyncProcessor(). For details of how to process events asynchronously, see Asynchronous Processing.

5

(Optional) If you want some lines of code to execute as the consumer is starting up, override the doStart() method as shown.

6

(Optional) If you want some lines of code to execute as the consumer is stopping, override the doStop() method as shown.

Example 7.4 outlines how to implement a consumer that follows the polling pattern, which is implemented by extending the PollingConsumerSupport class.

Example 7.4. PollingConsumerSupport Implementation

import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.PollingConsumerSupport;

public class CustomConsumer extends PollingConsumerSupport { 1
    private final CustomEndpoint endpoint;

    public CustomConsumer(CustomEndpoint endpoint) { 2
        super(endpoint);
        this.endpoint = endpoint;
    }

    public Exchange receiveNoWait() { 3
        Exchange exchange = /* Obtain an exchange object. */;
        // Further processing ...
        return exchange;
    }

    public Exchange receive() { 4
        // Blocking poll ...
    }

    public Exchange receive(long timeout) { 5
        // Poll with timeout ...
    }

    protected void doStart() throws Exception { 6
        // Code to execute whilst starting up.
    }

    protected void doStop() throws Exception {
        // Code to execute whilst shutting down.
    }
}

1

Implement your polling consumer class, CustomConsumer, by extending the org.apache.camel.impl.PollingConsumerSupport class.

2

You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, as an argument. A polling consumer does not need a reference to a processor instance.

3

The receiveNoWait() method should implement a non-blocking algorithm for retrieving an event (exchange object). If no event is available, it should return null.

4

The receive() method should implement a blocking algorithm for retrieving an event. This method can block indefinitely, if events remain unavailable.

5

The receive(long timeout) method implements an algorithm that can block for as long as the specified timeout (typically specified in units of milliseconds).

6

If you want to insert code that executes while a consumer is starting up or shutting down, implement the doStart() method and the doStop() method, respectively.