Available as of Fuse Mediation Router 2.1
The cache component enables you to perform caching operations using EHCache as the cache implementation. The cache is created on demand; if a cache of that name already exists, it is used with its original settings.
This component supports producer and event-based consumer endpoints.
The Cache consumer is an event-based consumer and can be used to listen and respond to specific cache activities. If you need to perform selections from a pre-existing cache, use the processors defined for the cache component.
cache://cacheName[?options]
You can append query options to the URI in the following format:
?option=value&option=value&...
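As a rough illustration of the URI format, the following plain-Java sketch assembles an endpoint URI from a cache name and a map of options. The `CacheUriSketch` class and its `buildUri` method are hypothetical helpers, not part of Camel, and the sketch assumes the option values need no URL encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of Camel): assembles a cache endpoint URI
// from a cache name and an ordered map of options.
class CacheUriSketch {

    static String buildUri(String cacheName, Map<String, String> options) {
        if (options.isEmpty()) {
            return "cache://" + cacheName;
        }
        // Options are joined as option=value pairs separated by '&'.
        StringJoiner query = new StringJoiner("&", "?", "");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return "cache://" + cacheName + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("maxElementsInMemory", "1000");
        options.put("eternal", "false");
        // prints cache://MyApplicationCache?maxElementsInMemory=1000&eternal=false
        System.out.println(buildUri("MyApplicationCache", options));
    }
}
```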
Name | Default Value | Description |
---|---|---|
maxElementsInMemory | 1000 | The number of elements that may be stored in the defined cache. |
memoryStoreEvictionPolicy | MemoryStoreEvictionPolicy.LFU | The policy used to evict elements from memory when the maxElementsInMemory limit is reached. The policy options include MemoryStoreEvictionPolicy.LFU (least frequently used), MemoryStoreEvictionPolicy.LRU (least recently used), and MemoryStoreEvictionPolicy.FIFO (first in, first out). |
overflowToDisk | true | Specifies whether the cache may overflow to disk. |
eternal | false | Sets whether elements are eternal. If eternal, timeouts are ignored and the element never expires. |
timeToLiveSeconds | 300 | The maximum time between creation time and when an element expires. Only used if the element is not eternal. |
timeToIdleSeconds | 300 | The maximum amount of time between accesses before an element expires. |
diskPersistent | false | Whether the disk store persists between restarts of the Virtual Machine. |
diskExpiryThreadIntervalSeconds | 120 | The number of seconds between runs of the disk expiry thread. |
cacheManagerFactory | null | If you want to use a custom factory that instantiates and creates the EHCache net.sf.ehcache.CacheManager, you must specify an instance of type org.apache.camel.component.cache.CacheManagerFactory. |
eventListenerRegistry | null | Camel 2.8: Sets a list of EHCache net.sf.ehcache.event.CacheEventListener instances for all new caches, so you no longer need to define them per cache in the EHCache XML configuration. Type: org.apache.camel.component.cache.CacheEventListenerRegistry |
cacheLoaderRegistry | null | Camel 2.8: Sets a list of org.apache.camel.component.cache.CacheLoaderWrapper instances (which extend the EHCache net.sf.ehcache.loader.CacheLoader) for all new caches, so you no longer need to define them per cache in the EHCache XML configuration. Type: org.apache.camel.component.cache.CacheLoaderRegistry |
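The eternal, timeToLiveSeconds, and timeToIdleSeconds options interact, which can be easier to see in code. The following is a simplified model of the rules as the option descriptions above state them. It is not EHCache code, the `ExpirySketch` class is hypothetical, and it ignores any special cases EHCache may apply (for example, to zero values).

```java
// Simplified model of the expiry rules in the option descriptions.
// This is not EHCache code; the class and method names are hypothetical.
class ExpirySketch {

    static boolean isExpired(boolean eternal,
                             long timeToLiveSeconds,
                             long timeToIdleSeconds,
                             long createdAtSeconds,
                             long lastAccessedAtSeconds,
                             long nowSeconds) {
        if (eternal) {
            // Eternal elements ignore both timeouts.
            return false;
        }
        boolean livedTooLong = nowSeconds - createdAtSeconds > timeToLiveSeconds;
        boolean idleTooLong = nowSeconds - lastAccessedAtSeconds > timeToIdleSeconds;
        return livedTooLong || idleTooLong;
    }

    public static void main(String[] args) {
        // Created at t=0, last read at t=250, checked at t=400, default 300s timeouts.
        System.out.println(isExpired(false, 300, 300, 0, 250, 400)); // true: lived 400s > 300s
        System.out.println(isExpired(true, 300, 300, 0, 250, 400));  // false: eternal
    }
}
```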
Header | Description |
---|---|
CamelCacheOperation | The operation to be performed on the cache. The valid options are CamelCacheGet, CamelCacheCheck, CamelCacheAdd, CamelCacheUpdate, CamelCacheDelete, and CamelCacheDeleteAll. |
CamelCacheKey | The cache key used to store the message in the cache. The cache key is optional if the CamelCacheOperation is CamelCacheDeleteAll. |
Header changes in Camel 2.8 | |
---|---|
The header names and supported values have changed to be prefixed with CamelCache (for example, CamelCacheOperation and CamelCacheKey). |
Sending data to the cache means directing the payloads in exchanges to be stored in a pre-existing or created-on-demand cache. The mechanics of doing this involve:
Setting the Message Exchange Headers shown above.
Ensuring that the Message Exchange Body contains the message directed to the cache.
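To make the producer mechanics concrete, here is a toy model in plain Java of how the operation and key headers might drive a cache. It is only a sketch and not the Camel implementation. The `CacheOperationSketch` class is hypothetical, and the header values are assumed to follow the CamelCache prefix convention. In Camel the result of a lookup is reported through the message and its headers rather than a return value.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a producer endpoint (not the Camel implementation): it reads the
// operation and key headers and applies them to an in-memory map.
class CacheOperationSketch {

    private final Map<String, Object> cache = new HashMap<>();

    // Returns the cached value for a get, a Boolean for a check, otherwise null.
    Object send(Map<String, String> headers, Object body) {
        String operation = headers.get("CamelCacheOperation");
        String key = headers.get("CamelCacheKey");
        if (operation == null) {
            throw new IllegalArgumentException("Missing CamelCacheOperation header");
        }
        switch (operation) {
            case "CamelCacheAdd":
            case "CamelCacheUpdate":
                cache.put(key, body); // the body is the payload to store
                return null;
            case "CamelCacheDelete":
                cache.remove(key);
                return null;
            case "CamelCacheDeleteAll":
                cache.clear(); // the key header is not needed here
                return null;
            case "CamelCacheGet":
                return cache.get(key);
            case "CamelCacheCheck":
                return cache.containsKey(key);
            default:
                throw new IllegalArgumentException("Unknown operation: " + operation);
        }
    }
}
```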
Receiving data from the cache involves the ability of the CacheConsumer to listen on a pre-existing or created-on-demand cache using an event listener and receive automatic notifications when any cache activity takes place (that is, ADD, UPDATE, DELETE, or DELETEALL). Upon such an activity taking place:
An exchange containing Message Exchange Headers and a Message Exchange Body containing the just added/updated payload is placed and sent.
In case of a DELETEALL operation, the Message Exchange Header CACHE_KEY and the Message Exchange Body are not populated.
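The event-driven behavior described above can be modeled in a few lines of plain Java. This is a sketch under stated assumptions, not the Camel CacheConsumer: the `CacheEventSketch` class and its `Listener` callback are hypothetical, and only the ADD, UPDATE, and DELETEALL cases are modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of event-based consumption (not the Camel CacheConsumer): every
// change to the cache is reported to the registered listeners.
class CacheEventSketch {

    // Hypothetical callback; key and body are null for DELETEALL.
    interface Listener {
        void onEvent(String operation, String key, Object body);
    }

    private final Map<String, Object> cache = new HashMap<>();
    private final List<Listener> listeners = new ArrayList<>();

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    void put(String key, Object value) {
        // An existing key is reported as an update, a new key as an add.
        String operation = cache.containsKey(key) ? "UPDATE" : "ADD";
        cache.put(key, value);
        notifyListeners(operation, key, value);
    }

    void deleteAll() {
        cache.clear();
        // No single key or payload applies, so both are left unpopulated.
        notifyListeners("DELETEALL", null, null);
    }

    private void notifyListeners(String operation, String key, Object body) {
        for (Listener listener : listeners) {
            listener.onEvent(operation, key, body);
        }
    }
}
```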
A set of processors is available to perform cache lookups and selectively replace payload content at the following levels:
Body.
Token.
XPath.
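As an illustration of the token level, the following plain-Java sketch replaces a token in a message body with the value cached under a key. It is not the source of the Camel processors; the `TokenReplaceSketch` class is hypothetical, and leaving the body unchanged on a cache miss is an assumption made for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of token replacement (not the Camel processor source):
// a token in the body is replaced by the value cached under a key.
class TokenReplaceSketch {

    static String replaceToken(String body, Map<String, String> cache, String key, String token) {
        String cached = cache.get(key);
        // Assumption for this sketch: leave the body unchanged when the key is not cached.
        return cached == null ? body : body.replace(token, cached);
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        cache.put("novel", "Moby Dick");
        cache.put("author", "Herman Melville");

        String body = "#novel# was written by #author#";
        body = replaceToken(body, cache, "novel", "#novel#");
        body = replaceToken(body, cache, "author", "#author#");
        System.out.println(body); // prints: Moby Dick was written by Herman Melville
    }
}
```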
    // Configure the cache through URI options
    from("cache://MyApplicationCache" +
         "?maxElementsInMemory=1000" +
         "&memoryStoreEvictionPolicy=" + "MemoryStoreEvictionPolicy.LFU" +
         "&overflowToDisk=true" +
         "&eternal=true" +
         "&timeToLiveSeconds=300" +
         "&timeToIdleSeconds=300" +
         "&diskPersistent=true" +
         "&diskExpiryThreadIntervalSeconds=300");
    // Add an entry to the cache
    RouteBuilder builder = new RouteBuilder() {
        public void configure() {
            from("direct:start")
                .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_ADD))
                .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson"))
                .to("cache://TestCache1");
        }
    };

    // Update an existing entry in the cache
    RouteBuilder builder = new RouteBuilder() {
        public void configure() {
            from("direct:start")
                .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_UPDATE))
                .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson"))
                .to("cache://TestCache1");
        }
    };

    // Delete an entry from the cache
    RouteBuilder builder = new RouteBuilder() {
        public void configure() {
            from("direct:start")
                .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_DELETE))
                .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson"))
                .to("cache://TestCache1");
        }
    };

    // Delete all entries from the cache (no key required)
    RouteBuilder builder = new RouteBuilder() {
        public void configure() {
            from("direct:start")
                .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_DELETEALL))
                .to("cache://TestCache1");
        }
    };
    // Consume cache events and read the headers and body
    RouteBuilder builder = new RouteBuilder() {
        public void configure() {
            from("cache://TestCache1")
                .process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        String operation = (String) exchange.getIn().getHeader(CacheConstants.CACHE_OPERATION);
                        String key = (String) exchange.getIn().getHeader(CacheConstants.CACHE_KEY);
                        Object body = exchange.getIn().getBody();
                        // Do something
                    }
                });
        }
    };
    RouteBuilder builder = new RouteBuilder() {
        public void configure() {
            // Message Body Replacer
            from("cache://TestCache1")
                .filter(header(CacheConstants.CACHE_KEY).isEqualTo("greeting"))
                .process(new CacheBasedMessageBodyReplacer("cache://TestCache1", "farewell"))
                .to("direct:next");

            // Message Token replacer
            from("cache://TestCache1")
                .filter(header(CacheConstants.CACHE_KEY).isEqualTo("quote"))
                .process(new CacheBasedTokenReplacer("cache://TestCache1", "novel", "#novel#"))
                .process(new CacheBasedTokenReplacer("cache://TestCache1", "author", "#author#"))
                .process(new CacheBasedTokenReplacer("cache://TestCache1", "number", "#number#"))
                .to("direct:next");

            // Message XPath replacer
            from("cache://TestCache1")
                .filter(header(CacheConstants.CACHE_KEY).isEqualTo("XML_FRAGMENT"))
                .process(new CacheBasedXPathReplacer("cache://TestCache1", "book1", "/books/book1"))
                .process(new CacheBasedXPathReplacer("cache://TestCache1", "book2", "/books/book2"))
                .to("direct:next");
        }
    };
    from("direct:start")
        // Prepare headers
        .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_GET))
        .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson"))
        .to("cache://TestCache1")
        // Check if entry was not found
        .choice().when(header(CacheConstants.CACHE_ELEMENT_WAS_FOUND).isNull())
            // If not found, get the payload and put it to cache
            .to("cxf:bean:someHeavyweightOperation")
            .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_ADD))
            .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson"))
            .to("cache://TestCache1")
        .end()
        .to("direct:nextPhase");
Note: The CHECK command tests for the existence of an entry in the cache but does not place the cached value in the message body.
    from("direct:start")
        // Prepare headers
        .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_CHECK))
        .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson"))
        .to("cache://TestCache1")
        // Check if entry was not found
        .choice().when(header(CacheConstants.CACHE_ELEMENT_WAS_FOUND).isNull())
            // If not found, get the payload and put it to cache
            .to("cxf:bean:someHeavyweightOperation")
            .setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_ADD))
            .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson"))
            .to("cache://TestCache1")
        .end();
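The look-up-then-populate logic in these routes is the familiar cache-aside pattern. Stripped of Camel, it might look like the following sketch. The `CacheAsideSketch` class and its `loads` counter are hypothetical and exist only to make the control flow visible; the function passed to `fetch` stands in for the heavyweight operation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Cache-aside in plain Java: look the key up first and call the expensive
// operation only on a miss, then store the result for later requests.
class CacheAsideSketch {

    private final Map<String, String> cache = new HashMap<>();
    int loads = 0; // counts how often the expensive operation ran

    String fetch(String key, Function<String, String> heavyweightOperation) {
        String cached = cache.get(key);   // corresponds to the GET operation
        if (cached != null) {
            return cached;                // entry found: skip the expensive call
        }
        String fresh = heavyweightOperation.apply(key);
        loads++;
        cache.put(key, fresh);            // corresponds to the ADD operation
        return fresh;
    }
}
```

Calling `fetch` twice with the same key runs the supplied function once; the second call is served from the map.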
EHCache provides its own statistics and management through JMX.
The following snippet shows how to expose them via JMX in a Spring application context:
    <bean id="ehCacheManagementService" class="net.sf.ehcache.management.ManagementService" init-method="init" lazy-init="false">
        <constructor-arg>
            <bean class="net.sf.ehcache.CacheManager" factory-method="getInstance"/>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.jmx.support.JmxUtils" factory-method="locateMBeanServer"/>
        </constructor-arg>
        <constructor-arg value="true"/>
        <constructor-arg value="true"/>
        <constructor-arg value="true"/>
        <constructor-arg value="true"/>
    </bean>
Of course you can do the same thing in straight Java:
ManagementService.registerMBeans(CacheManager.getInstance(), mbeanServer, true, true, true, true);
You can get cache hits, misses, in-memory hits, disk hits, size stats this way. You can also change CacheConfiguration parameters on the fly.
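The Java one-liner above uses an `mbeanServer` variable without showing where it comes from. One common way to obtain an MBean server in plain Java is the JDK's platform MBean server, sketched below. Whether this is the right server for a given deployment is an assumption, and the `MBeanServerSketch` class is hypothetical.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;

// Obtains the JVM's platform MBean server, which could be passed as the
// mbeanServer argument when registering the EHCache MBeans.
class MBeanServerSketch {

    static MBeanServer platformServer() {
        return ManagementFactory.getPlatformMBeanServer();
    }

    public static void main(String[] args) {
        MBeanServer mbeanServer = platformServer();
        // Shows that the server is live by reporting how many MBeans it holds.
        System.out.println("Registered MBeans: " + mbeanServer.getMBeanCount());
    }
}
```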
The Camel Cache component can distribute a cache across server nodes using several different replication mechanisms, including RMI, JGroups, JMS, and Cache Server.
There are two ways to make it work:
Either configure ehcache.xml manually;
Or configure the following options: cacheManagerFactory, eventListenerRegistry, and cacheLoaderRegistry.
Configuring cache replication using the first option (that is, ehcache.xml) can be difficult, because you have to configure all caches separately. If not all of the cache names are known, using ehcache.xml is not a good idea.
The second option is much better when you want to use many different caches, because you do not need to define options per cache. This is because replication options are set per CacheManager and per CacheEndpoint. Also, this is the only feasible approach if the cache names are not known at development time.
Note | |
---|---|
We recommend reading the EHCache manual to gain a better understanding of the Camel Cache replication mechanism. |