
Selective Consumer

The selective consumer pattern describes a consumer that applies a filter to incoming messages, so that only messages meeting a specific selection criterion are processed.


You can implement the selective consumer pattern in FUSE Mediation Router using one of the following approaches: a JMS selector, a JMS selector on an ActiveMQ endpoint, or a message filter.

A JMS selector is a predicate expression involving JMS headers and JMS properties: if the selector evaluates to true, the JMS message is allowed to reach the consumer; if the selector evaluates to false, the JMS message is blocked. For example, to consume messages from the queue, selective, and select only those messages whose country code property is equal to US, you could use the following Java DSL route:

from("jms:selective?selector=" + java.net.URLEncoder.encode("CountryCode='US'","UTF-8")).
    to("cxf:bean:replica01");

The selector string, CountryCode='US', must be URL encoded (using the UTF-8 character encoding) so that it does not interfere with the parsing of the query options. This example presumes that the JMS property, CountryCode, was set by the sender. For more details about JMS selectors, see JMS selectors.
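To see what the URL encoding step actually produces, the following minimal sketch builds the endpoint URI as a plain string using only the JDK. The class and method names (SelectorUriSketch, selectorUri) are hypothetical helpers introduced for illustration and are not part of FUSE Mediation Router.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class SelectorUriSketch {
    // Hypothetical helper: appends a URL-encoded selector option to an endpoint URI.
    static String selectorUri(String endpoint, String selector) {
        try {
            return endpoint + "?selector=" + URLEncoder.encode(selector, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a required charset on every Java platform, so this is not expected.
            throw new IllegalStateException("UTF-8 is not supported", e);
        }
    }

    public static void main(String[] args) {
        // The '=' and single-quote characters are percent-encoded.
        System.out.println(selectorUri("jms:selective", "CountryCode='US'"));
        // prints jms:selective?selector=CountryCode%3D%27US%27
    }
}
```

The resulting string is what gets passed to from() in the route above. Without encoding, the = character inside the selector could be mistaken for part of the query option syntax.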

Note

If a selector is applied to a JMS queue, messages that are not selected remain on the queue (and are thus potentially available to other consumers attached to the same queue).

You can also define JMS selectors on ActiveMQ endpoints. For example:

from("activemq:selective?selector=" + java.net.URLEncoder.encode("CountryCode='US'","UTF-8")).
    to("cxf:bean:replica01");

For more details, see ActiveMQ: JMS Selectors and ActiveMQ Message Properties.

If it is not possible to set a selector on the consumer endpoint, you can insert a filter processor into your route instead. For example, you could define a selective consumer that processes only messages with a US country code using Java DSL, as follows:

from("seda:a").filter(header("CountryCode").isEqualTo("US")).process(myProcessor);

The same route can be defined using XML configuration, as follows:

<camelContext id="buildCustomProcessorWithFilter" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="seda:a"/>
    <filter>
      <xpath>$CountryCode = 'US'</xpath>
      <process ref="#myProcessor"/>
    </filter>
  </route>
</camelContext>
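In the XML route, the expression $CountryCode = 'US' refers to the CountryCode header as an XPath variable. The following sketch illustrates that idea with the standard JDK XPath API only: a variable resolver looks up each variable name in a map of headers. It is an approximation of the concept, not the router's actual implementation, and the class and method names (XPathHeaderFilterSketch, matches) are hypothetical.

```java
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;

class XPathHeaderFilterSketch {
    // Hypothetical helper: evaluates a boolean XPath expression in which
    // each $variable is resolved against the supplied message headers.
    static boolean matches(String expression, Map<String, String> headers) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            // Resolve $CountryCode and similar variables by header name.
            xpath.setXPathVariableResolver(name -> headers.get(name.getLocalPart()));
            // The expression only uses variables, so an empty document is enough as context.
            Document emptyBody = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder().newDocument();
            return (Boolean) xpath.evaluate(expression, emptyBody, XPathConstants.BOOLEAN);
        } catch (XPathExpressionException | ParserConfigurationException e) {
            throw new IllegalStateException("Could not evaluate: " + expression, e);
        }
    }

    public static void main(String[] args) {
        String filter = "$CountryCode = 'US'";
        System.out.println(matches(filter, Map.of("CountryCode", "US"))); // prints true
        System.out.println(matches(filter, Map.of("CountryCode", "DE"))); // prints false
    }
}
```

A message whose header evaluates to true would be passed on to the processor inside the filter element; any other message would be blocked.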

For more information about the FUSE Mediation Router filter processor, see Message Filter.

Warning

Be careful when using a message filter to select messages from a JMS queue. A filter processor simply discards the messages it blocks. Because a queue allows each message to be consumed only once (see Competing Consumers), a blocked message is removed from the queue without ever being processed. This might not be the behavior you want.
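The difference between the two approaches can be illustrated with a small in-memory simulation. This is a sketch using plain JDK collections, not the JMS or FUSE Mediation Router APIs: each message is reduced to its CountryCode value, and the class and method names (SelectorVersusFilterSketch, consumeWithSelector, consumeWithFilter) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

class SelectorVersusFilterSketch {
    // Selector style: only matching messages are taken off the queue.
    static List<String> consumeWithSelector(Deque<String> queue, Predicate<String> selector) {
        List<String> processed = new ArrayList<>();
        Iterator<String> it = queue.iterator();
        while (it.hasNext()) {
            String countryCode = it.next();
            if (selector.test(countryCode)) {
                it.remove();              // consumed by this consumer
                processed.add(countryCode);
            }                             // otherwise the message stays on the queue
        }
        return processed;
    }

    // Filter style: every message is consumed, then non-matching ones are dropped.
    static List<String> consumeWithFilter(Deque<String> queue, Predicate<String> filter) {
        List<String> processed = new ArrayList<>();
        String countryCode;
        while ((countryCode = queue.poll()) != null) {
            if (filter.test(countryCode)) {
                processed.add(countryCode);
            }                             // blocked messages are gone for good
        }
        return processed;
    }

    public static void main(String[] args) {
        Predicate<String> isUs = "US"::equals;

        Deque<String> selected = new ArrayDeque<>(List.of("US", "DE", "US", "FR"));
        System.out.println(consumeWithSelector(selected, isUs) + " remaining: " + selected);
        // prints [US, US] remaining: [DE, FR]

        Deque<String> filtered = new ArrayDeque<>(List.of("US", "DE", "US", "FR"));
        System.out.println(consumeWithFilter(filtered, isUs) + " remaining: " + filtered);
        // prints [US, US] remaining: []
    }
}
```

Both approaches process the same two messages, but only the selector leaves the DE and FR messages available to other consumers on the same queue.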