
The Fuse Mediation Router threading model is based on the Java concurrency API, java.util.concurrent, which first became available in Sun's JDK 1.5. The key interface in this API is the ExecutorService interface, which represents a thread pool. Using the concurrency API, you can create many different kinds of thread pool, covering a wide range of scenarios.
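As a minimal sketch of what an ExecutorService looks like in plain JDK code, the following example submits a few tasks to a fixed-size pool and collects the results. The class name ExecutorServiceSketch and the pool size of 3 are illustrative assumptions, not part of Fuse Mediation Router, and the lambda syntax assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class, for illustration only
public class ExecutorServiceSketch {

    // Squares each input on a fixed-size thread pool and returns the results in input order
    public static List<Integer> squareAll(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (final Integer n : inputs) {
                tasks.add(() -> n * n);
            }
            List<Integer> results = new ArrayList<>();
            // invokeAll blocks until all tasks complete and returns futures in task order
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            // Always release the pool's threads when finished
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squareAll(Arrays.asList(1, 2, 3))); // prints [1, 4, 9]
    }
}
```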

A custom thread pool can be any thread pool of java.util.concurrent.ExecutorService type. The following approaches to creating a thread pool instance are recommended in Fuse Mediation Router:

  • Use the org.apache.camel.builder.ThreadPoolBuilder utility to build the thread pool instance.

  • Use the org.apache.camel.spi.ExecutorServiceStrategy instance from the current CamelContext to create the thread pool instance.

Ultimately, there is not much difference between the two approaches, because the ThreadPoolBuilder is actually defined using the ExecutorServiceStrategy instance. Normally, the ThreadPoolBuilder is preferred, because it offers a simpler approach. But there is at least one kind of thread pool (the ScheduledExecutorService) that can only be created by accessing the ExecutorServiceStrategy instance directly.
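To illustrate what distinguishes a ScheduledExecutorService from an ordinary thread pool, the following sketch runs a task after a delay. It uses the plain JDK factory method Executors.newScheduledThreadPool() as an assumption for illustration only; in a route you would obtain the scheduled pool from the ExecutorServiceStrategy instance instead. The class name ScheduledPoolSketch is hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical example class, for illustration only
public class ScheduledPoolSketch {

    // Schedules a task to run after delayMillis and returns the elapsed wait in milliseconds
    public static long runDelayed(long delayMillis) {
        // Assumption: JDK factory used here; Fuse Mediation Router would supply the pool
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            Callable<String> task = () -> "done";
            long start = System.nanoTime();
            ScheduledFuture<String> future =
                scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            future.get(); // blocks until the delayed task has run
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

An ordinary ExecutorService has no schedule() method, which is why a scheduled pool has to be requested as its own type.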

Table 2.9 shows the options supported by the ThreadPoolBuilder class, which you can set when defining a new custom thread pool.

Table 2.9. Thread Pool Builder Options

Builder Option | Description
maxQueueSize() | Sets the maximum number of pending tasks that this thread pool can store in its incoming task queue. A value of -1 specifies an unbounded queue. Default value is taken from default thread pool profile.
poolSize() | Sets the minimum number of threads in the pool (this is also the initial pool size). Default value is taken from default thread pool profile.
maxPoolSize() | Sets the maximum number of threads that can be in the pool. Default value is taken from default thread pool profile.
keepAliveTime() | If any threads are idle for longer than this period of time (specified in seconds), they are terminated. This allows the thread pool to shrink when the load is light. Default value is taken from default thread pool profile.
rejectedPolicy() | Specifies what course of action to take, if the incoming task queue is full. You can specify four possible values:
  • CallerRuns: (Default value) Gets the caller thread to run the latest incoming task. As a side effect, this option prevents the caller thread from receiving any more tasks until it has finished processing the latest incoming task.
  • Abort: Aborts the latest incoming task by throwing an exception.
  • Discard: Quietly discards the latest incoming task.
  • DiscardOldest: Discards the oldest unhandled task and then attempts to enqueue the latest incoming task in the task queue.
build() | Finishes building the custom thread pool and registers the new thread pool under the ID specified as the argument to build().
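The four rejection policies are easier to compare when you can see them act on a full queue. The following sketch uses the JDK's ThreadPoolExecutor directly, with one thread and a queue of one slot, so that a third task is always rejected. The mapping of builder options to constructor arguments in the comments is an assumption made for illustration, not a description of how ThreadPoolBuilder is implemented, and the class name RejectedPolicySketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical example class, for illustration only
public class RejectedPolicySketch {

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // Submits three tasks to a saturated pool and returns the order in which they ran
    public static List<String> runWithPolicy(RejectedExecutionHandler policy) {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                                    // assumed analogue of poolSize()
                1,                                    // assumed analogue of maxPoolSize()
                60, TimeUnit.SECONDS,                 // assumed analogue of keepAliveTime()
                new LinkedBlockingQueue<Runnable>(1), // assumed analogue of maxQueueSize()
                policy);                              // assumed analogue of rejectedPolicy()
        try {
            // First task occupies the only worker thread until released
            pool.execute(() -> { started.countDown(); await(release); log.add("first"); });
            await(started);
            // Second task fills the single queue slot
            pool.execute(() -> log.add("second"));
            try {
                // Third task finds the queue full, so the policy decides its fate
                pool.execute(() -> log.add("third"));
            } catch (RejectedExecutionException e) {
                log.add("third aborted");
            }
        } finally {
            release.countDown();
            pool.shutdown(); // already queued tasks still run
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new ArrayList<String>(log);
    }
}
```

Calling runWithPolicy() with ThreadPoolExecutor.CallerRunsPolicy yields third, first, second, because the submitting thread runs the third task itself. With AbortPolicy the result is third aborted, first, second; with DiscardPolicy it is first, second; and with DiscardOldestPolicy it is first, third, because the queued second task is dropped to make room.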

In Java DSL, you can define a custom thread pool using the ThreadPoolBuilder, as follows:

// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder
    .poolSize(5)
    .maxPoolSize(5)
    .maxQueueSize(100)
    .build("customPool");
...

from("direct:start")
  .multicast().executorService(customPool)
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

Instead of passing the object reference, customPool, directly to the executorService() option, you can look up the thread pool in the registry, by passing its bean ID to the executorServiceRef() option, as follows:

// Java
from("direct:start")
  .multicast().executorServiceRef("customPool")
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

In XML DSL, you access the ThreadPoolBuilder using the threadPool element. You can then reference the custom thread pool using the executorServiceRef attribute to look up the thread pool by ID in the Spring registry, as follows:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPool id="customPool"
                poolSize="5"
                maxPoolSize="5"
                maxQueueSize="100" />

    <route>
        <from uri="direct:start"/>
        <multicast executorServiceRef="customPool">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>

If you have many custom thread pool instances to create, you might find it more convenient to define a custom thread pool profile, which acts as a factory for thread pools. Whenever you reference a thread pool profile from a threading-aware processor, the processor automatically uses the profile to create a new thread pool instance. You can define a custom thread pool profile either in Java DSL or in XML DSL.
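The difference between a thread pool and a thread pool profile can be sketched in plain JDK code: the profile holds settings only, and each request produces a new, independent pool. The class PoolProfileSketch below is a hypothetical stand-in written for illustration; it is not the ThreadPoolProfile API, and the 60-second keep-alive time is an assumed value.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a thread pool profile: it stores settings, not threads
public class PoolProfileSketch {
    private final int poolSize;
    private final int maxPoolSize;
    private final int maxQueueSize;

    public PoolProfileSketch(int poolSize, int maxPoolSize, int maxQueueSize) {
        this.poolSize = poolSize;
        this.maxPoolSize = maxPoolSize;
        this.maxQueueSize = maxQueueSize;
    }

    // Each call creates a fresh pool configured from the stored settings
    public ThreadPoolExecutor newThreadPool() {
        return new ThreadPoolExecutor(
                poolSize,
                maxPoolSize,
                60, TimeUnit.SECONDS, // assumed keep-alive time
                new LinkedBlockingQueue<Runnable>(maxQueueSize));
    }

    public static void main(String[] args) {
        PoolProfileSketch profile = new PoolProfileSketch(5, 5, 100);
        ThreadPoolExecutor first = profile.newThreadPool();
        ThreadPoolExecutor second = profile.newThreadPool();
        // Two processors referencing the same profile would each get their own pool
        System.out.println(first != second); // prints true
        first.shutdown();
        second.shutdown();
    }
}
```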

For example, in Java DSL you can create a custom thread pool profile with the bean ID, customProfile, and reference it from within a route, as follows:

// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceStrategy().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
  .multicast().executorServiceRef("customProfile")
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

In XML DSL, use the threadPoolProfile element to create a custom thread pool profile. Leave the defaultProfile option at its default value of false, because this is not a default thread pool profile. You can create a custom thread pool profile with the bean ID, customProfile, and reference it from within a route, as follows:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPoolProfile
                id="customProfile"
                poolSize="5"
                maxPoolSize="5"
                maxQueueSize="100" />

    <route>
        <from uri="direct:start"/>
        <multicast executorServiceRef="customProfile">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>