The Fuse Mediation Router threading model is based on the powerful Java concurrency API, java.util.concurrent, which first became available in Sun's JDK 1.5. The key interface in this API is the ExecutorService interface, which represents a thread pool. Using the concurrency API, you can create many different kinds of thread pool, covering a wide range of scenarios.
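As a point of reference, the following minimal sketch shows an ExecutorService used as a thread pool with nothing but the JDK. It does not involve any Fuse Mediation Router classes; the class name ExecutorServiceBasics and the method sumOfSquares are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain java.util.concurrent example; hypothetical class, not part of Fuse Mediation Router.
class ExecutorServiceBasics {

    // Squares each input on a fixed-size thread pool and returns the sum of the results.
    static int sumOfSquares(List<Integer> inputs, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int n : inputs) {
                tasks.add(() -> n * n);
            }
            int sum = 0;
            // invokeAll() blocks until every submitted task has completed.
            for (Future<Integer> result : pool.invokeAll(tasks)) {
                sum += result.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        } finally {
            // Always release the pool's threads when the work is done.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(Arrays.asList(1, 2, 3, 4), 2)); // prints 30
    }
}
```

The pool is shut down in a finally block because an ExecutorService keeps its worker threads alive until it is told to stop.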
The Fuse Mediation Router thread pool API builds on the Java concurrency API by providing a central factory (of org.apache.camel.spi.ExecutorServiceStrategy type) for all of the thread pools in your Fuse Mediation Router application. Centralizing the creation of thread pools in this way provides several advantages, including:
- Simplified creation of thread pools, using utility classes.
- Integration of thread pools with graceful shutdown.
- Automatic assignment of informative thread names, which is beneficial for logging and management.
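To make two of these advantages concrete, the sketch below shows how informative thread names and a graceful shutdown can be achieved with the JDK alone. It is not the Fuse Mediation Router implementation: the class NamedPoolSketch, its methods, and the "<prefix>-thread-<n>" naming pattern are assumptions made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class; the naming pattern is an assumption, not the product's.
class NamedPoolSketch {

    // Returns a factory that names threads "<prefix>-thread-<n>", counting from 1.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + "-thread-" + counter.incrementAndGet());
    }

    // Runs one task on a named pool and returns the name of the worker thread that ran it.
    static String runOnNamedPool(String prefix) {
        ExecutorService pool = Executors.newFixedThreadPool(2, namedFactory(prefix));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the task", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        } finally {
            shutdownGracefully(pool);
        }
    }

    // Stops accepting new tasks, waits briefly for running ones, then forces termination.
    static void shutdownGracefully(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnNamedPool("demo")); // prints "demo-thread-1"
    }
}
```

Without a central factory, every component would have to repeat this kind of boilerplate for each pool it creates.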
Some Fuse Mediation Router components—such as SEDA, JMS, and Jetty—are inherently multi-threaded. These components have all been implemented using the Fuse Mediation Router threading model and thread pool API.
If you are planning to implement your own Fuse Mediation Router component, it is recommended that you integrate your threading code with the Fuse Mediation Router threading model. For example, if your component needs a thread pool, it is recommended that you create it using the CamelContext's ExecutorServiceStrategy object.
Some of the standard processors in Fuse Mediation Router create their own thread pool by default. These threading-aware processors are also integrated with the Fuse Mediation Router threading model and they provide various options that enable you to customize the thread pools that they use.
Table 2.7 shows the various options for controlling and setting thread pools on the threading-aware processors built into Fuse Mediation Router.
Table 2.7. Processor Threading Options
Processor | Java DSL | XML DSL |
---|---|---|
aggregate | parallelProcessing(), executorService(), executorServiceRef() | @parallelProcessing, @executorServiceRef |
multicast | parallelProcessing(), executorService(), executorServiceRef() | @parallelProcessing, @executorServiceRef |
recipientList | parallelProcessing(), executorService(), executorServiceRef() | @parallelProcessing, @executorServiceRef |
split | parallelProcessing(), executorService(), executorServiceRef() | @parallelProcessing, @executorServiceRef |
threads | executorService(), executorServiceRef(), poolSize(), maxPoolSize(), keepAliveTime(), timeUnit(), maxQueueSize(), rejectedPolicy() | @executorServiceRef, @poolSize, @maxPoolSize, @keepAliveTime, @timeUnit, @maxQueueSize, @rejectedPolicy |
wireTap | wireTap(String uri, ExecutorService executorService), wireTap(String uri, String executorServiceRef) | @executorServiceRef |
To create a default thread pool for one of the threading-aware processors, enable the parallelProcessing option, using the parallelProcessing() sub-clause in the Java DSL, or the parallelProcessing attribute in the XML DSL.
For example, in the Java DSL, you can invoke the multicast processor with a default thread pool (where the thread pool is used to process the multicast destinations concurrently) as follows:
from("direct:start")
    .multicast().parallelProcessing()
        .to("mock:first")
        .to("mock:second")
        .to("mock:third");
You can define the same route in XML DSL as follows:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <multicast parallelProcessing="true">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>
The default thread pools are automatically created by a thread factory that takes its settings from the default thread pool profile. The default thread pool profile has the settings shown in Table 2.8 (assuming that these settings have not been modified by the application code).
Table 2.8. Default Thread Pool Profile Settings
Thread Option | Default Value |
---|---|
maxQueueSize | 1000 |
poolSize | 10 |
maxPoolSize | 20 |
keepAliveTime | 60 (seconds) |
rejectedPolicy | CallerRuns |
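For readers who think in terms of plain java.util.concurrent, the values in Table 2.8 map onto the constructor arguments of a ThreadPoolExecutor roughly as sketched below. This is an illustration only: it does not show how Fuse Mediation Router constructs its pools internally, and the class name DefaultProfileSketch is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class: a JDK-only pool configured with the values from Table 2.8.
class DefaultProfileSketch {

    static ThreadPoolExecutor newPoolWithDefaultProfileValues() {
        return new ThreadPoolExecutor(
                10,                                         // poolSize (core threads)
                20,                                         // maxPoolSize
                60L, TimeUnit.SECONDS,                      // keepAliveTime
                new LinkedBlockingQueue<Runnable>(1000),    // maxQueueSize
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejectedPolicy
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPoolWithDefaultProfileValues();
        System.out.println("core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " queueCapacity=" + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```

With a bounded queue, a ThreadPoolExecutor starts threads beyond the core size only after the queue has filled, so in this sketch the pool would grow past 10 threads only once 1000 tasks are already waiting.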
It is possible to change the default thread pool profile settings, so that all subsequent default thread pools will be created with the custom settings. You can change the profile either in Java or in Spring XML.
For example, in the Java DSL, you can customize the poolSize option and the maxQueueSize option in the default thread pool profile, as follows:
// Java
import org.apache.camel.spi.ExecutorServiceStrategy;
import org.apache.camel.spi.ThreadPoolProfile;
...
ExecutorServiceStrategy strategy = context.getExecutorServiceStrategy();
ThreadPoolProfile defaultProfile = strategy.getDefaultThreadPoolProfile();

// Now, customize the profile settings.
defaultProfile.setPoolSize(3);
defaultProfile.setMaxQueueSize(100);
...
In the XML DSL, you can customize the default thread pool profile, as follows:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPoolProfile
        id="changedProfile"
        defaultProfile="true"
        poolSize="3"
        maxQueueSize="100"/>
    ...
</camelContext>
Note that it is essential to set the defaultProfile attribute to true in the preceding XML DSL example. Otherwise, the thread pool profile would be treated like a custom thread pool profile (see Creating a custom thread pool profile), instead of replacing the default thread pool profile.
It is also possible to specify the thread pool for a threading-aware processor more directly, using either the executorService or executorServiceRef options (where these options are used instead of the parallelProcessing option). There are two approaches you can use to customize a processor's thread pool, as follows:
- Specify a custom thread pool: explicitly create an ExecutorService (thread pool) instance and pass it to the executorService option.
- Specify a custom thread pool profile: create and register a custom thread pool profile, which acts as a factory for thread pools. When you reference this profile using the executorServiceRef option, the processor automatically uses it to create a custom thread pool instance.
When you pass a bean ID to the executorServiceRef option, the threading-aware processor first tries to find a custom thread pool with that ID in the registry. If no thread pool is registered with that ID, the processor then attempts to look up a custom thread pool profile in the registry and uses the custom thread pool profile to instantiate a custom thread pool.
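The two-step lookup just described can be sketched as follows. This is a simplified model, not the Fuse Mediation Router code: the class ExecutorRefLookupSketch, its two maps, and its method names are hypothetical stand-ins for the registry, and throwing an exception when neither lookup succeeds is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical model of the lookup order; none of these names are product APIs.
class ExecutorRefLookupSketch {

    private final Map<String, ExecutorService> pools = new HashMap<>();
    // A profile is modelled here as a factory that creates a new pool on demand.
    private final Map<String, Supplier<ExecutorService>> profiles = new HashMap<>();

    void registerPool(String id, ExecutorService pool) {
        pools.put(id, pool);
    }

    void registerProfile(String id, Supplier<ExecutorService> profile) {
        profiles.put(id, profile);
    }

    ExecutorService resolve(String id) {
        // Step 1: look for a thread pool registered under this ID.
        ExecutorService pool = pools.get(id);
        if (pool != null) {
            return pool;
        }
        // Step 2: fall back to a profile and use it to create a new pool.
        Supplier<ExecutorService> profile = profiles.get(id);
        if (profile != null) {
            return profile.get();
        }
        // Assumption: the failure behavior is not specified in the text above.
        throw new IllegalArgumentException("No thread pool or profile registered as " + id);
    }

    public static void main(String[] args) {
        ExecutorRefLookupSketch lookup = new ExecutorRefLookupSketch();
        lookup.registerProfile("customProfile", () -> Executors.newFixedThreadPool(5));
        ExecutorService created = lookup.resolve("customProfile");
        System.out.println("Created pool from profile: " + (created != null));
        created.shutdown();
    }
}
```

In this model, resolving a pool ID always returns the same shared instance, whereas resolving a profile ID returns a new pool on every call.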
A custom thread pool can be any thread pool of java.util.concurrent.ExecutorService type. The following approaches to creating a thread pool instance are recommended in Fuse Mediation Router:
- Use the org.apache.camel.builder.ThreadPoolBuilder utility to build the thread pool.
- Use the org.apache.camel.spi.ExecutorServiceStrategy instance from the current CamelContext to create the thread pool.
Ultimately, there is not much difference between the two approaches, because the ThreadPoolBuilder is actually defined using the ExecutorServiceStrategy instance. Normally, the ThreadPoolBuilder is preferred, because it offers a simpler approach. But there is at least one kind of thread pool (the ScheduledExecutorService) that can only be created by accessing the ExecutorServiceStrategy instance directly.
Table 2.9 shows the options supported by the ThreadPoolBuilder class, which you can set when defining a new custom thread pool.
Table 2.9. Thread Pool Builder Options
Builder Option | Description |
---|---|
maxQueueSize() | Sets the maximum number of pending tasks that this thread pool can store in its incoming task queue. A value of -1 specifies an unbounded queue. Default value is taken from default thread pool profile. |
poolSize() | Sets the minimum number of threads in the pool (this is also the initial pool size). Default value is taken from default thread pool profile. |
maxPoolSize() | Sets the maximum number of threads that can be in the pool. Default value is taken from default thread pool profile. |
keepAliveTime() | If any threads are idle for longer than this period of time (specified in seconds), they are terminated. This allows the thread pool to shrink when the load is light. Default value is taken from default thread pool profile. |
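rejectedPolicy() | Specifies what course of action to take if the incoming task queue is full. You can specify four possible values: CallerRuns (the caller thread runs the latest incoming task), Abort (the latest incoming task is rejected by throwing an exception), Discard (the latest incoming task is quietly discarded), or DiscardOldest (the oldest unhandled task is discarded and the latest incoming task is then enqueued). |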
build() | Finishes building the custom thread pool and registers the new thread pool under the ID specified as the argument to build(). |
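The effect of a rejection policy is easiest to see on a deliberately tiny pool. The following sketch uses the JDK's ThreadPoolExecutor.CallerRunsPolicy, which has the same name as the CallerRuns default listed in Table 2.8; whether the ThreadPoolBuilder maps its option directly onto this JDK class is an assumption, and the class name CallerRunsSketch is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: shows which thread runs a task when the queue is full.
class CallerRunsSketch {

    // Returns the name of the thread that ran the task submitted to a saturated pool.
    static String threadThatRunsOverflowTask() {
        // One worker thread and a queue with room for a single pending task.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> overflowThread = new AtomicReference<>();
        try {
            // Task 1 occupies the only worker until the latch is released.
            pool.execute(() -> awaitQuietly(release));
            // Task 2 fills the single slot in the queue.
            pool.execute(() -> { });
            // Task 3 finds the queue full, so the policy runs it on the submitting thread.
            pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));
            return overflowThread.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Overflow task ran on: " + threadThatRunsOverflowTask());
    }
}
```

Because the overflow task runs on the submitting thread, that thread cannot submit further work until the task finishes, which slows down the producer instead of dropping tasks.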
In Java DSL, you can define a custom thread pool using the ThreadPoolBuilder, as follows:
// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool");
...

from("direct:start")
    .multicast().executorService(customPool)
        .to("mock:first")
        .to("mock:second")
        .to("mock:third");
Instead of passing the object reference, customPool, directly to the executorService() option, you can look up the thread pool in the registry, by passing its bean ID to the executorServiceRef() option, as follows:
// Java
from("direct:start")
    .multicast().executorServiceRef("customPool")
        .to("mock:first")
        .to("mock:second")
        .to("mock:third");
In XML DSL, you access the ThreadPoolBuilder using the threadPool element. You can then reference the custom thread pool using the executorServiceRef attribute to look up the thread pool by ID in the Spring registry, as follows:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPool id="customPool"
                poolSize="5"
                maxPoolSize="5"
                maxQueueSize="100" />

    <route>
        <from uri="direct:start"/>
        <multicast executorServiceRef="customPool">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>
If you have many custom thread pool instances to create, you might find it more convenient to define a custom thread pool profile, which acts as a factory for thread pools. Whenever you reference a thread pool profile from a threading-aware processor, the processor automatically uses the profile to create a new thread pool instance. You can define a custom thread pool profile either in Java DSL or in XML DSL.
For example, in Java DSL you can create a custom thread pool profile with the bean ID, customProfile, and reference it from within a route, as follows:
// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceStrategy().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
    .multicast().executorServiceRef("customProfile")
        .to("mock:first")
        .to("mock:second")
        .to("mock:third");
In XML DSL, use the threadPoolProfile element to create a custom pool profile (where you let the defaultProfile option default to false, because this is not a default thread pool profile). You can create a custom thread pool profile with the bean ID, customProfile, and reference it from within a route, as follows:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPoolProfile
        id="customProfile"
        poolSize="5"
        maxPoolSize="5"
        maxQueueSize="100" />

    <route>
        <from uri="direct:start"/>
        <multicast executorServiceRef="customProfile">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>