Available as of Fuse Mediation Router 2.3
HawtDB is a very lightweight and embedable key value database. It allows together with Fuse Mediation Router to provide persistent support for various Fuse Mediation Router features such as Aggregator.
Current features it provides:
HawtDBAggregationRepository
HawtDBAggregationRepository
is an
AggregationRepository
which on the fly persists the aggregated messages.
This ensures that you will not loose messages, as the default aggregator will use an in memory
only AggregationRepository
.
It has the following options:
Option | Type | Description |
---|---|---|
repositoryName
|
String | A mandatory repository name. Allows you to use a shared HawtDBFile
for multiple repositories. |
persistentFileName
|
String | Filename for the persistent storage. If no file exists on startup a new file is created. |
bufferSize
|
int | The size of the memory segment buffer which is mapped to the file store. By default its 8mb. The value is in bytes. |
sync
|
boolean | Whether or not the HawtDBFile should sync on write or not. Default
is true . By sync on write ensures that its always waiting for all
writes to be spooled to disk and thus will not loose updates. If you disable this option,
then HawtDB will auto sync when it has batched up a number of writes. |
pageSize
|
short | The size of memory pages. By default its 512 bytes. The value is in bytes. |
hawtDBFile
|
HawtDBFile | Use an existing configured
org.apache.camel.component.hawtdb.HawtDBFile instance. |
returnOldExchange
|
boolean | Whether the get operation should return the old existing Exchange if any existed. By
default this option is false to optimize as we do not need the old
exchange when aggregating. |
useRecovery
|
boolean | Whether or not recovery is enabled. This option is by default true .
When enabled the Fuse Mediation Router
Aggregator automatic recover failed aggregated exchange and have them
resubmitted. |
recoveryInterval
|
long | If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover and resubmit. By default this interval is 5000 millis. |
maximumRedeliveries
|
int | Allows you to limit the maximum number of redelivery attempts for a recovered exchange.
If enabled then the Exchange will be moved to the dead letter channel if all redelivery
attempts failed. By default this option is disabled. If this option is used then the
deadLetterUri option must also be provided. |
deadLetterUri
|
String | An endpoint uri for a Dead Letter Channel where
exhausted recovered Exchanges will be moved. If this option is used then the
maximumRedeliveries option must also be provided. |
The repositoryName
option must be provided. Then either the
persistentFileName
or the hawtDBFile
must be
provided.
HawtDBAggregationRepository
will only preserve any
Serializable
compatible data types. If a data type is not such a type its
dropped and a WARN
is logged. And it only persists the
Message
body and the Message
headers. The
Exchange
properties are not persisted.
The HawtDBAggregationRepository
will by default recover any failed
Exchange. It does this by having a background tasks that
scans for failed Exchanges in the persistent store. You can
use the checkInterval
option to set how often this task runs. The recovery
works as transactional which ensures that Fuse Mediation Router will try to recover and redeliver the
failed Exchange. Any Exchange
which was found to be recovered will be restored from the persistent store and resubmitted and
send out again.
The following headers is set when an Exchange is being recovered/redelivered:
Header | Type | Description |
---|---|---|
Exchange.REDELIVERED
|
Boolean | Is set to true to indicate the Exchange is being redelivered. |
Exchange.REDELIVERY_COUNTER
|
Integer | The redelivery attempt, starting from 1. |
Only when an Exchange has been successfully processed it
will be marked as complete which happens when the confirm
method is invoked
on the AggregationRepository
. This means if the same Exchange fails again it will be kept retried until it
success.
You can use option maximumRedeliveries
to limit the maximum number of
redelivery attempts for a given recovered Exchange. You must
also set the deadLetterUri
option so Fuse Mediation Router knows where to send the Exchange when the maximumRedeliveries
was hit.
You can see some examples in the unit tests of camel-hawtdb, for example this test.
In this example we want to persist aggregated messages in the
target/data/hawtdb.dat
file.
public void configure() throws Exception { // create the hawtdb repo HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat"); // here is the Camel route where we aggregate from("direct:start") .aggregate(header("id"), new MyAggregationStrategy()) // use our created hawtdb repo as aggregation repository .completionSize(5).aggregationRepository(repo) .to("mock:aggregated"); }
The same example but using Spring XML instead:
<!-- a persistent aggregation repository using camel-hawtdb --> <bean id="repo" class="org.apache.camel.component.hawtdb.HawtDBAggregationRepository"> <!-- store the repo in the hawtdb.dat file --> <property name="persistentFileName" value="target/data/hawtdb.dat"/> <!-- and use repo2 as the repository name --> <property name="repositoryName" value="repo2"/> </bean> <!-- aggregate the messages using this strategy --> <bean id="myAggregatorStrategy" class="org.apache.camel.component.hawtdb.HawtDBSpringAggregateTest$MyAggregationStrategy"/> <!-- this is the camel routes --> <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <!-- aggregate using our strategy and hawtdb repo, and complete when we have 5 messages aggregated --> <aggregate strategyRef="myAggregatorStrategy" aggregationRepositoryRef="repo" completionSize="5"> <!-- correlate by header with the key id --> <correlationExpression><header>id</header></correlationExpression> <!-- send aggregated messages to the mock endpoint --> <to uri="mock:aggregated"/> </aggregate> </route> </camelContext>
To use HawtDB in your Fuse Mediation Router routes you need to add the a dependency on camel-hawtdb.
If you use maven you could just add the following to your pom.xml, substituting the version number for the latest & greatest release (see the download page for the latest versions).
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-hawtdb</artifactId> <version>2.3.0</version> </dependency>
See Also:
Aggregator