
The Consumer Interface

An instance of the org.apache.camel.Consumer type represents a source endpoint in a route. There are several different ways of implementing a consumer (see Consumer Patterns and Threading), and this flexibility is reflected in the inheritance hierarchy (see Figure 7.1), which includes several different base classes for implementing a consumer.


For consumers that follow the scheduled poll pattern (see Scheduled poll pattern), FUSE Mediation Router provides support for injecting parameters into consumer instances. For example, consider the following endpoint URI for a component identified by the custom prefix:

custom:destination?consumer.myConsumerParam

FUSE Mediation Router provides support for automatically injecting query options of the form consumer.*. For the consumer.myConsumerParam parameter, you need to define corresponding setter and getter methods on the Consumer implementation class as follows:

public class CustomConsumer<E extends Exchange> extends ScheduledPollConsumer<E> {
    ...
    public String getMyConsumerParam() { ... }
    public void setMyConsumerParam(String s) { ... }
    ...
}

The getter and setter methods follow the usual Java bean conventions (including capitalizing the first letter of the property name).
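To illustrate how a query option such as consumer.myConsumerParam can end up in a bean setter, here is a minimal, self-contained sketch that uses plain Java reflection. It is not the FUSE Mediation Router implementation: SketchConsumer and ConsumerOptionInjector are hypothetical names, the sketch handles String properties only, and the option value someValue is assumed for the demonstration.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a consumer class; not a Camel type.
class SketchConsumer {
    private String myConsumerParam;

    public String getMyConsumerParam() { return myConsumerParam; }
    public void setMyConsumerParam(String s) { this.myConsumerParam = s; }
}

// Hypothetical helper that mimics the idea of consumer.* injection.
class ConsumerOptionInjector {
    private static final String PREFIX = "consumer.";

    // Collects the query options that start with "consumer." and strips the prefix.
    static Map<String, String> consumerOptions(String uri) {
        Map<String, String> options = new LinkedHashMap<>();
        int q = uri.indexOf('?');
        if (q < 0) {
            return options;
        }
        for (String pair : uri.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            if (key.startsWith(PREFIX) && key.length() > PREFIX.length()) {
                options.put(key.substring(PREFIX.length()), value);
            }
        }
        return options;
    }

    // Calls the public setXxx(String) method on the target for every consumer option.
    static void inject(Object target, String uri) {
        for (Map.Entry<String, String> e : consumerOptions(uri).entrySet()) {
            String name = e.getKey();
            // Java bean convention: capitalize the first letter of the property name.
            String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                Method m = target.getClass().getMethod(setter, String.class);
                m.invoke(target, e.getValue());
            } catch (ReflectiveOperationException ex) {
                throw new IllegalArgumentException("No usable setter for option " + name, ex);
            }
        }
    }
}

class InjectionDemo {
    public static void main(String[] args) {
        SketchConsumer consumer = new SketchConsumer();
        ConsumerOptionInjector.inject(consumer,
                "custom:destination?consumer.myConsumerParam=someValue");
        System.out.println(consumer.getMyConsumerParam()); // prints someValue
    }
}
```

The sketch looks up the setter with getMethod(), which only finds public methods, so the bean methods on the consumer class must be declared public for this approach to work.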

In addition to defining the bean methods in your Consumer implementation, you must also remember to call the configureConsumer() method in the implementation of Endpoint.createConsumer() (see Scheduled poll endpoint implementation). Example 7.1 shows a createConsumer() method implementation, taken from the FileEndpoint class in the file component:


At run time, consumer parameter injection works as follows:

A consumer that follows the scheduled poll pattern automatically supports the consumer parameters shown in Table 7.1 (which can appear as query options in the endpoint URI).


FUSE Mediation Router provides two special consumer implementations that can be used to convert back and forth between an event-driven consumer and a polling consumer. The following conversion classes are provided:

In practice, these classes are used to simplify the task of implementing an Endpoint type. The Endpoint interface defines the following two methods for creating a consumer instance:

package org.apache.camel;

public interface Endpoint<E extends Exchange> {
    ...
    Consumer<E> createConsumer(Processor processor) throws Exception;
    PollingConsumer<E> createPollingConsumer() throws Exception;
}

createConsumer() returns an event-driven consumer and createPollingConsumer() returns a polling consumer. You would implement only one of these methods. For example, if you are following the event-driven pattern for your consumer, you would implement the createConsumer() method and provide a method implementation for createPollingConsumer() that simply raises an exception. With the help of the conversion classes, however, FUSE Mediation Router is able to provide a more useful default implementation.

For example, if you want to implement your consumer according to the event-driven pattern, you implement the endpoint by extending DefaultEndpoint and implementing the createConsumer() method. The implementation of createPollingConsumer() is inherited from DefaultEndpoint, where it is defined as follows:

public PollingConsumer<E> createPollingConsumer() throws Exception {
    return new EventDrivenPollingConsumer<E>(this);
}

The EventDrivenPollingConsumer constructor takes a reference to the endpoint, this, and wraps the endpoint's event-driven consumer, effectively converting it into a polling consumer. To implement the conversion, the EventDrivenPollingConsumer instance buffers incoming events and makes them available on demand through the receive(), receive(long timeout), and receiveNoWait() methods.
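The buffering idea behind this conversion can be sketched with a blocking queue from the Java standard library. The class below is a simplified, hypothetical stand-in (BufferingPollingAdapter is not a FUSE Mediation Router class, and it carries plain objects rather than exchanges); it only shows how pushed events can be stored and then pulled through the three receive variants.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified event-to-polling adapter; not the Camel class.
class BufferingPollingAdapter<T> {
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();

    // Called by the event-driven side whenever a new event arrives.
    public void onEvent(T event) {
        buffer.add(event);
    }

    // Blocks until an event is available.
    public T receive() throws InterruptedException {
        return buffer.take();
    }

    // Waits up to timeoutMillis for an event; returns null on timeout.
    public T receive(long timeoutMillis) throws InterruptedException {
        return buffer.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Returns immediately with an event, or null if none is buffered.
    public T receiveNoWait() {
        return buffer.poll();
    }
}

class BufferingDemo {
    public static void main(String[] args) throws InterruptedException {
        BufferingPollingAdapter<String> adapter = new BufferingPollingAdapter<>();
        adapter.onEvent("first");
        adapter.onEvent("second");
        System.out.println(adapter.receive());       // prints first
        System.out.println(adapter.receive(100));    // prints second
        System.out.println(adapter.receiveNoWait()); // prints null
    }
}
```

An unbounded queue keeps the sketch short; a real adapter would probably need a capacity limit so that a slow poller cannot exhaust memory.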

Analogously, if you are implementing your consumer according to the polling pattern, you implement the endpoint by extending DefaultPollingEndpoint and implementing the createPollingConsumer() method. In this case, the implementation of the createConsumer() method is inherited from DefaultPollingEndpoint, and the default implementation returns a DefaultScheduledPollConsumer instance (which converts the polling consumer into an event-driven consumer).
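The opposite conversion, from polling to event-driven, can be sketched in the same spirit: a scheduler polls the source at a fixed delay and pushes whatever it finds to a processor callback. This is an illustration of the general technique only. ScheduledPollAdapter is a hypothetical name, the source and processor are modeled with java.util.function types, and nothing here is taken from DefaultScheduledPollConsumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, simplified polling-to-event adapter; not the Camel class.
class ScheduledPollAdapter<T> {
    private final Supplier<T> poller;     // returns the next item, or null if none
    private final Consumer<T> processor;  // receives each item as an event
    private ScheduledExecutorService executor;

    ScheduledPollAdapter(Supplier<T> poller, Consumer<T> processor) {
        this.poller = poller;
        this.processor = processor;
    }

    // One poll cycle: drains the available items and pushes each to the processor.
    public int pollOnce() {
        int delivered = 0;
        T item;
        while ((item = poller.get()) != null) {
            processor.accept(item);
            delivered++;
        }
        return delivered;
    }

    // Starts polling in the background, with delayMillis between poll cycles.
    public synchronized void start(long delayMillis) {
        executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(this::pollOnce, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Stops background polling if it is running.
    public synchronized void stop() {
        if (executor != null) {
            executor.shutdownNow();
            executor = null;
        }
    }
}

class ScheduledPollDemo {
    public static void main(String[] args) {
        Queue<String> source = new ArrayDeque<>(Arrays.asList("a", "b"));
        List<String> received = new ArrayList<>();
        ScheduledPollAdapter<String> adapter =
                new ScheduledPollAdapter<>(source::poll, received::add);
        adapter.pollOnce();           // run a single cycle directly, for a deterministic demo
        System.out.println(received); // prints [a, b]
    }
}
```

The demo calls pollOnce() directly so that its output does not depend on timing; start() and stop() show where the scheduled polling would plug in.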