Through the consumer configuration options, there are two different ways in which you can optimize the threading model:
On the broker side, the broker normally dispatches messages to consumers asynchronously, which usually gives the best performance (that is, it enables the broker to cope better with slow consumers). If you are sure that your consumers are always fast, however, you could achieve better performance by disabling asynchronous dispatch on the broker (thereby avoiding the cost of unnecessary context switching).
Broker-side asynchronous dispatching can be enabled or disabled at the granularity of individual consumers. Hence, you can disable asynchronous dispatching for your fast consumers, but leave it enabled for your (possibly) slow consumers.
To disable broker-side asynchronous dispatching, set the
consumer.dispatchAsync
option to false
on the
transport URI used by the consumer. For example, to disable asynchronous
dispatch to the TEST.QUEUE
queue, use the following URI on the
consumer side:
TEST.QUEUE?consumer.dispatchAsync=false
It is also possible to disable asynchronous dispatch by setting the
dispatchAsync
property to false on the ActiveMQ connection
factory—for example:
// Java ((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);
On the consumer side, there are two layers of threads responsible for
receiving incoming messages: the Session
threads and the
MessageConsumer
threads. In the special case where only one
session is associated with a connection, the two layers are redundant and it is
possible to optimize the threading model by eliminating the thread associated
with the session layer. This section explains how to enable this consumer
threading optimization.
Figure 1.2 gives an overview
of the default threading model on a consumer. The first thread layer is
responsible for pulling messages directly from the transport layer, marshalling
each message, and inserting the message into a queue inside a
javax.jms.Session
instance. The second thread layer consists of
a pool of threads, where each thread is associated with a
javax.jms.MessageConsumer
instance. Each thread in this layer
picks the relevant messages out of the session queue, inserting each message
into a queue inside the javax.jms.MessageConsumer
instance.
Figure 1.3 gives an overview
of the optimized consumer threading model. This threading model can be enabled,
only if there is no more than one session associated with
the connection. In this case, it is possible to optimize away the session
threading layer and the MessageConsumer
threads can then pull
messages directly from the transport layer.
This threading optimization only works, if the following prerequisites are satisfied:
There must only be one JMS session on the connection. If there is more than one session, a separate thread is always used for each session, irrespective of the value of the
alwaysSessionAsync
flag.One of the following acknowledgment modes must be selected:
Session.DUPS_OK_ACKNOWLEDGE
Session.AUTO_ACKNOWLEDGE
To enable the consumer threading optimization, set the
alwaysSessionAsync
option to false
on the
ActiveMQConnectionFactory
(default is
true
).
The following example shows how to initialize a JMS connection and session on
a consumer that exploits the threading optimization by switching off the
alwaysSessionAsync
flag:
// Java ... // Create the connection. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); connectionFactory.setAlwaysSessionAsync(false); Connection connection = connectionFactory.createConnection(); connection.start(); // Create the one-and-only session on this connection. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);