JMS Message Listeners

JMS Message Listeners allow you to listen for incoming JMS messages. You specify the source of the messages (JMS Topic or JMS Queue) and a task that will be executed for each incoming message.

JMS messaging requires the JMS API (jms.jar) and provider-specific third-party libraries. All of these libraries must be available on the application server classpath. Some application servers contain these libraries by default; others do not. In the latter case, the libraries must be added explicitly before starting the CloverETL Server.

JMS is a complex topic that goes beyond the scope of this document. For more detailed information about JMS, refer to the Oracle website: http://docs.oracle.com/javaee/6/tutorial/doc/bncdq.html

Note that the JMS implementation is dependent on the application server that the CloverETL Server is running in.

In a Cluster, you can optionally specify which node listens for JMS messages. If no node is specified, all nodes register as listeners. With a JMS Topic, every node receives the message and triggers the task (multiple instances). With a JMS Queue, a random node consumes the message and runs the task (just one instance).

Table 24.2. Attributes of JMS message task

Attribute | Description
Initialize by | This attribute makes sense only in a Cluster environment. It is the ID of the node where the listener should be initialized. If it is not set, the listener is initialized on all nodes in the Cluster.

In the Cluster environment, each JMS event listener has a "Node IDs" attribute, which specifies which Cluster node consumes messages from the queue/topic. The following options are available:

  • No failover: Just one node ID specified - Only the specified node may consume messages, and only while its status is "ready". When the node isn't ready, messages aren't consumed by any Cluster node.

  • Failover with node concurrency: No node ID specified (empty input) - All Cluster nodes with the status "ready" consume messages concurrently.

  • Failover with node reservation: Multiple node IDs specified (separated by commas) - Just one of the specified nodes consumes messages at a time. If the node fails for any reason (or its status isn't "ready"), another "ready" node from the list takes over consuming messages.

In a standalone environment, the "Node IDs" attribute is ignored.

JNDI Access
Initial context | Default or custom
Initial context factory class | The full class name of a javax.naming.spi.InitialContextFactory implementation. Each JMS provider has its own implementation. E.g. Apache ActiveMQ has "org.apache.activemq.jndi.ActiveMQInitialContextFactory". If it is empty, the server uses the default initial context.

The specified class must be on the web-app classpath or the application-server classpath. It is usually included in the same library as the JMS API implementation of the particular JMS broker provider.

Broker URL | URL of a JMS message broker
Listen To
Connection factory | JNDI name of a connection factory. It depends on the JMS provider.
Username | Username for the connection to a JMS message broker
Password | Password for the connection to a JMS message broker
Queue/Topic | JNDI name of a message queue/topic on the server
Durable subscriber | If it is false, the message consumer is connected to the broker as "non-durable", so it receives only messages that are sent while the connection is active. Other messages are lost.

If the attribute is true, the consumer is subscribed as "durable", so it also receives messages that are sent while the connection is inactive. The broker stores such messages until they can be delivered or until they expire.

This switch makes sense only for Topic destinations, because Queue destinations always store messages until they can be delivered or until they expire.

Note that the consumer is inactive, for example, during a server restart and during the short moment when a user updates the "JMS message listener" and it must be re-initialized. During these intervals, messages in the Topic may get lost if the consumer does not have a durable subscription.

If the subscription is durable, the client must have a "ClientId" specified. How this attribute is set depends on the JMS provider. E.g. for ActiveMQ, it is set as a URL parameter: tcp://localhost:1244?jms.clientID=TestClientID

Message selector | This "query string" specifies conditions for filtering incoming messages. The syntax is described on the Java EE API website: http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html Its behavior differs depending on the type of consumer (queue/topic):

Queue: Messages that are filtered out remain on the queue.

Topic: Messages filtered out by a Topic subscriber's message selector will never be delivered to the subscriber. From the subscriber's perspective, they do not exist.

Message Processing
Number of consumers | E.g. 1
Groovy code | Groovy code may be used for additional message processing and/or for refusing a message. Both features are described below.
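
The JNDI Access attributes in the table above correspond to standard JNDI environment properties. The following is a minimal sketch, not the server's actual implementation, of how an initial context factory class and a broker URL are typically expressed as a JNDI environment in Java. The class name JndiSettingsSketch and the method buildEnvironment are hypothetical; the factory class name and the broker URL with the jms.clientID parameter are the ActiveMQ examples quoted in the table.

```java
import java.util.Hashtable;
import javax.naming.Context;

// Illustrative only: maps the "Initial context factory class" and "Broker URL"
// attributes onto standard JNDI environment properties. Names are hypothetical.
class JndiSettingsSketch {

    // Builds a JNDI environment; an empty value is simply left out,
    // which corresponds to relying on the default initial context.
    static Hashtable<String, String> buildEnvironment(String factoryClass, String brokerUrl) {
        Hashtable<String, String> env = new Hashtable<>();
        if (factoryClass != null && !factoryClass.isEmpty()) {
            env.put(Context.INITIAL_CONTEXT_FACTORY, factoryClass);
        }
        if (brokerUrl != null && !brokerUrl.isEmpty()) {
            env.put(Context.PROVIDER_URL, brokerUrl);
        }
        return env;
    }

    public static void main(String[] args) {
        Hashtable<String, String> env = buildEnvironment(
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
                "tcp://localhost:1244?jms.clientID=TestClientID");
        env.forEach((key, value) -> System.out.println(key + " = " + value));
        // With the provider's library on the classpath, an application would pass
        // this environment to new javax.naming.InitialContext(env) and look up the
        // connection factory and the queue/topic by their JNDI names.
    }
}
```

The sketch only assembles the settings; it does not contact a broker, so it runs without any JMS provider library.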

Optional Groovy code

Groovy code may be used for additional message processing or for refusing a message.

  • Additional message processing: Groovy code may modify/add/remove values stored in the containers "properties" and "data".

  • Refuse/acknowledge the message: If the Groovy code returns Boolean.FALSE, the message is refused. Otherwise, the message is acknowledged. A refused message may be redelivered; however, the JMS broker should be configured with a limit on redelivery attempts. If the Groovy code throws an exception, it is considered a coding error and the JMS message is NOT refused because of it. So, if message refusal is to be driven by an exception, the exception must be handled in the Groovy code.
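
The refusal rule above can be modelled in a few lines. The following Java sketch is an illustration under stated assumptions, not CloverETL Server code: the names RefusalRuleSketch, decide, refuseOnError and the "validated" key are hypothetical, the script is simulated as a function over the "properties" container, and "not refused" after an exception is modelled as acknowledged. It shows that only a returned Boolean.FALSE refuses a message, and that a script that wants to refuse on error has to catch the exception itself.

```java
import java.util.Properties;
import java.util.function.Function;

// Illustrative model of the refuse/acknowledge rule; all names are hypothetical.
class RefusalRuleSketch {

    enum Outcome { ACKNOWLEDGED, REFUSED }

    // Only Boolean.FALSE refuses the message. An exception escaping the script
    // is treated as a coding error and does not refuse the message.
    static Outcome decide(Function<Properties, Object> script, Properties properties) {
        try {
            Object result = script.apply(properties);
            return Boolean.FALSE.equals(result) ? Outcome.REFUSED : Outcome.ACKNOWLEDGED;
        } catch (RuntimeException codingError) {
            return Outcome.ACKNOWLEDGED; // assumption: "not refused" modelled as acknowledged
        }
    }

    // A script body that handles its own exception so that a failure leads to refusal.
    static Object refuseOnError(Properties properties) {
        try {
            // Requires a numeric priority; a missing value makes parseInt throw.
            Integer.parseInt(properties.getProperty("JMS_MSG_PRIORITY"));
            properties.setProperty("validated", "true"); // additional processing: add a value
            return Boolean.TRUE;
        } catch (NumberFormatException e) {
            return Boolean.FALSE; // refuse explicitly instead of letting the exception escape
        }
    }

    public static void main(String[] args) {
        Properties good = new Properties();
        good.setProperty("JMS_MSG_PRIORITY", "4");
        Properties bad = new Properties(); // no priority set, so parsing fails

        System.out.println(decide(RefusalRuleSketch::refuseOnError, good)); // prints ACKNOWLEDGED
        System.out.println(decide(RefusalRuleSketch::refuseOnError, bad));  // prints REFUSED
        System.out.println(decide(p -> { throw new IllegalStateException("bug"); }, good)); // prints ACKNOWLEDGED
    }
}
```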

Table 24.3. Variables accessible in groovy code

type | key | description
javax.jms.Message | msg | instance of a JMS message
java.util.Properties | properties | See below for details. It contains values (String or converted to String) read from a message and it is passed to the task which may use them somehow. E.g. "execute graph" task passes these parameters to the executed graph.
java.util.Map<String, Object> | data | See below for details. Contains values (Object, Stream, ..) read or proxied from the message instance and it is passed to the task which may use them somehow. E.g. "execute graph" task passes it to the executed graph as "dictionary entries".
javax.servlet.ServletContext | servletContext | instance of ServletContext
com.cloveretl.server.api.ServerFacade | serverFacade | instance of serverFacade usable for calling CloverETL Server core features.
java.lang.String | sessionToken | sessionToken, needed for calling serverFacade methods

Message data available for further processing

A JMS message is processed and the data it contains is stored into two data structures: Properties and Data.

Table 24.4. Properties Elements

key | description
JMS_PROP_[property key] | One entry is created for each message property; its key consists of the "JMS_PROP_" prefix and the property key.
JMS_MAP_[map entry key] | If the message is an instance of MapMessage, one entry is created for each map entry; its key consists of the "JMS_MAP_" prefix and the map entry key. Values are converted to String.
JMS_TEXT | If the message is an instance of TextMessage, this property contains the content of the message.
JMS_MSG_CLASS | Class name of the message implementation
JMS_MSG_CORRELATIONID | Correlation ID is either a provider-specific message ID or an application-specific String value
JMS_MSG_DESTINATION | The JMSDestination header field contains the destination to which the message is being sent.
JMS_MSG_MESSAGEID | A JMSMessageID is a String value that should function as a unique key for identifying messages in a historical repository. The exact scope of uniqueness is provider-defined. It should at least cover all messages for a specific installation of a provider, where an installation is some connected set of message routers.
JMS_MSG_REPLYTO | Destination to which a reply to this message should be sent.
JMS_MSG_TYPE | Message type identifier supplied by the client when the message was sent.
JMS_MSG_DELIVERYMODE | The DeliveryMode value specified for this message.
JMS_MSG_EXPIRATION | The time the message expires, which is the sum of the time-to-live value specified by the client and the GMT at the time of the send.
JMS_MSG_PRIORITY | The JMS API defines ten levels of priority value, with 0 as the lowest priority and 9 as the highest. In addition, clients should consider priorities 0-4 as gradations of normal priority and priorities 5-9 as gradations of expedited priority.
JMS_MSG_REDELIVERED | "true" if this message is being redelivered.
JMS_MSG_TIMESTAMP | The time a message was handed off to a provider to be sent. It is not the time the message was actually transmitted, because the actual send may occur later due to transactions or other client-side queueing of messages.

Note that all values in the “Properties” structure are stored as String type, regardless of whether they represent numbers or text.

For backwards compatibility, all listed properties can also be accessed using lower-case keys; it is, however, a deprecated approach.
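
Because every value in “Properties” is a String, code that consumes the container has to convert numeric entries itself. The sketch below is illustrative only: it fills a java.util.Properties object by hand using the key naming scheme of Table 24.4 and then reads the values back. The class PropertiesKeysSketch, the method toProperties, the "orderId" message property and the "count" map entry are made-up examples, not part of the server API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Illustrative only: mimics the key naming scheme of the "Properties" container.
class PropertiesKeysSketch {

    // Stores message properties and MapMessage entries under their prefixed keys,
    // converting every value to String.
    static Properties toProperties(Map<String, Object> messageProperties,
                                   Map<String, Object> mapEntries) {
        Properties result = new Properties();
        messageProperties.forEach(
                (key, value) -> result.setProperty("JMS_PROP_" + key, String.valueOf(value)));
        mapEntries.forEach(
                (key, value) -> result.setProperty("JMS_MAP_" + key, String.valueOf(value)));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> messageProperties = new LinkedHashMap<>();
        messageProperties.put("orderId", 1234L); // hypothetical message property
        Map<String, Object> mapEntries = new LinkedHashMap<>();
        mapEntries.put("count", 7);              // hypothetical MapMessage entry

        Properties properties = toProperties(messageProperties, mapEntries);
        properties.setProperty("JMS_MSG_PRIORITY", "4"); // header values are Strings as well

        // Numeric values must be parsed back from their String form.
        long orderId = Long.parseLong(properties.getProperty("JMS_PROP_orderId"));
        int priority = Integer.parseInt(properties.getProperty("JMS_MSG_PRIORITY"));
        boolean expedited = priority >= 5; // priorities 5-9 count as expedited

        // prints: 1234 7 false
        System.out.println(orderId + " " + properties.getProperty("JMS_MAP_count") + " " + expedited);
    }
}
```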

Table 24.5. "data" elements

key | description
JMS_MSG | instance of javax.jms.Message
JMS_DATA_STREAM | Instance of java.io.InputStream. Accessible only for TextMessage, BytesMessage, StreamMessage, ObjectMessage (only if payload object is instance of String). Strings are encoded in UTF-8.
JMS_DATA_TEXT | Instance of String. Only for TextMessage and ObjectMessage, where payload object is instance of String.
JMS_DATA_OBJECT | Instance of java.lang.Object - message payload. Only for ObjectMessage.

The “Data” container is passed to a task that can use it, depending on its implementation. For example, the task "execute graph" passes it to the executed graph as “dictionary entries.”

In the Cluster environment, you can explicitly specify the IDs of the nodes that may execute the task. However, if the “data” payload is not serializable and the receiving node differs from the executing node, an error is thrown because the Cluster cannot pass the “data” to the executing node.
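
Whether a value can be serialized at all can be checked with the standard java.io.Serializable test. The sketch below is a hedged illustration: the class and method names are hypothetical, the map is populated by hand, and the Cluster's actual transfer mechanism is not described in this document. It lists the “data” keys whose values do not implement Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: finds "data" entries that cannot be serialized.
class SerializableCheckSketch {

    // Returns the keys whose values do not implement java.io.Serializable.
    // Implementing the interface is necessary but not always sufficient,
    // because an object may still hold non-serializable fields.
    static List<String> nonSerializableKeys(Map<String, Object> data) {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, Object> entry : data.entrySet()) {
            Object value = entry.getValue();
            if (value != null && !(value instanceof Serializable)) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("JMS_DATA_TEXT", "hello");                                 // String is serializable
        data.put("JMS_DATA_STREAM", new ByteArrayInputStream(new byte[0])); // streams are not
        System.out.println(nonSerializableKeys(data)); // prints [JMS_DATA_STREAM]
    }
}
```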

Inside a graph or a jobflow, data passed as dictionary entries can be used in some component attributes. For example, a File URL attribute would look like: "dict:JMS_DATA_STREAM:discrete" for reading the data directly from the incoming JMS message using a proxy stream.
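
Code that receives the “data” container directly can read the stream entry with standard Java I/O. The following is a minimal sketch, assuming the JMS_DATA_STREAM entry holds UTF-8 text as described in Table 24.5; the map is filled by hand to stand in for a real message, and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: reads the JMS_DATA_STREAM entry of a "data" map as UTF-8 text.
class DataStreamSketch {

    // Returns the stream content as a String, or null when no stream entry is present
    // (for example, for message types that do not expose a stream).
    static String readText(Map<String, Object> data) {
        Object entry = data.get("JMS_DATA_STREAM");
        if (!(entry instanceof InputStream)) {
            return null;
        }
        try (InputStream in = (InputStream) entry) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int read;
            while ((read = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> data = new HashMap<>();
        data.put("JMS_DATA_STREAM",
                new ByteArrayInputStream("order 42".getBytes(StandardCharsets.UTF_8)));
        System.out.println(readText(data)); // prints order 42
    }
}
```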

For backwards compatibility, all listed dictionary entries can also be accessed using lower-case keys; it is, however, a deprecated approach.