servicemix-eip
The servicemix-eip component is a routing container where different routing patterns can be deployed as service units.
This component is based on the great Enterprise Integration Patterns book.
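Supported patterns:
- Content-Based Router
- Message Filter
- Pipeline
- Static Recipient List
- Static Routing Slip
- Wire Tap
- XPath Splitter
- Split Aggregator
- Content Enricher
- Resequencer
- Async Bridge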
In addition, this component can use all ServiceMix flows (including clustered and transactional flows), can be configured to be resilient to crashes and supports full fail-over to another node when clustered.
Using servicemix-eip as a standard JBI component
Installation
Installing the servicemix-eip component can be done in several ways:
- drop the installer zip in a hotdeploy directory monitored by ServiceMix
- using ant tasks
Note that when using ant tasks, the component is not started automatically. You will have to start it manually using ant tasks or a console.
Configuration
For a complete list of configuration options for the supported EIPs, take a look at the XSD and generated HTML for this component on the XML schemas page.
TODO: configure a persistent / clustered store
Service Unit packaging
Content of xbean.xml file to be packaged as a SU
<beans xmlns:eip="http://servicemix.apache.org/eip/1.0">
... add eip patterns here ...
</beans>
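As a slightly fuller sketch, the xbean.xml of a service unit might combine one pattern with the namespace declarations it relies on. The test prefix and its http://test URI are taken from the examples on this page. The choice of pattern and the target service names are illustrative assumptions, and the xmlns:test declaration is assumed to be what lets QName values such as test:recipients resolve.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Sketch of an SU xbean.xml; service names are placeholders, not required values -->
<beans xmlns:eip="http://servicemix.apache.org/eip/1.0"
       xmlns:test="http://test">

  <!-- Forwards each incoming In-Only exchange to both recipients -->
  <eip:static-recipient-list service="test:recipients" endpoint="endpoint">
    <eip:recipients>
      <eip:exchange-target service="test:trace3" />
      <eip:exchange-target service="test:trace4" />
    </eip:recipients>
  </eip:static-recipient-list>

</beans>
```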
Using servicemix-eip in a ServiceMix xml configuration file
<beans xmlns:sm="http://servicemix.apache.org/config/1.0"
xmlns:eip="http://servicemix.apache.org/eip/1.0">
<sm:container ...>
<sm:activationSpecs>
<sm:activationSpec>
<sm:component>
<eip:component>
<eip:endpoints>
... add eip patterns here ...
</eip:endpoints>
</eip:component>
</sm:component>
</sm:activationSpec>
...
</sm:activationSpecs>
</sm:container>
...
</beans>
Patterns
Content-Based router
ContentBasedRouter can be used for all kinds of content-based routing.
This pattern implements the Content-Based Router pattern.
<eip:content-based-router service="test:router" endpoint="endpoint">
<eip:rules>
<eip:routing-rule>
<eip:predicate>
<eip:xpath-predicate xpath="count(/test:echo) = 1" namespaceContext="#nsContext" />
</eip:predicate>
<eip:target>
<eip:exchange-target uri="endpoint:http://test/pipeline/endpoint" />
</eip:target>
</eip:routing-rule>
<eip:routing-rule>
<eip:target>
<eip:exchange-target service="test:recipients" />
</eip:target>
</eip:routing-rule>
</eip:rules>
</eip:content-based-router>
Message Filter
MessageFilter filters incoming JBI exchanges. Because it drops unwanted messages, whereas an InOut exchange always requires a response, MessageFilter cannot be used with InOut MEPs.
This pattern implements the Message Filter pattern.
<eip:message-filter service="test:messageFilter" endpoint="endpoint">
<eip:target>
<eip:exchange-target service="test:trace3" />
</eip:target>
<eip:filter>
<eip:xpath-predicate xpath="count(/test:world) = 1" namespaceContext="#nsContext"/>
</eip:filter>
</eip:message-filter>
Pipeline
The Pipeline component is a bridge between an In-Only (or Robust-In-Only) MEP and an In-Out MEP. When the Pipeline receives an In-Only exchange, it sends the input in an In-Out exchange to the transformer destination and forwards the response in an In-Only exchange to the target destination.
The old org.apache.servicemix.components.util.PipelineComponent will be deprecated. This one offers the same features but can be safely clustered and used in a transactional environment.
<eip:pipeline service="test:pipeline" endpoint="endpoint">
<eip:transformer>
<eip:exchange-target service="test:routingSlip" />
</eip:transformer>
<eip:target>
<eip:exchange-target service="test:trace2" />
</eip:target>
</eip:pipeline>
In the default configuration, faults sent by the transformer component are sent back to the consumer as faults if the exchange MEP supports them, or as errors (for InOnly exchanges). This behavior can be changed by setting the sendFaultsToTarget attribute to true, in which case faults will be sent to the target component, or by adding a faultsTarget element where faults should be sent.
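The two fault-handling options described above could be configured roughly as follows. This is a sketch only: it assumes that faultsTarget wraps an exchange-target in the same way as target does, and the test:pipelineWithFaultsTarget and test:faultHandler service names are hypothetical.

```xml
<!-- Option 1: transformer faults are forwarded to the regular target -->
<eip:pipeline service="test:pipeline" endpoint="endpoint" sendFaultsToTarget="true">
  <eip:transformer>
    <eip:exchange-target service="test:routingSlip" />
  </eip:transformer>
  <eip:target>
    <eip:exchange-target service="test:trace2" />
  </eip:target>
</eip:pipeline>

<!-- Option 2: transformer faults go to a dedicated service (hypothetical name) -->
<eip:pipeline service="test:pipelineWithFaultsTarget" endpoint="endpoint">
  <eip:transformer>
    <eip:exchange-target service="test:routingSlip" />
  </eip:transformer>
  <eip:target>
    <eip:exchange-target service="test:trace2" />
  </eip:target>
  <eip:faultsTarget>
    <eip:exchange-target service="test:faultHandler" />
  </eip:faultsTarget>
</eip:pipeline>
```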
Static Recipient List
The StaticRecipientList component will forward an input In-Only or Robust-In-Only exchange to a list of known recipients.
This component implements the Recipient List pattern, with the limitation that the recipient list is static.
<eip:static-recipient-list service="test:recipients" endpoint="endpoint">
<eip:recipients>
<eip:exchange-target service="test:messageFilter" />
<eip:exchange-target service="test:trace4" />
</eip:recipients>
</eip:static-recipient-list>
Static Routing Slip
A RoutingSlip component can be used to route an incoming In-Out exchange through a series of target services.
This component implements the Routing Slip pattern, with the limitation that the routing table is static.
This component only uses In-Out MEPs and errors or faults sent by targets are reported back to the consumer, thus interrupting the routing process.
<eip:static-routing-slip service="test:routingSlip" endpoint="endpoint">
<eip:targets>
<eip:exchange-target service="test:echo" />
<eip:exchange-target service="test:echo" />
</eip:targets>
</eip:static-routing-slip>
Wire Tap
A WireTap component can be used to forward a copy of the input message to a listener in a proxy fashion.
This component implements the WireTap pattern.
It can handle all four standard MEPs, but will only send an In-Only MEP to the listener.
The originating service must be configured to send messages to the WireTap directly.
In the case of an In-Out MEP, this means that the WireTap needs to be configured to send the exchange along to the destination service.
<eip:wire-tap service="test:wireTap" endpoint="endpoint">
<eip:target>
<eip:exchange-target service="test:xpathSplitter" />
</eip:target>
<eip:inListener>
<eip:exchange-target service="test:trace1" />
</eip:inListener>
</eip:wire-tap>
Similar to the example above, the WireTap can also be used:
- to forward the output message of an exchange using <eip:outListener/>
- to forward the fault message of an exchange using <eip:faultListener/>
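A minimal sketch of these two listeners, assuming they nest an exchange-target in the same way as inListener. The test:echo target and the test:traceOut and test:traceFault listener services are hypothetical names chosen for illustration.

```xml
<eip:wire-tap service="test:wireTap" endpoint="endpoint">
  <!-- Destination that actually handles the exchange -->
  <eip:target>
    <eip:exchange-target service="test:echo" />
  </eip:target>
  <!-- Receives a copy of the out message as an In-Only exchange -->
  <eip:outListener>
    <eip:exchange-target service="test:traceOut" />
  </eip:outListener>
  <!-- Receives a copy of the fault message as an In-Only exchange -->
  <eip:faultListener>
    <eip:exchange-target service="test:traceFault" />
  </eip:faultListener>
</eip:wire-tap>
```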
XPath Splitter
The XPathSplitter component implements the Splitter pattern using an XPath expression to split the incoming XML.
<eip:xpath-splitter service="test:xpathSplitter" endpoint="endpoint"
xpath="/*/*" namespaceContext="#nsContext">
<eip:target>
<eip:exchange-target uri="service:http://test/router" />
</eip:target>
</eip:xpath-splitter>
SplitAggregator
The SplitAggregator is an aggregator that is mainly useful for collecting messages that have been created using a splitter.
It relies on several properties that should be set on the exchanges (count, index, correlationId).
<eip:split-aggregator service="test:aggregator" endpoint="endpoint">
<eip:target>
<eip:exchange-target service="test:trace5" />
</eip:target>
</eip:split-aggregator>
Content Enricher
With a Content Enricher you can extract additional information from a source and add this information to your message. This is useful if, for example, the calling service extracts a 'userID' but your target system is only aware of a 'userName'. By using the Content Enricher you could look up the 'userName' in a source system and add it to your message.
<eip:content-enricher service="test:contentEnricher" endpoint="endpoint">
<eip:enricherTarget>
<eip:exchange-target service="test:additionalInformationExtracter" />
</eip:enricherTarget>
<eip:target>
<eip:exchange-target service="test:myTarget" />
</eip:target>
</eip:content-enricher>
Resequencer
A resequencer re-orders incoming In-Only or Robust-In-Only exchanges and sends them synchronously to a target service. Synchronous sending ensures that messages arrive in the correct order at the target service. This component implements the Resequencer pattern.
It works on (continuous) streams of message exchanges using a timeout policy. Since the resequencer doesn't make batch reads there's no need to know the number of messages to be re-ordered in advance (although a capacity parameter prevents the resequencer from running out of memory). If the maximum out-of-sequence time difference between messages in a message stream is known, the resequencer's timeout parameter should be set to this value (milliseconds). In this case it is guaranteed that all elements of a stream are delivered in correct order to the target service. The lower the timeout value is compared to the out-of-sequence time difference the higher is the probability for out-of-sequence messages sent by this resequencer. Large timeout values should be supported by sufficiently high capacity values.
To compare the elements of a sequence, the resequencer component can be configured with a sequence element comparator. A default comparator is provided that compares message exchanges based on Long sequence numbers. This comparator expects the sequence number to be the value of the org.apache.servicemix.eip.sequence.number property of the exchange's in-NormalizedMessage. The name of the property can be customized in the comparator configuration (see below). You may also provide a custom comparator by implementing the SequenceElementComparator interface.
<eip:resequencer
service="sample:Resequencer"
endpoint="ResequencerEndpoint"
comparator="#comparator"
capacity="100"
timeout="2000">
<eip:target>
<eip:exchange-target service="sample:SampleTarget" />
</eip:target>
</eip:resequencer>
<eip:default-comparator id="comparator" sequenceNumberKey="seqnum"/>
A running example can be downloaded from here. In this example, a custom-coded message sender sends messages in the "wrong" order to the resequencer. The resequencer re-orders these messages and (synchronously) sends them to a file sender-endpoint. The file sender-endpoint writes the messages (in proper order) to the work/output directory.
AsyncBridge
The AsyncBridge is the opposite of the Pipeline: it bridges an InOut exchange with two InOnly exchanges.
The AsyncBridge expects an InOut MEP as input. It uses the exchange id of the InOut exchange as the correlation id, creates an InOnly exchange by copying the input message, and sends it to the target (with the correlation id set as a property). It then expects an InOnly exchange to come back with the same correlation id property. When this happens, the message is copied to the out message of the original exchange and sent back. If no response is received within the configured amount of time (timeout property in milliseconds), an error is sent back to the original consumer.
<eip:async-bridge
service="sample:AsyncBridge"
endpoint="AsyncBridgeEndpoint">
<eip:target>
<eip:exchange-target service="sample:SampleTarget" />
</eip:target>
</eip:async-bridge>
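The timeout mentioned above is not set in this example. A minimal sketch, assuming the timeout property is exposed as a timeout attribute in the same way as on the resequencer; the 5000 ms value is arbitrary.

```xml
<!-- Assumption: timeout is given in milliseconds as an attribute -->
<eip:async-bridge
    service="sample:AsyncBridge"
    endpoint="AsyncBridgeEndpoint"
    timeout="5000">
  <eip:target>
    <eip:exchange-target service="sample:SampleTarget" />
  </eip:target>
</eip:async-bridge>
```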
Correlation Id
There is a convention between the AsyncBridge and the target on how the correlation id is transmitted. The correlation id can only be transmitted from the AsyncBridge to the target using a message property. The property name can be customized. On the other hand, the correlation id coming back from the target can be set in a message property or in the message payload. The AsyncBridge can use an Expression to extract the correlation id from the message returning from the target.
<eip:async-bridge
service="sample:AsyncBridge"
endpoint="AsyncBridgeEndpoint"
responseCorrIdProperty="correlationIdProperty"
responseCorrId="#responseCorrIdExpression">
<eip:target>
<eip:exchange-target service="sample:SampleTarget" />
</eip:target>
</eip:async-bridge>
<bean id="responseCorrIdExpression" class="org.apache.servicemix.expression.JAXPStringXPathExpression" >
<constructor-arg value="/my-response/message/@corrId"/>
</bean>
As you can see from the sample above the responseCorrIdProperty is used to set the name of the property that the target will query to get the correlation id sent by the AsyncBridge. In other words, the target will do something like this to extract the correlation id:
String correlationId = (String) exchange.getProperty("correlationIdProperty");
The responseCorrId is set with an instance of type org.apache.servicemix.expression.Expression, in this case the class org.apache.servicemix.expression.JAXPStringXPathExpression.
This expression resolves the location of the correlation id coming back from the target. In the above example the expression shows that the correlation id comes as part of the message payload in an attribute called "corrId" of the /my-response/message element. In a similar manner the class org.apache.servicemix.expression.PropertyExpression could have been used to locate the correlation id in a message property.
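For the property-based alternative, the expression bean might look like the sketch below. It assumes that PropertyExpression accepts the property name as a constructor argument and that the target echoes the id back under the same correlationIdProperty name; neither detail is confirmed on this page.

```xml
<!-- Assumption: the constructor argument is the name of the property to read -->
<bean id="responseCorrIdExpression" class="org.apache.servicemix.expression.PropertyExpression">
  <constructor-arg value="correlationIdProperty"/>
</bean>
```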
Tips
ExchangeTarget
All patterns use the <exchange-target /> tag to specify the target of a JBI exchange.
This element has the following attributes:
Name | Type | Description |
interface | QName | the QName of the target interface. One of service or interface attribute is required |
operation | QName | the QName of the target operation (optional) |
service | QName | the QName of the target service. One of service or interface attribute is required |
endpoint | String | the name of the target JBI endpoint, only used when service is set |
uri | String | uri used to target the exchange (see URIs) |
If you want to target a given interface, just set the interface attribute. Otherwise, set the service attribute, and optionally the endpoint attribute if you want to target a specific JBI endpoint rather than any endpoint of the service.
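The attribute combinations can be sketched as follows. The test:EchoInterface and test:echoOperation names are hypothetical; the other values are reused from the examples on this page.

```xml
<!-- Target by interface (hypothetical interface name) -->
<eip:exchange-target interface="test:EchoInterface" />

<!-- Target by service name only -->
<eip:exchange-target service="test:echo" />

<!-- Target one specific endpoint of a service, with an optional operation (hypothetical name) -->
<eip:exchange-target service="test:echo" endpoint="endpoint" operation="test:echoOperation" />

<!-- Target by URI -->
<eip:exchange-target uri="service:http://test/router" />
```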
NamespaceContext
Some patterns use XPath expressions. To use such expressions on XML with namespaces, you need to define a NamespaceContext.
<eip:namespace-context id="nsContext">
<eip:namespaces>
<eip:namespace prefix="test">http://test</eip:namespace>
</eip:namespaces>
</eip:namespace-context>
This NamespaceContext can be referenced by a namespaceContext attribute as shown in the XPathSplitter or MessageFilter examples.
Predicates
Some patterns use predicates to test a given JBI exchange. The only predicate currently implemented is the XPathPredicate, but you can implement your own and deploy it with the service unit.
TODO: link to a page documenting the classpath / location elements for deploying code inside xbean SU
Configuring temporary message storage
Many of the pattern implementations need to store MessageExchanges temporarily. For example, the aggregator needs to keep track of the MessageExchanges it is aggregating. By default, the EIPs use a plain MemoryStoreFactory to create in-memory stores, but there are other options. If you set the timeout property on the MemoryStoreFactory, it will evict old objects from the in-memory store to avoid a memory leak. You can also use a JDBCStoreFactory to store data in a database instead of in memory.
Example: to use an in-memory store with a timeout for storing active and closed aggregations in a <split-aggregator/>, you can do
<eip:split-aggregator service="test:aggregator" endpoint="endpoint"
storeFactory="#StoreFactory" closedAggregateStoreFactory="#StoreFactory">
<eip:target>
<eip:exchange-target service="test:trace5" />
</eip:target>
</eip:split-aggregator>
<bean id="StoreFactory" class="org.apache.servicemix.store.MemoryStoreFactory">
<property name="timeout" value="120000"/>
</bean>
Creating your own patterns
Some classes have been designed to be extensible. These include:
- org.apache.servicemix.eip.support.AbstractAggregator
- org.apache.servicemix.eip.support.AbstractSplitter