The content enricher pattern describes a scenario where the message destination requires more data than is present in the original message. In this case, you would use a content enricher to pull in the extra data from an external resource.
Fuse Mediation Router supports two kinds of content enricher, as follows:
enrich()
—obtains additional data from the resource by sending a copy of the current exchange to a producer endpoint and then using the data from the resulting reply (the exchange created by the enricher is always an InOut exchange).pollEnrich()
—obtains the additional data by polling a consumer endpoint for data. Effectively, the consumer endpoint from the main route and the consumer endpoint inpollEnrich()
are coupled, such that exchanges incoming on the main route trigger a poll of thepollEnrich()
endpoint.
AggregationStrategy aggregationStrategy = ... from("direct:start") .enrich("direct:resource", aggregationStrategy) .to("direct:result"); from("direct:resource") ...
The content enricher (enrich
) retrieves additional data from a
resource endpoint in order to enrich an incoming message (contained
in the orginal exchange). An aggregation strategy combines the original
exchange and the resource exchange. The first parameter of the
AggregationStrategy.aggregate(Exchange, Exchange)
method corresponds to
the the original exchange, and the second parameter corresponds to the resource exchange.
The results from the resource endpoint are stored in the resource exchange's
Out message. Here is a sample template for implementing your own
aggregation strategy class:
public class ExampleAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange original, Exchange resource) { Object originalBody = original.getIn().getBody(); Object resourceResponse = resource.getOut().getBody(); Object mergeResult = ... // combine original body and resource response if (original.getPattern().isOutCapable()) { original.getOut().setBody(mergeResult); } else { original.getIn().setBody(mergeResult); } return original; } }
Using this template, the original exchange can have any exchange pattern. The resource exchange created by the enricher is always an InOut exchange.
The preceding example can also be implemented in Spring XML:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <enrich uri="direct:resource" strategyRef="aggregationStrategy"/> <to uri="direct:result"/> </route> <route> <from uri="direct:resource"/> ... </route> </camelContext> <bean id="aggregationStrategy" class="..." />
The aggregation strategy is optional. If you do not provide it, Fuse Mediation Router will use the body obtained from the resource by default. For example:
from("direct:start") .enrich("direct:resource") .to("direct:result");
In the preceding route, the message sent to the direct:result
endpoint contains the output from the direct:resource
, because this
example does not use any custom aggregation.
In XML DSL, just omit the strategyRef
attribute, as follows:
<route> <from uri="direct:start"/> <enrich uri="direct:resource"/> <to uri="direct:result"/> </route>
The enrich
DSL command supports the following options:
Name | Default Value | Description |
---|---|---|
uri
|
The endpoint uri for the external servie to enrich from. You must use either uri or ref . |
|
ref
|
Refers to the endpoint for the external servie to enrich from. You must use either uri or ref . |
|
strategyRef
|
Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. By default Camel will use the reply from the external service as outgoing message. |
The pollEnrich
command treats the resource endpoint as a
consumer. Instead of sending an exchange to the resource endpoint, it
polls the endpoint. By default, the poll returns immediately, if
there is no exchange available from the resource endpoint. For example, the following route
reads a file whose name is extracted from the header of an incoming JMS message:
from("activemq:queue:order") .setHeader(Exchange.FILE_NAME, header("orderId")) .pollEnrich("file://order/data/additional") .to("bean:processOrder");
And if you want to wait at most 20 seconds for the file to be ready, you can use a timeout as follows:
from("activemq:queue:order") .setHeader(Exchange.FILE_NAME, header("orderId")) .pollEnrich("file://order/data/additional", 20000) // timeout is in milliseconds .to("bean:processOrder");
You can also specify an aggregation strategy for pollEnrich
, as
follows:
.pollEnrich("file://order/data/additional", 20000, aggregationStrategy)
![]() | Note |
---|---|
The resource exchange passed to the aggregation strategy's |
![]() | Data from current Exchange not used |
---|---|
|
In general, the pollEnrich()
enricher polls the consumer endpoint using one
of the following polling methods:
receiveNoWait()
(used by default)receive()
receive(long timeout)
The pollEnrich()
command's timeout argument (specified in milliseconds)
determines which method gets called, as follows:
Timeout is
0
or not specified,receiveNoWait
is called.Timeout is negative,
receive
is called.Otherwise,
receive(timeout)
is called.
In this example we enrich the message by loading the content from the file named inbox/data.txt.
from("direct:start") .pollEnrich("file:inbox?fileName=data.txt") .to("direct:result");
And in XML DSL you do:
<route> <from uri="direct:start"/> <pollEnrich uri="file:inbox?fileName=data.txt"/> <to uri="direct:result"/> </route>
If there is no file then the message is empty. We can use a timeout to either wait (potential forever) until a file exists, or use a timeout to wait a period. For example to wait up til 5 seconds you can do:
<route> <from uri="direct:start"/> <pollEnrich uri="file:inbox?fileName=data.txt" timeout="5000"/> <to uri="direct:result"/> </route>
The pollEnrich
DSL command supports the following options:
Name | Default Value | Description |
---|---|---|
uri
|
The endpoint uri for the external servie to enrich from. You must use either uri or ref . |
|
ref
|
Refers to the endpoint for the external servie to enrich from. You must use either uri or ref . |
|
strategyRef
|
Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. By default Camel will use the reply from the external service as outgoing message. | |
timeout
|
0
|
Timeout in millis to use when polling from the external service. See below for important details about the timeout. |