Oracle GlassFish Server Message Queue Developer's Guide for Java Clients Release 4.5.2 Part Number E24945-01
This chapter describes how to use the classes and methods of the Message Queue Java application programming interface (API) to accomplish specific tasks, and provides brief code samples to illustrate some of these tasks. (For clarity, the code samples shown in the chapter omit exception checking.)
This chapter does not provide exhaustive information about each class and method. For detailed reference information, see the JavaDoc documentation for each individual class. For information on the practical design of Message Queue Java programs, see Message Queue Clients: Design and Features.
The Java Message Service (JMS) specification, which Message Queue implements, supports two commonly used models of interaction between message clients and message brokers, sometimes known as messaging domains:
In the point-to-point (or PTP) messaging model, each message is delivered from a message producer to a single message consumer. The producer delivers the message to a queue, from which it is later delivered to one of the consumers registered for the queue. Any number of producers and consumers can interact with the same queue, but each message is guaranteed to be delivered to (and be successfully consumed by) exactly one consumer and no more. If no consumers are registered for a queue, it holds the messages it receives and eventually delivers them when a consumer registers.
In the publish/subscribe (or pub/sub) model, a single message can be delivered from a producer to any number of consumers. The producer publishes the message to a topic, from which it is then delivered to all active consumers that have subscribed to the topic. Any number of producers can publish messages to a given topic, and each message can be delivered to any number of subscribed consumers. The model also supports the notion of durable subscriptions, in which a consumer registered with a topic need not be active at the time a message is published; when the consumer subsequently becomes active, it will receive the message. If no active consumers are registered for a topic, the topic does not hold the messages it receives unless it has inactive consumers with durable subscriptions.
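The difference between the two delivery models can be illustrated with a toy in-memory model. This is only a sketch of the semantics described above, not the JMS API: the names DomainModel, ToyQueue, and ToyTopic are hypothetical, and durable subscriptions are deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of the two messaging domains; not the JMS API. */
final class DomainModel {

    /** Point-to-point: messages wait in the queue and each goes to one consumer. */
    static final class ToyQueue {
        private final Deque<String> pending = new ArrayDeque<>();

        void send(String message) {
            pending.addLast(message);   // held until some consumer receives it
        }

        /** Returns the next message, or null if the queue is empty. */
        String receive() {
            return pending.pollFirst(); // removed, so no other consumer sees it
        }
    }

    /** Publish/subscribe: each message is copied to every current subscriber. */
    static final class ToyTopic {
        private final List<List<String>> subscribers = new ArrayList<>();

        /** Registers a subscriber and returns its inbox. */
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            subscribers.add(inbox);
            return inbox;
        }

        void publish(String message) {
            // With no subscribers the message is simply dropped
            // (durable subscriptions are not modeled here).
            for (List<String> inbox : subscribers) {
                inbox.add(message);
            }
        }
    }
}
```

In this model a message sent to a ToyQueue before any consumer exists is still there when a consumer later calls receive, whereas a message published to a ToyTopic with no subscribers is lost.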
JMS applications are free to use either of these messaging models, or even to mix them both within the same application. Historically, the JMS API provided a separate set of domain-specific object classes for each model. While these domain-specific interfaces continue to be supported for legacy purposes, client programmers are now encouraged to use the newer unified domain interface, which supports both models indiscriminately. For this reason, the discussions and code examples in this manual focus exclusively on the unified interfaces wherever possible. Table 2-1 shows the API classes for all three domains.
Table 2-1 Interface Classes for Messaging Domains
Unified Domain | Point-to-Point Domain | Publish/Subscribe Domain |
---|---|---|
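Destination | Queue | Topic |
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Connection | QueueConnection | TopicConnection |
Session | QueueSession | TopicSession |
MessageProducer | QueueSender | TopicPublisher |
MessageConsumer | QueueReceiver | TopicSubscriber |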
All messaging occurs within the context of a connection. Connections are created using a connection factory encapsulating all of the needed configuration properties for connecting to a particular JMS provider. A connection's configuration properties are completely determined by the connection factory, and cannot be changed once the connection has been created. Thus the only way to control the properties of a connection is by setting those of the connection factory you use to create it.
Typically, a connection factory is created for you by a Message Queue administrator and preconfigured, using the administration tools described in "Administrative Tasks and Tools" in Oracle GlassFish Server Message Queue Administration Guide, with whatever property settings are appropriate for connecting to a particular JMS provider. The factory is then placed in a publicly available administered object store, where you can access it by name using the Java Naming and Directory Interface (JNDI) API. This arrangement has several benefits:
It allows the administrator to control the properties of client connections to the provider, ensuring that they are properly configured.
It enables the administrator to tune performance and improve throughput by adjusting configuration settings even after an application has been deployed.
By relying on the predefined connection factory to handle the configuration details, it helps keep client code provider-independent and thus more easily portable from one JMS provider to another.
Sometimes, however, it may be more convenient to dispense with JNDI lookup and simply create your own connection factory by direct instantiation. Although hard-coding configuration values for a particular JMS provider directly into your application code sacrifices flexibility and provider-independence, this approach might make sense in some circumstances: for example, in the early stages of application development and debugging, or in applications where reconfigurability and portability to other providers are not important concerns.
The following sections describe these two approaches to obtaining a connection factory: by JNDI lookup or direct instantiation.
Example 2-1 shows how to look up a connection factory object in the JNDI object store. The code example is explained in the procedure that follows.
Note:
If a Message Queue client is a J2EE component, JNDI resources are provided by the J2EE container. In such cases, JNDI lookup code may differ from that shown here; see your J2EE provider documentation for details.
Example 2-1 Looking Up a Connection Factory
    // Create the environment for constructing the initial JNDI
    // naming context.
    Hashtable env = new Hashtable();

    // Store the environment attributes that tell JNDI which initial context
    // factory to use and where to find the provider.
    env.put(Context.INITIAL_CONTEXT_FACTORY,
            "com.sun.jndi.fscontext.RefFSContextFactory");
    env.put(Context.PROVIDER_URL, "file:///C:/imq_admin_objects");

    // Create the initial context.
    Context ctx = new InitialContext(env);

    // Look up the connection factory object in the JNDI object store.
    String CF_LOOKUP_NAME = "MyConnectionFactory";
    ConnectionFactory myFactory = (ConnectionFactory) ctx.lookup(CF_LOOKUP_NAME);
Follow this procedure:
Create the environment for constructing the initial JNDI naming context.
How you create the initial context depends on whether you are using a file-system object store or a Lightweight Directory Access Protocol (LDAP) server for your Message Queue administered objects. The code shown here assumes a file-system store; for information about the corresponding LDAP object store attributes, see "LDAP Server Object Stores" in Oracle GlassFish Server Message Queue Administration Guide.
The constructor for the initial context accepts an environment parameter, a hash table whose entries specify the attributes for creating the context:
Hashtable env = new Hashtable();
You can also set an environment by specifying system properties on the command line, rather than programmatically. For instructions, see the README file in the JMS example applications directory.
Store the environment attributes that tell JNDI which initial context factory to use and where to find the JMS provider.
The names of these attributes are defined as static constants in class Context:
env.put(Context.INITIAL_CONTEXT_FACTORY,
"com.sun.jndi.fscontext.RefFSContextFactory");
env.put(Context.PROVIDER_URL, "file:///C:/imq_admin_objects");
Note:
The directory represented by C:/imq_admin_objects must already exist; if necessary, you must create the directory before referencing it in your code.
Create the initial context.
Context ctx = new InitialContext(env);
If you use system properties to set the environment, omit the environment parameter when creating the context:
Context ctx = new InitialContext();
Look up the connection factory object in the administered object store and typecast it to the appropriate class:
    String CF_LOOKUP_NAME = "MyConnectionFactory";
    ConnectionFactory myFactory = (ConnectionFactory) ctx.lookup(CF_LOOKUP_NAME);
The lookup name you use, CF_LOOKUP_NAME, must match the name used when the object was stored.
You can now proceed to use the connection factory to create connections to the message broker, as described under Using Connections.
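The first two steps of the procedure above, together with the note about the store directory, could be wrapped in a small helper along the following lines. This is a minimal sketch: the class name JndiEnvironment and method forFileSystemStore are hypothetical, and it uses only JDK classes, so it builds the environment but does not itself create the initial context.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Hashtable;
import javax.naming.Context;

/** Hypothetical helper that prepares a JNDI environment for a file-system object store. */
final class JndiEnvironment {

    static Hashtable<String, Object> forFileSystemStore(Path storeDir) {
        try {
            // The store directory must exist before it is referenced,
            // so create it (and any missing parents) if necessary.
            Files.createDirectories(storeDir);
        } catch (IOException e) {
            throw new UncheckedIOException("cannot create object store directory", e);
        }
        Hashtable<String, Object> env = new Hashtable<>();
        // Same initial context factory name as in the example above.
        env.put(Context.INITIAL_CONTEXT_FACTORY,
                "com.sun.jndi.fscontext.RefFSContextFactory");
        // Path.toUri() yields a file: URL such as file:///C:/imq_admin_objects/
        env.put(Context.PROVIDER_URL, storeDir.toUri().toString());
        return env;
    }
}
```

The returned table can then be passed to new InitialContext(env) as in the procedure above; that call succeeds only if the file-system context factory named in the environment is available on the class path.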
It is recommended that you use a connection factory just as you receive it from a JNDI lookup, with the property settings originally configured by your Message Queue administrator. However, there may be times when you need to override the preconfigured properties with different values of your own. You can do this from within your application code by calling the connection factory's setProperty method. This method (inherited from the superclass AdministeredObject) takes two string arguments giving the name and value of the property to be set. The property names for the first argument are defined as static constants in the Message Queue class ConnectionConfiguration: for instance, the statement
myFactory.setProperty(ConnectionConfiguration.imqDefaultPassword, "mellon");
sets the default password for establishing broker connections. See "Connection Factory Attributes" in Oracle GlassFish Server Message Queue Administration Guide for complete information on the available connection factory configuration attributes.
It is also possible to override connection factory properties from the command line, by using the -D option to set their values when starting your client application. For example, the command line
java -DimqDefaultPassword=mellon MyMQClient
starts an application named MyMQClient with the same default password as in the preceding example. Setting a property value this way overrides any other value specified for it, whether preconfigured in the JNDI object store or set programmatically with the setProperty method.
Note:
A Message Queue administrator can prevent a connection factory's properties from being overridden by specifying that the object be read-only when placing it in the object store. The properties of such a factory cannot be changed in any way, whether with the -D option from the command line or using the setProperty method from within your client application's code. Any attempt to override the factory's property values will simply be ignored.
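The precedence rules described in this section (command-line value first, then a value set with setProperty, then the preconfigured value, with all overrides ignored for a read-only factory) can be summarized in a toy model. This sketch is not the Message Queue implementation: the class FactoryPropertyModel is hypothetical, and the command-line settings are passed in as a map rather than read from system properties.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of connection factory property precedence; not Message Queue code. */
final class FactoryPropertyModel {
    private final Map<String, String> preconfigured = new HashMap<>();
    private final Map<String, String> programmatic = new HashMap<>();
    private final boolean readOnly;

    FactoryPropertyModel(Map<String, String> preconfigured, boolean readOnly) {
        this.preconfigured.putAll(preconfigured);
        this.readOnly = readOnly;
    }

    /** Models setProperty: silently ignored when the factory is read-only. */
    void setProperty(String name, String value) {
        if (!readOnly) {
            programmatic.put(name, value);
        }
    }

    /** Resolves the value in effect, given the -D settings from the command line. */
    String effectiveValue(String name, Map<String, String> commandLine) {
        if (!readOnly && commandLine.containsKey(name)) {
            return commandLine.get(name);      // -D overrides everything else
        }
        if (programmatic.containsKey(name)) {
            return programmatic.get(name);     // then a value set in code
        }
        return preconfigured.get(name);        // finally the administrator's value
    }
}
```

For example, a writable factory preconfigured with one password, overridden in code with a second, and started with a third on the command line resolves to the command-line value, while a read-only factory always resolves to the preconfigured one.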
Example 2-2 shows how to create a connection factory object by direct instantiation and configure its properties.
Example 2-2 Instantiating a Connection Factory
    // Instantiate the connection factory object.
    com.sun.messaging.ConnectionFactory myFactory =
            new com.sun.messaging.ConnectionFactory();

    // Set the connection factory's configuration properties.
    myFactory.setProperty(ConnectionConfiguration.imqAddressList,
            "localhost:7676,broker2:5000,broker3:9999");
The following procedure explains each program statement in the previous code sample.
Follow this procedure:
Instantiate the connection factory object.
The name ConnectionFactory is defined both as a JMS interface (in package javax.jms) and as a Message Queue class (in com.sun.messaging) that implements that interface. Since only a class can be instantiated, you must use the constructor defined in com.sun.messaging to create your connection factory object. Note, however, that you cannot import the name from both packages without causing a compilation error. Hence, if you have imported the entire package javax.jms.*, you must qualify the constructor with the full package name when instantiating the object:
com.sun.messaging.ConnectionFactory myFactory = new com.sun.messaging.ConnectionFactory();
Notice that the type declaration for the variable myFactory, to which the instantiated connection factory is assigned, is also qualified with the full package name. This is because the setProperty method, used in Example 2-2, belongs to the ConnectionFactory class defined in the package com.sun.messaging, rather than to the ConnectionFactory interface defined in javax.jms. Thus in order for the compiler to recognize this method, myFactory must be typed explicitly as com.sun.messaging.ConnectionFactory rather than simply ConnectionFactory (which would resolve to javax.jms.ConnectionFactory after importing javax.jms.*).
Set the connection factory's configuration properties.
The most important configuration property is imqAddressList, which specifies the host names and port numbers of the message brokers to which the factory creates connections. By default, the factory returned by the ConnectionFactory constructor in Example 2-2 is configured to create connections to a broker on host localhost at port number 7676. If necessary, you can use the setProperty method, described in the preceding section, to change that setting:
myFactory.setProperty(ConnectionConfiguration.imqAddressList, "localhost:7676,broker2:5000,broker3:9999");
When specifying the host name portion of a broker, you can use a literal IPv4 or IPv6 address instead of a host name. If you use a literal IPv6 address, its format must conform to RFC2732, Format for Literal IPv6 Addresses in URL's.
Of course, you can also set any other configuration properties your application may require. See "Connection Factory Attributes" in Oracle GlassFish Server Message Queue Administration Guide for a list of the available connection factory attributes.
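If the broker addresses come from configuration rather than a literal string, the comma-separated value for imqAddressList can be assembled programmatically. The following is a minimal sketch under stated assumptions: the class AddressListBuilder is hypothetical, it handles only the simple host:port form used in the example above, and it assumes that a literal IPv6 address is written in the bracketed form that RFC 2732 defines.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that assembles a host:port address list string. */
final class AddressListBuilder {
    private final List<String> entries = new ArrayList<>();

    AddressListBuilder add(String host, int port) {
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        // A literal IPv6 address contains ':' and is wrapped in brackets
        // (RFC 2732 form) so the port separator stays unambiguous.
        boolean ipv6Literal = host.indexOf(':') >= 0 && !host.startsWith("[");
        entries.add((ipv6Literal ? "[" + host + "]" : host) + ":" + port);
        return this;
    }

    /** Returns the entries joined with commas, in the order they were added. */
    String build() {
        return String.join(",", entries);
    }
}
```

The resulting string would be passed as the second argument to setProperty in place of the literal shown in the example above.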
You can now proceed to use the connection factory to create connections to the message service, as described in the next section.
Once you have obtained a connection factory, you can use it to create a connection to the message service. The factory's createConnection method takes a user name and password as arguments:
Connection myConnection = myFactory.createConnection("mithrandir", "mellon");
Before granting the connection, Message Queue authenticates the user name and password by looking them up in its user repository. As a convenience for developers who do not wish to go to the trouble of populating a user repository during application development and testing, there is also a parameterless form of the createConnection method:
Connection myConnection = myFactory.createConnection();
This creates a connection configured for the default user identity, with both user name and password set to guest.
This unified-domain createConnection method is part of the generic JMS ConnectionFactory interface, defined in package javax.jms; the Message Queue version in com.sun.messaging adds corresponding methods createQueueConnection and createTopicConnection for use specifically with the point-to-point and publish/subscribe domains.
The following table shows the methods defined in the Connection interface.
Name | Description |
---|---|
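createSession | Create session |
setClientID | Set client identifier |
getClientID | Get client identifier |
setEventListener | Set event listener for connection events |
setExceptionListener | Set exception listener |
getExceptionListener | Get exception listener |
getMetaData | Get metadata for connection |
createConnectionConsumer | Create connection consumer |
createDurableConnectionConsumer | Create durable connection consumer |
start | Start incoming message delivery |
stop | Stop incoming message delivery |
close | Close connection |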
The main purpose of a connection is to create sessions for exchanging messages with the message service:
myConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
The first argument to createSession is a boolean indicating whether the session is transacted; the second specifies its acknowledgment mode. Possible values for this second argument are AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, and DUPS_OK_ACKNOWLEDGE, all defined as static constants in the standard JMS Session interface, javax.jms.Session; the extended Message Queue version of the interface, com.sun.messaging.jms.Session, adds another such constant, NO_ACKNOWLEDGE. See Acknowledgment Modes and Transacted Sessions for further discussion.
If your client application will be using the publish/subscribe domain to create durable topic subscriptions, it must have a client identifier to identify itself to the message service. In general, the most convenient arrangement is to configure the client runtime to provide a unique client identifier automatically for each client. However, the Connection interface also provides a method, setClientID, for setting a client identifier explicitly, and a corresponding getClientID method for retrieving its value. See Assigning Client Identifiers in this guide and "Client Identifier" in Oracle GlassFish Server Message Queue Administration Guide for more information.
You should also use the setExceptionListener method to register an exception listener for the connection. This is an object implementing the JMS ExceptionListener interface, which consists of the single method onException:
void onException (JMSException exception)
In the event of a problem with the connection, the message broker will call this method, passing an exception object identifying the nature of the problem.
A connection's getMetaData method returns a ConnectionMetaData object, which in turn provides methods for obtaining various items of information about the connection, such as its JMS version and the name and version of the JMS provider.
The createConnectionConsumer and createDurableConnectionConsumer methods (as well as the session methods setMessageListener and getMessageListener, listed in Table 2-3) are used for concurrent message consumption; see the Java Message Service Specification for more information.
In order to receive incoming messages, you must start the connection by calling its start method:
myConnection.start();
It is important not to do this until after you have created any message consumers you will be using to receive messages on the connection. Starting the connection before creating the consumers risks missing some incoming messages before the consumers are ready to receive them. It is not necessary to start the connection in order to send outgoing messages.
If for any reason you need to suspend the flow of incoming messages, you can do so by calling the connection's stop method:
myConnection.stop();
To resume delivery of incoming messages, call the start method again.
Finally, when you are through with a connection, you should close it to release any resources associated with it:
myConnection.close();
This automatically closes all sessions, message producers, and message consumers associated with the connection and deletes any temporary destinations. All pending message receives are terminated and any transactions in progress are rolled back. Closing a connection does not force an acknowledgment of client-acknowledged sessions.
A connection service that is based on the Transport Layer Security (TLS/SSL) standard is used to authenticate and encrypt messages sent between the client and the broker. This section describes what the client needs to do to use TLS/SSL connections. A user can also establish a secure connection by way of an HTTPS tunnel servlet. For information on setting up secure connections over HTTP, see "HTTP/HTTPS Support" in Oracle GlassFish Server Message Queue Administration Guide.
Some of the work needed to set up a TLS/SSL connection is done by an administrator. This section summarizes these steps. For complete information about the administrative work required, please see "Message Encryption" in Oracle GlassFish Server Message Queue Administration Guide.
To set up a secure connection service, you must do the following.
Generate a self-signed or signed certificate for the broker (administrator).
Enable the ssljms connection service in the broker (administrator).
Start the broker (administrator).
Configure and run the client as explained below.
To configure a client to use a TLS/SSL connection you must do the following.
If your client is not using J2SDK 1.4 (which has JSSE and JNDI support built in), make sure the client has the following files in its class path: jsse.jar, jnet.jar, jcert.jar, jndi.jar.
Make sure the client has the following Message Queue files in its class path: imq.jar, jms.jar.
If the client is not willing to trust the broker's self-signed certificate, set the imqSSLIsHostTrusted attribute to false for the connection factory from which you get the TLS/SSL connection.
Connect to the broker's ssljms service. There are two ways to do this. The first is to specify the service name ssljms in the address for the broker when you provide a value for the imqAddressList attribute of the connection factory from which you obtain the connection. When you run the client, it will be connected to the broker by a TLS/SSL connection. The second is to specify the following directive when you run the command that starts the client:
java -DimqConnectionType=TLS clientAppName
All Message Queue messages travel from a message producer to a message consumer by way of a destination on a message broker. Message delivery is thus a two-stage process: the message is first delivered from the producer to the destination and later from the destination to the consumer. Physical destinations on the broker are created administratively by a Message Queue administrator, using the administration tools described in "Configuring and Managing Physical Destinations" in Oracle GlassFish Server Message Queue Administration Guide. The broker provides routing and delivery services for messages sent to such a destination.
As described earlier under Messaging Domains, Message Queue supports two types of destination, depending on the messaging domain being used:
Queues (point-to-point domain)
Topics (publish/subscribe domain)
These two types of destination are represented by the Message Queue classes Queue and Topic, respectively. These, in turn, are both subclasses of the generic class Destination, part of the unified messaging domain that subsumes both the point-to-point and publish/subscribe domains. A client program that uses the Destination superclass can thus handle both queue and topic destinations indiscriminately.
Because JMS providers differ in their destination addressing conventions, Message Queue does not define a standard address syntax for obtaining access to a destination. Rather, the destination is typically placed in a publicly available administered object store by a Message Queue administrator and accessed by the client using a JNDI lookup in a manner similar to that described earlier for connection factories (see Looking Up a Connection Factory With JNDI).
Example 2-3 shows how to look up a destination object in the JNDI object store.
Note:
If a Message Queue client is a J2EE component, JNDI resources are provided by the J2EE container. In such cases, JNDI lookup code may differ from that shown here; see your J2EE provider documentation for details.
Example 2-3 Looking Up a Destination
    // Create the environment for constructing the initial JNDI naming context.
    Hashtable env = new Hashtable();

    // Store the environment attributes that tell JNDI which initial
    // context factory to use and where to find the provider.
    env.put(Context.INITIAL_CONTEXT_FACTORY,
            "com.sun.jndi.fscontext.RefFSContextFactory");
    env.put(Context.PROVIDER_URL, "file:///C:/imq_admin_objects");

    // Create the initial context.
    Context ctx = new InitialContext(env);

    // Look up the destination object in the JNDI object store.
    String DEST_LOOKUP_NAME = "MyDest";
    Destination MyDest = (Destination) ctx.lookup(DEST_LOOKUP_NAME);
The following section explains the program statements in Example 2-3.
Create the environment for constructing the initial JNDI naming context.
How you create the initial context depends on whether you are using a file-system object store or a Lightweight Directory Access Protocol (LDAP) server for your Message Queue administered objects. The code shown here assumes a file-system store; for information about the corresponding LDAP object store attributes, see "LDAP Server Object Stores" in Oracle GlassFish Server Message Queue Administration Guide.
The constructor for the initial context accepts an environment parameter, a hash table whose entries specify the attributes for creating the context:
Hashtable env = new Hashtable();
You can also set an environment by specifying system properties on the command line, rather than programmatically. For instructions, see the README file in the JMS example applications directory.
Store the environment attributes that tell JNDI which initial context factory to use and where to find the JMS provider.
The names of these attributes are defined as static constants in class Context:
env.put(Context.INITIAL_CONTEXT_FACTORY,
"com.sun.jndi.fscontext.RefFSContextFactory");
env.put(Context.PROVIDER_URL, "file:///C:/imq_admin_objects");
Note:
The directory represented by C:/imq_admin_objects must already exist; if necessary, you must create the directory before referencing it in your code.
Create the initial context.
Context ctx = new InitialContext(env);
If you use system properties to set the environment, omit the environment parameter when creating the context:
Context ctx = new InitialContext();
Look up the destination object in the administered object store and typecast it to the appropriate class:
    String DEST_LOOKUP_NAME = "MyDest";
    Destination MyDest = (Destination) ctx.lookup(DEST_LOOKUP_NAME);
The lookup name you use, DEST_LOOKUP_NAME, must match the name used when the object was stored. Note that the actual destination object returned from the object store will always be either a (point-to-point) queue or a (publish/subscribe) topic, but that either can be assigned to a variable of the generic unified-domain class Destination.
Note:
For topic destinations, a symbolic lookup name that includes wildcard characters can be used as the lookup string. See "Supported Topic Destination Names" in Oracle GlassFish Server Message Queue Administration Guide.
You can now proceed to send and receive messages using the destination, as described under Sending Messages and Receiving Messages.
As with connection factories, you may sometimes find it more convenient to dispense with JNDI lookup and simply create your own queue or topic destination objects by direct instantiation. Although a variable of type Destination can accept objects of either class, you cannot directly instantiate a Destination object; the object must always belong to one of the specific classes Queue or Topic. The constructors for both of these classes accept a string argument specifying the name of the physical destination to which the object corresponds:
Destination myDest = new com.sun.messaging.Queue("myDest");
Note, however, that this only creates a Java object representing the destination; it does not actually create a physical destination on the message broker. The physical destination itself must still be created by a Message Queue administrator, with the same name you pass to the constructor when instantiating the object.
Note:
Destination names beginning with the letters mq are reserved and should not be used by client programs.
Also, for topic destinations, a symbolic lookup name that includes wildcard characters can be used as the lookup string. See "Supported Topic Destination Names" in Oracle GlassFish Server Message Queue Administration Guide.
Unlike connection factories, destinations have a much more limited set of configuration properties. In fact, only two such properties are defined in the Message Queue class DestinationConfiguration: the name of the physical destination itself (imqDestinationName) and an optional descriptive string (imqDestinationDescription). Since the latter property is rarely used and the physical destination name can be supplied directly as an argument to the Queue or Topic constructor as shown above, there normally is no need (as there often is with a connection factory) to specify additional properties with the object's setProperty method. Hence the variable to which you assign the destination object (myDest in the example above) need not be typed with the Message Queue class com.sun.messaging.Destination; the standard JMS interface javax.jms.Destination (which the Message Queue class implements) is sufficient. If you have imported the full JMS package javax.jms.*, you can simply declare the variable with the unqualified name Destination, as above, rather than with something like
com.sun.messaging.Destination myDest = new com.sun.messaging.Queue("myDest");
as shown earlier for connection factories.
A temporary destination is one that exists only for the duration of the connection that created it. You may sometimes find it convenient to create such a destination to use, for example, as a reply destination for messages you send. Temporary destinations are created with the session method createTemporaryQueue or createTemporaryTopic (see Working With Sessions below): for example,
TemporaryQueue tempQueue = mySession.createTemporaryQueue();
Although the temporary destination is created by a particular session, its scope is actually the entire connection to which that session belongs. Any of the connection's sessions (not just the one that created the temporary destination) can create a message consumer for the destination and receive messages from it. The temporary destination is automatically deleted when its connection is closed, or you can delete it explicitly by calling its delete method:
tempQueue.delete();
A session is a single-threaded context for producing and consuming messages. You can create multiple message producers and consumers for a single session, but you are restricted to using them serially, in a single logical thread of control.
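One common way to honor this single-thread restriction in a multithreaded application is to confine all work on a given session to one dedicated thread. The following sketch shows that confinement pattern using only JDK classes; the class SingleThreadContext is hypothetical, and in a real client the tasks passed to call would be the ones that create and use the session, its producers, and its consumers.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical wrapper that runs every submitted task on one dedicated thread. */
final class SingleThreadContext implements AutoCloseable {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    /** Runs the task on the dedicated thread and waits for its result. */
    <T> T call(Supplier<T> task) {
        Callable<T> callable = task::get;
        try {
            // Tasks from any caller are queued and executed serially.
            return worker.submit(callable).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for task", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        }
    }

    /** Stops accepting tasks; the worker thread exits once queued tasks finish. */
    @Override
    public void close() {
        worker.shutdown();
    }
}
```

Because the executor has exactly one thread, callers on different application threads can share the wrapper while the wrapped work still runs in a single logical thread of control.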
Table 2-3 shows the methods defined in the Session interface; they are discussed in the relevant sections below.
Name | Description |
---|---|
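createProducer | Create message producer |
createConsumer | Create message consumer |
createDurableSubscriber | Create durable subscriber for topic |
unsubscribe | Delete durable subscription to topic |
createMessage | Create null message |
createTextMessage | Create text message |
createStreamMessage | Create stream message |
createMapMessage | Create map message |
createObjectMessage | Create object message |
createBytesMessage | Create bytes message |
createQueue | Create queue destination |
createTopic | Create topic destination |
createTemporaryQueue | Create temporary queue |
createTemporaryTopic | Create temporary topic |
createBrowser | Create message browser |
setMessageListener | Set distinguished message listener |
getMessageListener | Get distinguished message listener |
getAcknowledgeMode | Get session's acknowledgment mode |
getTransacted | Is session transacted? |
commit | Commit transaction |
rollback | Roll back transaction |
recover | Recover unacknowledged messages |
close | Close session |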
Every session exists within the context of a particular connection. The number of sessions you can create for a single connection is limited only by system resources. As described earlier (see Using Connections), you use the connection's createSession method to create a session:
Session mySession = myConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
The first (boolean) argument specifies whether the session is transacted; see Transacted Sessions for further discussion. The second argument is an integer constant representing the session's acknowledgment mode, as described in the next section.
A session's acknowledgment mode determines the way your application handles the exchange of acknowledgment information when receiving messages from a broker. The JMS specification defines three possible acknowledgment modes:
In auto-acknowledge mode, the Message Queue client runtime immediately sends a client acknowledgment for each message it delivers to the message consumer; it then blocks waiting for a return broker acknowledgment confirming that the broker has received the client acknowledgment. This acknowledgment "handshake" between client and broker is handled automatically by the client runtime, with no need for explicit action on your part.
In client-acknowledge mode, your client application must explicitly acknowledge the receipt of all messages. This allows you to defer acknowledgment until after you have finished processing the message, ensuring that the broker will not delete it from persistent storage before processing is complete. You can either acknowledge each message individually or batch multiple messages and acknowledge them all at once; the client acknowledgment you send to the broker applies to all messages received since the previous acknowledgment. In either case, as in auto-acknowledge mode, the session thread blocks after sending the client acknowledgment, waiting for a broker acknowledgment in return to confirm that your client acknowledgment has been received.
In dups-OK-acknowledge mode, the session automatically sends a client acknowledgment each time it has received a fixed number of messages, or when a fixed time interval has elapsed since the last acknowledgment was sent. (This fixed batch size and timeout interval are currently 10 messages and 7 seconds, respectively, and are not configurable by the client.) Unlike the first two modes described above, the broker does not acknowledge receipt of the client acknowledgment, and the session thread does not block awaiting such return acknowledgment from the broker. This means that you have no way to confirm that your acknowledgment has been received; if it is lost in transmission, the broker may redeliver the same message more than once. However, because client acknowledgments are batched and the session thread does not block, applications that can tolerate multiple delivery of the same message can achieve higher throughput in this mode than in auto-acknowledge or client-acknowledge mode.
Message Queue extends the JMS specification by adding a fourth acknowledgment mode:
In no-acknowledge mode, your client application does not acknowledge receipt of messages, nor does the broker expect any such acknowledgment. There is thus no guarantee whatsoever that any message sent by the broker has been successfully received. This mode sacrifices all reliability for the sake of maximum throughput of message traffic.
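The batching behavior of client-acknowledge mode can be pictured with a small stand-alone model. This is only a sketch in plain Java, not the Message Queue API: the class `ToyClientAckSession` and its methods are hypothetical names chosen for illustration, and the model ignores the broker's return acknowledgment entirely.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of client-acknowledge bookkeeping; not the Message Queue API. */
final class ToyClientAckSession {
    // Messages delivered to the consumer but not yet acknowledged.
    private final List<String> unacknowledged = new ArrayList<>();

    /** Records that a message has been delivered to the consumer. */
    void deliver(String messageId) {
        unacknowledged.add(messageId);
    }

    /**
     * Acknowledges every message delivered since the previous acknowledgment
     * and returns how many messages that single acknowledgment covered.
     */
    int acknowledge() {
        int covered = unacknowledged.size();
        unacknowledged.clear();
        return covered;
    }

    /** Number of messages still waiting for an acknowledgment. */
    int pending() {
        return unacknowledged.size();
    }
}
```

Delivering three messages and then calling `acknowledge()` once covers all three, which mirrors the rule that a client acknowledgment applies to all messages received since the previous one.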
The standard JMS Session
interface, defined in package javax.jms
, defines static constants for the first three acknowledgment modes (AUTO_ACKNOWLEDGE
, CLIENT_ACKNOWLEDGE
, and DUPS_OK_ACKNOWLEDGE
), to be used as arguments to the connection's createSession
method. The constant representing the fourth mode (NO_ACKNOWLEDGE
) is defined in the extended Message Queue version of the interface, in package com.sun.messaging.jms
. The session method getAcknowledgeMode
returns one of these constants:
int ackMode = mySession.getAcknowledgeMode();
switch (ackMode) {
    case Session.AUTO_ACKNOWLEDGE:
        /* Code here to handle auto-acknowledge mode */
        break;
    case Session.CLIENT_ACKNOWLEDGE:
        /* Code here to handle client-acknowledge mode */
        break;
    case Session.DUPS_OK_ACKNOWLEDGE:
        /* Code here to handle dups-OK-acknowledge mode */
        break;
    case com.sun.messaging.jms.Session.NO_ACKNOWLEDGE:
        /* Code here to handle no-acknowledge mode */
        break;
}
Note:
All of the acknowledgment modes discussed above apply to message consumption. For message production, the broker's acknowledgment behavior depends on the message's delivery mode (persistent or nonpersistent; see Message Header). The broker acknowledges the receipt of persistent messages, but not of nonpersistent ones; this behavior is not configurable by the client.
In a transacted session (see next section), the acknowledgment mode is ignored and all acknowledgment processing is handled for you automatically by the Message Queue client runtime. In this case, the getAcknowledgeMode
method returns the special constant Session.SESSION_TRANSACTED.
Transactions allow you to group together an entire series of incoming and outgoing messages and treat them as an atomic unit. The message broker tracks the state of the transaction's individual messages, but does not complete their delivery until you commit the transaction. In the event of failure, you can roll back the transaction, canceling all of its messages and restarting the entire series from the beginning.
Transactions always take place within the context of a single session. To use them, you must create a transacted session by passing true
as the first argument to a connection's createSession
method:
Session mySession = myConnection.createSession(true, Session.SESSION_TRANSACTED);
The session's getTransacted
method tests whether it is a transacted session:
if ( mySession.getTransacted() ) {
    /* Code here to handle transacted session */
} else {
    /* Code here to handle non-transacted session */
}
A transacted session always has exactly one open transaction, encompassing all messages sent or received since the session was created or the previous transaction was completed. Committing or rolling back a transaction ends that transaction and automatically begins another.
Note:
Because the scope of a transaction is limited to a single session, it is not possible to combine the production and consumption of a message into a single end-to-end transaction. That is, the delivery of a message from a message producer to a destination on the broker cannot be placed in the same transaction with its subsequent delivery from the destination to a consumer.
When all messages in a transaction have been successfully delivered, you call the session's commit
method to commit the transaction:
mySession.commit();
All of the session's incoming messages are acknowledged and all of its outgoing messages are sent. The transaction is then considered complete and a new one is started.
When a send or receive operation fails, an exception is thrown. While it is possible to handle the exception by simply ignoring it or by retrying the operation, it is recommended that you roll back the transaction, using the session's rollback
method:
mySession.rollback();
All of the session's incoming messages are recovered and redelivered, and its outgoing messages are destroyed and must be re-sent.
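The commit and rollback semantics described in this section can be sketched with a toy model in plain Java. This is not the Message Queue API: `ToyTransactedSession` and its methods are hypothetical, messages are plain strings, and the "broker" is just a list. The sketch assumes that rolled-back incoming messages are redelivered in their original order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a transacted session; not the Message Queue API. */
final class ToyTransactedSession {
    // Incoming messages waiting to be delivered to this session.
    private final Deque<String> inbound = new ArrayDeque<>();
    // Messages received within the currently open transaction.
    private final List<String> consumed = new ArrayList<>();
    // Messages sent within the currently open transaction.
    private final List<String> pendingSends = new ArrayList<>();
    // Outgoing messages that a commit has actually released.
    private final List<String> broker = new ArrayList<>();

    ToyTransactedSession(List<String> incoming) {
        inbound.addAll(incoming);
    }

    /** Receives the next inbound message, or null if none is waiting. */
    String receive() {
        String msg = inbound.poll();
        if (msg != null) {
            consumed.add(msg);
        }
        return msg;
    }

    /** Buffers an outgoing message until the transaction is committed. */
    void send(String msg) {
        pendingSends.add(msg);
    }

    /** Acknowledges consumed messages and releases the buffered sends. */
    void commit() {
        consumed.clear();
        broker.addAll(pendingSends);
        pendingSends.clear();
    }

    /** Requeues consumed messages in order and discards the buffered sends. */
    void rollback() {
        for (int i = consumed.size() - 1; i >= 0; i--) {
            inbound.addFirst(consumed.get(i));
        }
        consumed.clear();
        pendingSends.clear();
    }

    /** Returns a copy of the outgoing messages released so far. */
    List<String> delivered() {
        return new ArrayList<>(broker);
    }
}
```

In both `commit` and `rollback` the per-transaction lists are emptied, which corresponds to the rule that ending one transaction automatically begins another.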
This section describes how to use the Message Queue Java API to compose, send, receive, and process messages.
A message consists of the following parts:
A header containing identifying and routing information
Optional properties that can be used to convey additional identifying information beyond that contained in the header
A body containing the actual content of the message
The following sections discuss each of these in greater detail.
Every message must have a header containing identifying and routing information. The header consists of a set of standard fields, which are defined in the Java Message Service Specification and summarized in Table 2-4. Some of these are set automatically by Message Queue in the course of producing and delivering a message, some depend on settings specified when a message producer sends a message, and others are set by the client on a message-by-message basis.
Table 2-4 Message Header Fields
Name | Description |
---|---|
JMSMessageID | Message identifier |
JMSDestination | Destination to which message is sent |
JMSReplyTo | Destination to which to reply |
JMSCorrelationID | Link to related message |
JMSDeliveryMode | Delivery mode (persistent or nonpersistent) |
JMSPriority | Priority level |
JMSTimestamp | Time of transmission |
JMSExpiration | Expiration time |
JMSType | Message type |
JMSRedelivered | Has message been delivered before? |
The JMS Message
interface defines methods for setting the value of each header field: for instance,
outMsg.setJMSReplyTo(replyDest);
Table 2-5 lists all of the available header specification methods.
Table 2-5 Message Header Specification Methods
Name | Description |
---|---|
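setJMSMessageID | Set message identifier |
setJMSDestination | Set destination |
setJMSReplyTo | Set reply destination |
setJMSCorrelationID | Set correlation identifier from string |
setJMSCorrelationIDAsBytes | Set correlation identifier from byte array |
setJMSDeliveryMode | Set delivery mode |
setJMSPriority | Set priority level |
setJMSTimestamp | Set time stamp |
setJMSExpiration | Set expiration time |
setJMSType | Set message type |
setJMSRedelivered | Set redelivered flag |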
The message identifier (JMSMessageID
) is a string value uniquely identifying the message, assigned and set by the message broker when the message is sent. Because generating an identifier for each message adds to both the size of the message and the overhead involved in sending it, and because some client applications may not use them, the JMS interface provides a way to suppress the generation of message identifiers, using the message producer method setDisableMessageID
(see Sending Messages).
The JMSDestination
header field holds a Destination
object representing the destination to which the message is directed, set by the message broker when the message is sent. There is also a JMSReplyTo
field that you can set to specify a destination to which reply messages should be directed. Clients sending such a reply message can set its JMSCorrelationID
header field to refer to the message to which they are replying. Typically this field is set to the message identifier string of the message being replied to, but client applications are free to substitute their own correlation conventions instead, using either the setJMSCorrelationID
method (if the field value is a string) or the more general
setJMSCorrelationIDAsBytes
(if it is not).
The delivery mode (JMSDeliveryMode
) specifies whether the message broker should log the message to stable storage. There are two possible values, PERSISTENT
and NON_PERSISTENT
, both defined as static constants of the JMS interface DeliveryMode
: for example,
outMsg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
The default delivery mode is PERSISTENT
, represented by the static constant Message.DEFAULT_DELIVERY_MODE.
The choice of delivery mode represents a tradeoff between performance and reliability:
In persistent mode, the broker logs the message to stable storage, ensuring that it will not be lost in transit in the event of transmission failure; the message is guaranteed to be delivered exactly once.
In nonpersistent mode, the message is not logged to stable storage; it will be delivered at most once, but may be lost in case of failure and not delivered at all. This mode does, however, improve performance by reducing the broker's message-handling overhead. It may thus be appropriate for applications in which performance is at a premium and reliability is not.
The message's priority level (JMSPriority
) is expressed as an integer from 0
(lowest) to 9
(highest). Priorities from 0
to 4
are considered gradations of normal priority, those from 5
to 9
of expedited priority. The default priority level is 4
, represented by the static constant Message.DEFAULT_PRIORITY.
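The two priority bands can be expressed as a small helper. This is a sketch in plain Java rather than part of the JMS API: the class `PrioritySketch` and the labels "normal" and "expedited" returned as strings are illustrative assumptions.

```java
/** Sketch of the priority bands described above; not the Message Queue API. */
final class PrioritySketch {
    /** Default priority level, matching the value stated in the text. */
    static final int DEFAULT_PRIORITY = 4;

    /** Classifies a priority level as "normal" (0 to 4) or "expedited" (5 to 9). */
    static String classify(int priority) {
        if (priority < 0 || priority > 9) {
            throw new IllegalArgumentException("priority must be in 0..9: " + priority);
        }
        return priority <= 4 ? "normal" : "expedited";
    }
}
```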
The Message Queue client runtime sets the JMSTimestamp
header field to the time it delivers the message to the broker, expressed as a long integer in standard Java format (milliseconds since midnight, January 1, 1970 UTC). The message's lifetime, specified when the message is sent, is added to this value and the result is stored in the JMSExpiration
header field. (The default lifetime value of 0
, represented by the static constant Message.DEFAULT_TIME_TO_LIVE, denotes an unlimited lifetime. In this case, the expiration time is also set to 0
to indicate that the message never expires.) As with the message identifier, client applications that do not use a message's time stamp can improve performance by suppressing its generation with the message producer method setDisableMessageTimestamp
(see Sending Messages).
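The relationship between time stamp, lifetime, and expiration time amounts to a little arithmetic, sketched below in plain Java. The class `ExpirationSketch` is hypothetical, and treating a message as expired once the current time reaches its expiration time (rather than only after it has passed) is an assumption made for the example.

```java
/** Sketch of the expiration arithmetic described above; not the Message Queue API. */
final class ExpirationSketch {
    /** Lifetime value that denotes an unlimited lifetime. */
    static final long UNLIMITED = 0L;

    /** Returns the expiration time for a message, or 0 if it never expires. */
    static long expiration(long timestampMillis, long timeToLiveMillis) {
        if (timeToLiveMillis == UNLIMITED) {
            return 0L;
        }
        return timestampMillis + timeToLiveMillis;
    }

    /** Returns true if a message with the given expiration time has expired at nowMillis. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        // An expiration time of 0 means the message never expires.
        return expirationMillis != 0L && nowMillis >= expirationMillis;
    }
}
```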
The header field JMSType
can contain an optional message type identifier string supplied by the client when the message is sent. This field is intended for use with other JMS providers; Message Queue clients can simply ignore it.
When a message already delivered must be delivered again because of a failure, the broker indicates this by setting the JMSRedelivered
flag in the message header to true
. This can happen, for instance, when a session is recovered or a transaction is rolled back. The receiving client can check this flag to avoid duplicate processing of the same message (such as when the message has already been successfully received but the client's acknowledgment was missed by the broker).
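One possible way for a receiving client to use the redelivered flag is sketched below in plain Java. The class `DuplicateFilter` is hypothetical and not part of the Message Queue API; it assumes the client remembers the identifiers of messages it has already processed and consults that record only when the flag is set.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy duplicate filter keyed on message identifier; not the Message Queue API. */
final class DuplicateFilter {
    // Identifiers of messages this client has already processed.
    private final Set<String> processedIds = new HashSet<>();

    /**
     * Returns true if the message should be processed, or false if it is a
     * redelivery of a message that was already processed.
     */
    boolean shouldProcess(String messageId, boolean redelivered) {
        if (redelivered && processedIds.contains(messageId)) {
            return false;
        }
        processedIds.add(messageId);
        return true;
    }
}
```

The set grows without bound in this sketch; a real client would need some policy for discarding old identifiers.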
See the Java Message Service Specification for a more detailed discussion of all message header fields.
A message property consists of a name string and an associated value, which must be either a string or one of the standard Java primitive data types (int
, byte
, short
, long
, float
, double
, or boolean
). The Message
interface provides methods for setting properties of each type (see Table 2-6). There is also a setObjectProperty
method that accepts a primitive value in objectified form, as a Java object of class Integer
, Byte
, Short
, Long
, Float
, Double
, Boolean
, or String
. The clearProperties
method deletes all properties associated with a message; the message header and body are not affected.
Table 2-6 Message Property Specification Methods
Name | Description |
---|---|
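setIntProperty | Set integer property |
setByteProperty | Set byte property |
setShortProperty | Set short integer property |
setLongProperty | Set long integer property |
setFloatProperty | Set floating-point property |
setDoubleProperty | Set double-precision property |
setBooleanProperty | Set boolean property |
setStringProperty | Set string property |
setObjectProperty | Set property from object |
clearProperties | Clear properties |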
The JMS specification defines certain standard properties, listed in Table 2-7. By convention, the names of all such standard properties begin with the letters JMSX
; names of this form are reserved and must not be used by a client application for its own custom message properties. Similarly, property names beginning with JMS_SUN
are reserved for provider-specific properties defined by Message Queue itself; these are discussed in Message Queue Clients: Design and Features.
Table 2-7 Standard JMS Message Properties
Name | Description |
---|---|
JMSXUserID | Identity of user sending message |
JMSXAppID | Identity of application sending message |
JMSXDeliveryCount | Number of delivery attempts |
JMSXGroupID | Identity of message group to which this message belongs |
JMSXGroupSeq | Sequence number within message group |
JMSXProducerTXID | Identifier of transaction within which message was produced |
JMSXConsumerTXID | Identifier of transaction within which message was consumed |
JMSXRcvTimestamp | Time message delivered to consumer |
JMSXState | Message state (waiting, ready, expired, or retained) |
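The naming and typing rules for message properties can be checked with a pair of small predicates. This is a sketch in plain Java, not the Message Queue API: `PropertyRules` and its method names are hypothetical, and the checks cover only the rules stated in this section (the reserved prefixes JMSX and JMS_SUN, and the value classes accepted in objectified form).

```java
/** Sketch of the property naming and typing rules; not the Message Queue API. */
final class PropertyRules {
    /** Returns true if the name uses a prefix reserved for standard or provider properties. */
    static boolean isReservedName(String name) {
        return name.startsWith("JMSX") || name.startsWith("JMS_SUN");
    }

    /** Returns true if the value is a string or an objectified primitive allowed as a property. */
    static boolean isSupportedValue(Object value) {
        return value instanceof Integer || value instanceof Byte
            || value instanceof Short || value instanceof Long
            || value instanceof Float || value instanceof Double
            || value instanceof Boolean || value instanceof String;
    }
}
```

A client could run such checks before choosing names for its own custom properties.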
The actual content of a message is contained in the message body. JMS defines six classes (or types) of message, each with a different body format:
A text message (interface TextMessage
) contains a Java string.
A stream message (interface StreamMessage
) contains a stream of Java primitive values, written and read sequentially.
A map message (interface MapMessage
) contains a set of name-value pairs, where each name is a string and each value is a Java primitive value. The order of the entries is undefined; they can be accessed randomly by name or enumerated sequentially.
An object message (interface ObjectMessage
) contains a serialized Java object (which may in turn be a collection of other objects).
A bytes message (interface BytesMessage
) contains a stream of uninterpreted bytes.
A null message (interface Message
) consists of a header and properties only, with no message body.
Each of these is a subinterface of the generic Message
interface, extended with additional methods specific to the particular message type.
The JMS Session
interface provides methods for creating each type of message, as shown in Table 2-8. For instance, you can create a text message with a statement such as
TextMessage outMsg = mySession.createTextMessage();
In general, these methods create a message with an empty body; the interfaces for specific message types then provide additional methods for filling the body with content, as described in the sections that follow.
Table 2-8 Session Methods for Message Creation
Name | Description |
---|---|
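createMessage | Create null message |
createTextMessage | Create text message |
createStreamMessage | Create stream message |
createMapMessage | Create map message |
createObjectMessage | Create object message |
createBytesMessage | Create bytes message |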
Note:
Some of the message-creation methods have an overloaded form that allows you to initialize the message body directly at creation: for example,
TextMessage outMsg = mySession.createTextMessage("Hello, World!");
These exceptions are pointed out in the relevant sections below.
Once a message has been delivered to a message consumer, its body is considered read-only; any attempt by the consumer to modify the message body will cause an exception (MessageNotWriteableException
) to be thrown. The consumer can, however, empty the message body and place it in a writeable state by calling the message method clearBody
:
outMsg.clearBody();
This places the message in the same state as if it had been newly created, ready to fill its body with new content.
You create a text message with the session method createTextMessage
. You can either initialize the message body directly at creation time
TextMessage outMsg = mySession.createTextMessage("Hello, World!");
or simply create an empty message and then use its setText
method (see Table 2-9) to set its content:
TextMessage outMsg = mySession.createTextMessage();
outMsg.setText("Hello, World!");
The session method createStreamMessage
returns a new, empty stream message. You can then use the methods shown in Table 2-10 to write primitive data values into the message body, similarly to writing to a data stream: for example,
StreamMessage outMsg = mySession.createStreamMessage();
outMsg.writeString("The Meaning of Life");
outMsg.writeInt(42);
Table 2-10 Stream Message Composition Methods
Name | Description |
---|---|
writeInt | Write integer to message stream |
writeByte | Write byte value to message stream |
writeBytes | Write byte array to message stream |
writeShort | Write short integer to message stream |
writeLong | Write long integer to message stream |
writeFloat | Write floating-point value to message stream |
writeDouble | Write double-precision value to message stream |
writeBoolean | Write boolean value to message stream |
writeChar | Write character to message stream |
writeString | Write string to message stream |
writeObject | Write value of object to message stream |
reset | Reset message stream |
As a convenience for handling values whose types are not known until execution time, the writeObject
method accepts a string or an objectified primitive value of class Integer
, Byte
, Short
, Long
, Float
, Double
, Boolean
, or Character
and writes the corresponding string or primitive value to the message stream: for example, the statements
Integer meaningOfLife = Integer.valueOf(42);
outMsg.writeObject(meaningOfLife);
are equivalent to
outMsg.writeInt(42);
This method will throw an exception (MessageFormatException
) if the argument given to it is not of class String
or one of the objectified primitive classes.
Once you've written the entire message contents to the stream, the reset
method
outMsg.reset();
puts the message body in read-only mode and repositions the stream to the beginning, ready to read (see Processing Messages). When the message is in this state, any attempt to write to the message stream will throw the exception MessageNotWriteableException
. A call to the clearBody
method (inherited from the superinterface Message
) deletes the entire message body and makes it writeable again.
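The write, reset, and clear cycle of a stream message body can be modeled in a few lines of plain Java. This is a toy, not the Message Queue API: `ToyStreamBody` is a hypothetical class, `IllegalStateException` stands in for MessageNotWriteableException, and `IllegalArgumentException` stands in for MessageFormatException. The accepted value classes are the ones listed in this section.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a stream message body; not the Message Queue API. */
final class ToyStreamBody {
    private final List<Object> values = new ArrayList<>();
    private boolean readOnly = false;
    private int readPosition = 0;

    /** Appends a string or objectified primitive value to the stream. */
    void writeObject(Object value) {
        if (readOnly) {
            // Stands in for MessageNotWriteableException.
            throw new IllegalStateException("body is read-only; call clearBody() first");
        }
        boolean supported = value instanceof String || value instanceof Integer
            || value instanceof Byte || value instanceof Short
            || value instanceof Long || value instanceof Float
            || value instanceof Double || value instanceof Boolean
            || value instanceof Character;
        if (!supported) {
            // Stands in for MessageFormatException.
            throw new IllegalArgumentException("unsupported value: " + value);
        }
        values.add(value);
    }

    /** Makes the body read-only and repositions the stream to the beginning. */
    void reset() {
        readOnly = true;
        readPosition = 0;
    }

    /** Reads the next value in the order it was written. */
    Object readObject() {
        return values.get(readPosition++);
    }

    /** Deletes the entire body and makes it writeable again. */
    void clearBody() {
        values.clear();
        readOnly = false;
        readPosition = 0;
    }
}
```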
Table 2-11 shows the methods available in the MapMessage
interface for adding content to the body of a map message. Each of these methods takes two arguments, a name string and a primitive or string value of the appropriate type, and adds the corresponding name-value pair to the message body: for example,
MapMessage outMsg = mySession.createMapMessage();
outMsg.setInt("The Meaning of Life", 42);
Table 2-11 Map Message Composition Methods
Name | Description |
---|---|
setInt | Store integer in message map by name |
setByte | Store byte value in message map by name |
setBytes | Store byte array in message map by name |
setShort | Store short integer in message map by name |
setLong | Store long integer in message map by name |
setFloat | Store floating-point value in message map by name |
setDouble | Store double-precision value in message map by name |
setBoolean | Store boolean value in message map by name |
setChar | Store character in message map by name |
setString | Store string in message map by name |
setObject | Store object in message map by name |
Like stream messages, map messages provide a convenience method (setObject
) for dealing with values whose type is determined dynamically at execution time: for example, the statements
Integer meaningOfLife = Integer.valueOf(42);
outMsg.setObject("The Meaning of Life", meaningOfLife);
are equivalent to
outMsg.setInt("The Meaning of Life", 42);
The object supplied must be either a string object (class String
) or an objectified primitive value of class Integer
, Byte
, Short
, Long
, Float
, Double
, Boolean
, or Character
; otherwise an exception (MessageFormatException
) will be thrown.
The ObjectMessage
interface provides just one method, setObject
(Table 2-12), for setting the body of an object message:
ObjectMessage outMsg = mySession.createObjectMessage();
outMsg.setObject(bodyObject);
The argument to this method can be any serializable object (that is, an instance of any class that implements the standard Java interface Serializable
). If the object is not serializable, the exception MessageFormatException
will be thrown.
Table 2-12 Object Message Composition Method
Name | Description |
---|---|
setObject | Serialize object to message body |
As an alternative, you can initialize the message body directly when you create the message, by passing an object to the session method createObjectMessage
:
ObjectMessage outMsg = mySession.createObjectMessage(bodyObject);
Again, an exception will be thrown if the object is not serializable.
The body of a bytes message simply consists of a stream of uninterpreted bytes; its interpretation is entirely a matter of agreement between sender and receiver. This type of message is intended primarily for encoding message formats required by other existing message systems; Message Queue clients should generally use one of the other, more specific message types instead.
Composing a bytes message is similar to composing a stream message (see Composing Stream Messages). You create the message with the session method createBytesMessage
, then use the methods shown in Table 2-13 to encode primitive values into the message's byte stream: for example,
BytesMessage outMsg = mySession.createBytesMessage();
outMsg.writeUTF("The Meaning of Life");
outMsg.writeInt(42);
Table 2-13 Bytes Message Composition Methods
Name | Description |
---|---|
writeInt | Write integer to message stream |
writeByte | Write byte value to message stream |
writeBytes | Write byte array to message stream |
writeShort | Write short integer to message stream |
writeLong | Write long integer to message stream |
writeFloat | Write floating-point value to message stream |
writeDouble | Write double-precision value to message stream |
writeBoolean | Write boolean value to message stream |
writeChar | Write character to message stream |
writeUTF | Write UTF-8 string to message stream |
writeObject | Write value of object to message stream |
reset | Reset message stream |
As with stream and map messages, you can use the generic object-based method writeObject
to handle values whose type is unknown at compilation time: for example, the statements
Integer meaningOfLife = Integer.valueOf(42);
outMsg.writeObject(meaningOfLife);
are equivalent to
outMsg.writeInt(42);
The message's reset
method
outMsg.reset();
puts the message body in read-only mode and repositions the byte stream to the beginning, ready to read (see Processing Messages). Attempting to write further content to a message in this state will cause an exception (MessageNotWriteableException
). The inherited Message
method clearBody
can be used to delete the entire message body and make it writeable again.
In order to send messages to a message broker, you must create a message producer using the session method createProducer
:
MessageProducer myProducer = mySession.createProducer(myDest);
The scope of the message producer is limited to the session that created it and the connection to which that session belongs. Table 2-14 shows the methods defined in the MessageProducer
interface.
Table 2-14 Message Producer Methods
Name | Description |
---|---|
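getDestination | Get default destination |
setDeliveryMode | Set default delivery mode |
getDeliveryMode | Get default delivery mode |
setPriority | Set default priority level |
getPriority | Get default priority level |
setTimeToLive | Set default message lifetime |
getTimeToLive | Get default message lifetime |
setDisableMessageID | Set message identifier disable flag |
getDisableMessageID | Get message identifier disable flag |
setDisableMessageTimestamp | Set time stamp disable flag |
getDisableMessageTimestamp | Get time stamp disable flag |
send | Send message |
close | Close message producer |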
The createProducer
method takes a destination as an argument, which may be either a (point-to-point) queue or a (publish/subscribe) topic. The producer will then send all of its messages to the specified destination. If the destination is a queue, the producer is called a sender for that queue; if it is a topic, the producer is a publisher to that topic. The message producer's getDestination
method returns this destination.
You also have the option of leaving the destination unspecified when you create a producer
MessageProducer myProducer = mySession.createProducer(null);
in which case you must specify an explicit destination for each message. This option is typically used for producers that must send messages to a variety of destinations, such as those designated in the JMSReplyTo
header fields of incoming messages (see Message Header).
Note:
The generic MessageProducer
interface also has specialized subinterfaces, QueueSender
and TopicPublisher
, for sending messages specifically to a point-to-point queue or a publish/subscribe topic. These types of producer are created by the createSender
and createPublisher
methods of the specialized session subinterfaces QueueSession
and TopicSession
, respectively. However, it is generally more convenient (and recommended) to use the generic form of message producer described here, which can handle both types of destination indiscriminately.
A producer has a default delivery mode (persistent or nonpersistent), priority level, and message lifetime, which it will apply to all messages it sends unless explicitly overridden for an individual message. You can set these properties with the message producer methods setDeliveryMode
, setPriority
, and setTimeToLive
, and retrieve them with getDeliveryMode
, getPriority
, and getTimeToLive
. If you don't set them explicitly, they default to persistent delivery, priority level 4
, and a lifetime value of 0
, denoting an unlimited message lifetime.
The heart of the message producer interface is the send
method, which is available in a variety of overloaded forms. The simplest of these just takes a message as its only argument:
myProducer.send(outMsg);
This sends the specified message to the producer's default destination, using the producer's default delivery mode, priority, and message lifetime. Alternatively, you can explicitly specify the destination
myProducer.send(myDest, outMsg);
or the delivery mode, priority, and lifetime in milliseconds
myProducer.send(outMsg, DeliveryMode.NON_PERSISTENT, 9, 1000);
or all of these at once:
myProducer.send(myDest, outMsg, DeliveryMode.NON_PERSISTENT, 9, 1000);
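Recall that if you did not specify a destination when creating the message producer, you must provide an explicit destination for each message you send. Conversely, the JMS specification intends the forms of send that take a destination for producers created without one; calling them on a producer that already has a default destination can raise an UnsupportedOperationException.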
As discussed earlier under Message Header, client applications that have no need for the message identifier and time stamp fields in the message header can gain some performance improvement by suppressing the generation of these fields, using the message producer's setDisableMessageID
and setDisableMessageTimestamp
methods. Note that a true
value for either of these flags disables the generation of the corresponding header field, while a false
value enables it. Both flags are set to false
by default, meaning that the broker will generate the values of these header fields unless explicitly instructed otherwise.
When you are finished using a message producer, you should call its close
method
myProducer.close();
allowing the broker and client runtime to release any resources they may have allocated on the producer's behalf.
Messages are received by a message consumer, within the context of a connection and a session. Once you have created a consumer, you can use it to receive messages in either of two ways:
In synchronous message consumption, you explicitly request the delivery of messages when you are ready to receive them.
In asynchronous message consumption, you register a message listener for the consumer. The Message Queue client runtime then calls the listener whenever it has a message to deliver.
These two forms of message consumption are described in the sections Receiving Messages Synchronously and Receiving Messages Asynchronously.
The session method createConsumer
creates a generic consumer that can be used to receive messages from either a (point-to-point) queue or a (publish/subscribe) topic:
MessageConsumer myConsumer = mySession.createConsumer(myDest);
If the destination is a queue, the consumer is called a receiver for that queue; if it is a topic, the consumer is a subscriber to that topic.
Note:
The generic MessageConsumer
interface also has specialized subinterfaces, QueueReceiver
and TopicSubscriber
, for receiving messages specifically from a point-to-point queue or a publish/subscribe topic. These types of consumer are created by the createReceiver
and createSubscriber
methods of the specialized session subinterfaces QueueSession
and TopicSession
, respectively. However, it is generally more convenient (and recommended) to use the generic form of message consumer described here, which can handle both types of destination indiscriminately.
A subscriber created for a topic destination with the createConsumer
method is always nondurable, meaning that it will receive only messages that are sent (published) to the topic while the subscriber is active. If you want the broker to retain messages published to a topic while no subscriber is active and deliver them when one becomes active again, you must instead create a durable subscriber, as described in Durable Subscribers.
Table 2-15 shows the methods defined in the MessageConsumer
interface, which are discussed in detail in the relevant sections below.
Table 2-15 Message Consumer Methods
Name | Description |
---|---|
getMessageSelector | Get message selector |
receive | Receive message synchronously |
receiveNoWait | Receive message synchronously without blocking |
setMessageListener | Set message listener for asynchronous reception |
getMessageListener | Get message listener for asynchronous reception |
close | Close message consumer |
If appropriate, you can restrict the messages a consumer will receive from its destination by supplying a message selector as an argument when you create the consumer:
String mySelector = "/* Text of selector here */";
MessageConsumer myConsumer = mySession.createConsumer(myDest, mySelector);
The selector is a string whose syntax is based on a subset of the SQL92 conditional expression syntax, which allows you to filter the messages you receive based on the values of their properties (see Message Properties). See the Java Message Service Specification for a complete description of this syntax. The message consumer's getMessageSelector
method returns the consumer's selector string (or null
if no selector was specified when the consumer was created):
String mySelector = myConsumer.getMessageSelector();
Note:
Messages whose properties do not satisfy the consumer's selector will be retained undelivered by the destination until they are retrieved by another message consumer. The use of message selectors can thus cause messages to be delivered out of sequence from the order in which they were originally produced. Only a message consumer without a selector is guaranteed to receive messages in their original order.
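The reordering effect mentioned in the note can be seen in a toy model written in plain Java. This is not the Message Queue API and does not parse real selector syntax: `ToySelectorQueue` is a hypothetical class, each message is reduced to a single integer property, and a `Predicate` stands in for the selector.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

/** Toy model of selector-based consumption; not the Message Queue API. */
final class ToySelectorQueue {
    // Messages retained by the destination, in production order.
    private final List<Integer> waiting = new ArrayList<>();

    /** Adds a message, represented here by one integer property value. */
    void produce(int value) {
        waiting.add(value);
    }

    /** Removes and returns, in arrival order, the messages that satisfy the selector. */
    List<Integer> drain(Predicate<Integer> selector) {
        List<Integer> received = new ArrayList<>();
        Iterator<Integer> it = waiting.iterator();
        while (it.hasNext()) {
            Integer msg = it.next();
            if (selector.test(msg)) {
                received.add(msg);
                it.remove();
            }
        }
        return received;
    }
}
```

If messages 1 through 5 are produced and a consumer selecting even values drains first, it receives 2 and 4; a later consumer without a selector then receives 1, 3, and 5, so the overall consumption order differs from the production order.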
In some cases, the same connection may both publish and subscribe to the same topic destination. The createConsumer
method accepts an optional boolean argument that suppresses the delivery of messages published by the consumer's own connection:
String mySelector = "/* Text of selector here */";
MessageConsumer myConsumer = mySession.createConsumer(myDest, mySelector, true);
The resulting consumer will receive only messages published by a different connection.
To receive messages delivered to a publish/subscribe topic while no message consumer is active, you must ask the message broker to create a durable subscriber for that topic. All sessions that create such subscribers for a given topic must have the same client identifier (see Using Connections). When you create a durable subscriber, you supply a subscriber name that must be unique for that client identifier:
MessageConsumer myConsumer = mySession.createDurableSubscriber(myDest, "mySub");
(The object returned by the createDurableSubscriber method is actually typed as TopicSubscriber, but since that is a subinterface of MessageConsumer, you can safely assign it to a MessageConsumer variable. Note, however, that the destination myDest must be a publish/subscribe topic and not a point-to-point queue.)
You can think of a durable subscriber as a "virtual message consumer" for the specified topic, identified by the unique combination of a client identifier and subscriber name. When a message arrives for the topic and no message consumer is currently active for it, the message will be retained for later delivery. Whenever you create a consumer with the given client identifier and subscriber name, it will be considered to represent this same durable subscriber and will receive all of the accumulated messages that have arrived for the topic in the subscriber's absence. Each message is retained until it is delivered to (and acknowledged by) such a consumer or until it expires.
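The retention behavior of this "virtual message consumer" can be pictured with a small in-memory model. This is a sketch under stated assumptions, not broker code: the class DurableSubscriptionDemo and its publish, activate, and deactivate methods are hypothetical, and message expiration and acknowledgment are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: one durable subscription on one topic.
final class DurableSubscriptionDemo {
    private final List<String> retained = new ArrayList<>();
    private final List<String> received = new ArrayList<>();
    private boolean active;

    // A message arrives for the topic: deliver it now or hold it for later.
    void publish(String message) {
        if (active) {
            received.add(message);
        } else {
            retained.add(message);
        }
    }

    // A consumer with the same client identifier and subscriber name appears
    // and first receives everything that accumulated in its absence.
    void activate() {
        active = true;
        received.addAll(retained);
        retained.clear();
    }

    // The consumer goes away; the subscription itself continues to exist.
    void deactivate() {
        active = false;
    }

    List<String> received() {
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        DurableSubscriptionDemo sub = new DurableSubscriptionDemo();
        sub.publish("a");      // no active consumer: retained
        sub.activate();        // "a" is delivered now
        sub.publish("b");      // delivered immediately
        sub.deactivate();
        sub.publish("c");      // retained again
        sub.activate();        // "c" is delivered now
        System.out.println(sub.received());
    }
}
```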
Note:
Only one session at a time can have an active consumer for a given durable subscription. If another such consumer already exists, the createDurableSubscriber method will throw an exception.
Like the createConsumer method described in the preceding section (which creates nondurable subscribers), createDurableSubscriber can accept an optional message selector string and a boolean argument telling whether to suppress the delivery of messages published by the subscriber's own connection:
String mySelector = "/* Text of selector here */";
MessageConsumer myConsumer = mySession.createDurableSubscriber(myDest, "mySub", mySelector, true);
You can change the terms of a durable subscription by creating a new subscriber with the same client identifier and subscription name but with a different topic, selector, or both. The effect is as if the old subscription were destroyed and a new one created with the same name. When you no longer need a durable subscription, you can destroy it with the session method unsubscribe:
mySession.unsubscribe("mySub");
Once you have created a message consumer for a session, using either the createConsumer or createDurableSubscriber method, you must start the session's connection to begin the flow of incoming messages:
myConnection.start();
(Note that it is not necessary to start a connection in order to produce messages, only to consume them.) You can then use the consumer's receive method to receive messages synchronously from the message broker:
Message inMsg = myConsumer.receive();
This returns the next available message for this consumer. If no message is immediately available, the receive method blocks until one arrives. You can also provide a timeout interval in milliseconds:
Message inMsg = myConsumer.receive(1000);
In this case, if no message arrives before the specified timeout interval (1 second in the example) expires, the method will return with a null result. An alternative method, receiveNoWait, returns a null result immediately if no message is currently available:
Message inMsg = myConsumer.receiveNoWait();
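The three receive variants differ only in how long they are willing to wait. As a rough analogy built on the standard library alone (this is not the JMS API), java.util.concurrent.BlockingQueue offers the same three behaviors: take blocks, poll with a timeout gives up with null, and poll without arguments returns at once. The class ReceiveAnalogy and its helper names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy only: a BlockingQueue stands in for a message consumer.
final class ReceiveAnalogy {

    // Like receive(timeoutMillis): wait up to the timeout, then return null.
    static String receiveWithTimeout(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    // Like receiveNoWait(): return null at once if nothing is waiting.
    static String receiveNoWait(BlockingQueue<String> queue) {
        return queue.poll();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("hello");

        // Like receive(): take() would block here if the queue were empty.
        System.out.println(queue.take());
        // The queue is now empty, so both remaining calls yield null.
        System.out.println(receiveWithTimeout(queue, 100));
        System.out.println(receiveNoWait(queue));
    }
}
```

Whichever variant you use, a null result means "no message", so code that calls the timed or no-wait form should check for null before processing.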
If you want your message consumer to receive incoming messages asynchronously, you must create a message listener to process the messages. This is a Java object that implements the JMS MessageListener interface. The procedure is as follows:
Define a message listener class implementing the MessageListener interface.
The interface consists of the single method onMessage, which accepts a message as a parameter and processes it in whatever way is appropriate for your application:
public class MyMessageListener implements MessageListener {
    public void onMessage (Message inMsg) {
        /* Code here to process message */
    }
}
Create a message consumer.
You can use either the createConsumer or createDurableSubscriber method of the session in which the consumer will operate: for instance,
MessageConsumer myConsumer = mySession.createConsumer(myDest);
Create an instance of your message listener class.
MyMessageListener myListener = new MyMessageListener();
Associate the message listener with your message consumer.
The message consumer method setMessageListener accepts a message listener object and associates it with the given consumer:
myConsumer.setMessageListener(myListener);
Start the connection to which this consumer's session belongs.
The connection's start method begins the flow of messages from the message broker to your message consumer:
myConnection.start();
Once the connection is started, the Message Queue client runtime will call your message listener's onMessage method each time it has a message to deliver to this consumer.
To ensure that no messages are lost before your consumer is ready to receive them, it is important not to start the connection until after you have created the message listener and associated it with the consumer. If the connection is already started, you should stop it before creating an asynchronous consumer, then start it again when the consumer is ready to begin processing.
Setting a consumer's message listener to null removes any message listener previously associated with it:
myConsumer.setMessageListener(null);
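The consumer's getMessageListener method returns its current message listener (or null if there is none). The return type is the MessageListener interface, so assign the result to a variable of that type, or cast it if you need your own listener class:
MessageListener myListener = myConsumer.getMessageListener();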
If you have specified client-acknowledge as your session's acknowledgment mode (see Acknowledgment Modes), it is your client application's responsibility to explicitly acknowledge each message it receives. If you have received the message synchronously, using a message consumer's receive (or receiveNoWait) method, you should process the message first and then acknowledge it; if you have received it asynchronously, your message listener's onMessage method should acknowledge the message after processing it. This ensures that the message broker will not delete the message from persistent storage until processing is complete.
Note:
In a transacted session (see Transacted Sessions), there is no need to acknowledge a message explicitly: the session's acknowledgment mode is ignored and all acknowledgment processing is handled for you automatically by the Message Queue client runtime. In this case, the session's getAcknowledgeMode method will return the special constant Session.SESSION_TRANSACTED.
Table 2-16 shows the methods available for acknowledging a message. The most general is acknowledge, defined in the standard JMS interface javax.jms.Message:
inMsg.acknowledge();
This acknowledges all unacknowledged messages consumed by the session up to the time of the call. You can use this method to acknowledge each message individually as you receive it, or you can group several messages together and acknowledge them all at once by calling acknowledge on the last one in the group.
Table 2-16 Message Acknowledgment Methods
Function | Description |
---|---|
acknowledge | Acknowledge all unacknowledged messages for session |
acknowledgeThisMessage | Acknowledge this message only |
acknowledgeUpThroughThisMessage | Acknowledge all unacknowledged messages through this one |
The Message Queue version of the Message interface, defined in the package com.sun.messaging.jms, adds two more methods that provide more flexible control over which messages you acknowledge. The acknowledgeThisMessage method just acknowledges the single message for which it is called, rather than all messages consumed by the session; acknowledgeUpThroughThisMessage acknowledges the message for which it is called and all previous messages; messages received after that message remain unacknowledged.
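The difference in scope between the three acknowledgment methods is easiest to see side by side. The sketch below is a toy bookkeeping model, not the Message Queue client runtime: the class AckScopeDemo and its method names are hypothetical stand-ins that only track which delivered message numbers are still unacknowledged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: tracks which delivered message IDs are still unacknowledged.
final class AckScopeDemo {
    private final List<Integer> unacked = new ArrayList<>();

    AckScopeDemo(Integer... deliveredInOrder) {
        unacked.addAll(Arrays.asList(deliveredInOrder));
    }

    // Models acknowledge(): clears everything the session has consumed so far.
    void acknowledgeAll() {
        unacked.clear();
    }

    // Models acknowledgeThisMessage(): clears only the given message.
    void acknowledgeThis(int id) {
        unacked.remove(Integer.valueOf(id));
    }

    // Models acknowledgeUpThroughThisMessage(): clears the given message and
    // every message delivered before it; later messages stay unacknowledged.
    void acknowledgeUpThrough(int id) {
        int index = unacked.indexOf(id);
        if (index >= 0) {
            unacked.subList(0, index + 1).clear();
        }
    }

    List<Integer> unacknowledged() {
        return new ArrayList<>(unacked);
    }

    public static void main(String[] args) {
        AckScopeDemo a = new AckScopeDemo(1, 2, 3, 4, 5);
        a.acknowledgeThis(3);
        System.out.println(a.unacknowledged());   // 1, 2, 4, 5 remain

        AckScopeDemo b = new AckScopeDemo(1, 2, 3, 4, 5);
        b.acknowledgeUpThrough(3);
        System.out.println(b.unacknowledged());   // 4, 5 remain

        AckScopeDemo c = new AckScopeDemo(1, 2, 3, 4, 5);
        c.acknowledgeAll();
        System.out.println(c.unacknowledged());   // nothing remains
    }
}
```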
If the destination from which you are consuming messages is a point-to-point queue, you can use a queue browser to examine the messages in the queue without consuming them. The session method createBrowser creates a browser for a specified queue:
QueueBrowser myBrowser = mySession.createBrowser(myDest);
The method will throw an exception (InvalidDestinationException) if you try to pass it a topic destination instead of a queue. You can also supply a selector string as an optional second argument:
String mySelector = "/* Text of selector here */";
QueueBrowser myBrowser = mySession.createBrowser(myDest, mySelector);
Table 2-17 shows the methods defined in the QueueBrowser interface. The getQueue and getMessageSelector methods return the browser's queue and selector string, respectively.
Table 2-17 Queue Browser Methods
Name | Description |
---|---|
getQueue | Get queue from which this browser reads |
getMessageSelector | Get message selector |
getEnumeration | Get enumeration of all messages in the queue |
close | Close browser |
The most important queue browser method is getEnumeration, which returns a Java enumeration object that you can use to iterate through the messages in the queue, as shown in Example 2-4. Because the enumeration is untyped, each element must be cast to Message.
Enumeration queueMessages = myBrowser.getEnumeration();
Message eachMessage;
while ( queueMessages.hasMoreElements() ) {
    eachMessage = (Message) queueMessages.nextElement();
    /* Do something with the message */
}
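Since getEnumeration returns an untyped Enumeration, it can be convenient to copy its elements into a typed list once and work with that. The helper below is a sketch, not part of the Message Queue API: the class EnumerationHelper and its toList method are hypothetical, and the demonstration uses strings as a stand-in for messages so that it runs without a broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

final class EnumerationHelper {

    // Copies the elements of an untyped enumeration into a typed list,
    // failing with ClassCastException if an element has the wrong type.
    static <T> List<T> toList(Enumeration<?> source, Class<T> type) {
        List<T> result = new ArrayList<>();
        while (source.hasMoreElements()) {
            result.add(type.cast(source.nextElement()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-in for myBrowser.getEnumeration(): strings instead of messages.
        Enumeration<?> queueMessages =
                Collections.enumeration(Arrays.asList("first", "second", "third"));
        for (String each : toList(queueMessages, String.class)) {
            System.out.println(each);
        }
    }
}
```

With a real browser, the equivalent call would presumably be toList(myBrowser.getEnumeration(), Message.class).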
The browser's close method closes it when you're through with it:
myBrowser.close();
As a matter of good programming practice, you should close a message consumer when you have no further need for it. Closing a session or connection automatically closes all consumers associated with it; to close a consumer without closing the session or connection to which it belongs, you can use its close method:
myConsumer.close();
For a consumer that is a nondurable topic subscriber, this terminates the flow of messages to the consumer. However, if the consumer is a queue receiver or a durable topic subscriber, messages will continue to be accumulated for the destination and will be delivered the next time a consumer for that destination becomes active. To terminate a durable subscription permanently, call its session's unsubscribe method with the subscriber name as an argument:
mySession.unsubscribe("mySub");
Processing a message after you have received it may entail examining its header fields, properties, and body. The following sections describe how this is done.
The standard JMS message header fields are described in Table 2-4. Table 2-18 shows the methods provided by the JMS Message interface for retrieving the values of these fields: for instance, you can obtain a message's reply destination with the statement
Destination replyDest = inMsg.getJMSReplyTo();
Table 2-18 Message Header Retrieval Methods
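Name | Description |
---|---|
getJMSMessageID | Get message identifier |
getJMSDestination | Get destination |
getJMSReplyTo | Get reply destination |
getJMSCorrelationID | Get correlation identifier as string |
getJMSCorrelationIDAsBytes | Get correlation identifier as byte array |
getJMSDeliveryMode | Get delivery mode |
getJMSPriority | Get priority level |
getJMSTimestamp | Get time stamp |
getJMSExpiration | Get expiration time |
getJMSType | Get message type |
getJMSRedelivered | Get redelivered flag |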
Table 2-19 lists the methods defined in the JMS Message interface for retrieving the values of a message's properties (see Message Properties). There is a retrieval method for each of the possible primitive types that a property value can assume: for instance, you can obtain a message's time stamp with the statement
long timeStamp = inMsg.getLongProperty("JMSXRcvTimestamp");
Table 2-19 Message Property Retrieval Methods
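Name | Description |
---|---|
getIntProperty | Get integer property |
getByteProperty | Get byte property |
getShortProperty | Get short integer property |
getLongProperty | Get long integer property |
getFloatProperty | Get floating-point property |
getDoubleProperty | Get double-precision property |
getBooleanProperty | Get boolean property |
getStringProperty | Get string property |
getObjectProperty | Get property as object |
getPropertyNames | Get property names |
propertyExists | Does property exist? |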
There is also a generic getObjectProperty method that returns a property value in objectified form, as a Java object of class Integer, Byte, Short, Long, Float, Double, Boolean, or String. For example, another way to obtain a message's time stamp, equivalent to that shown above, would be
Long timeStampObject = (Long) inMsg.getObjectProperty("JMSXRcvTimestamp");
long timeStamp = timeStampObject.longValue();
If the message has no property with the requested name, getObjectProperty will return null; the message method propertyExists tests whether this is the case.
The getPropertyNames method returns a Java enumeration object for iterating through all of the property names associated with a given message; you can then use the retrieval methods shown in the table to retrieve each of the properties by name, as shown in Example 2-5.
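The name-then-value pattern can be tried out without a broker. The following sketch is an illustration under stated assumptions rather than the guide's own example: the class PropertyDump is hypothetical, and a java.util.Hashtable stands in for the message, with keys() playing the role of getPropertyNames and get the role of getObjectProperty.

```java
import java.util.Enumeration;
import java.util.Hashtable;

final class PropertyDump {

    // Formats each name/value pair on its own line, visiting the names
    // through an Enumeration as getPropertyNames() would supply them.
    static String describe(Hashtable<String, Object> props) {
        StringBuilder out = new StringBuilder();
        Enumeration<String> names = props.keys();    // stand-in for getPropertyNames()
        while (names.hasMoreElements()) {
            String name = names.nextElement();
            Object value = props.get(name);           // stand-in for getObjectProperty(name)
            out.append(name).append(" = ").append(value)
               .append(" (").append(value.getClass().getSimpleName()).append(")\n");
        }
        return out.toString();
    }

    public static void main(String[] args) {
        Hashtable<String, Object> props = new Hashtable<>();
        props.put("retries", 3);
        props.put("urgent", true);
        // The order of the lines is unspecified, as it is for property names.
        System.out.print(describe(props));
    }
}
```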
The methods for retrieving the contents of a message's body essentially parallel those for composing the body, as described earlier under Composing Messages. The following sections describe these methods for each of the possible message types (text, stream, map, object, and bytes).
The text message method getText (Table 2-20) retrieves the contents of a text message's body in the form of a string:
String textBody = inMsg.getText();
Reading the contents of a stream message is similar to reading from a data stream, using the access methods shown in Table 2-21: for example, the statement
int intVal = inMsg.readInt();
retrieves an integer value from the message stream.
Table 2-21 Stream Message Access Methods
Name | Description |
---|---|
readInt | Read integer from message stream |
readByte | Read byte value from message stream |
readBytes | Read byte array from message stream |
readShort | Read short integer from message stream |
readLong | Read long integer from message stream |
readFloat | Read floating-point value from message stream |
readDouble | Read double-precision value from message stream |
readBoolean | Read boolean value from message stream |
readChar | Read character from message stream |
readString | Read string from message stream |
readObject | Read value from message stream as object |
The readObject method returns the next value from the message stream in objectified form, as a Java object of the class corresponding to the value's primitive data type: for instance, if the value is of type int, readObject returns an object of class Integer. The following statements are equivalent to the one shown above:
Integer intObject = (Integer) inMsg.readObject();
int intVal = intObject.intValue();
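When the receiver does not know in advance which types a stream contains, the objectified values can be inspected with instanceof before they are used. The sketch below shows that dispatch on plain Java objects; the class StreamValueDemo and its describe method are hypothetical, and an Object array stands in for successive readObject results.

```java
final class StreamValueDemo {

    // Classifies one objectified value by its runtime class.
    static String describe(Object value) {
        if (value instanceof Integer) {
            return "int " + value;
        }
        if (value instanceof Boolean) {
            return "boolean " + value;
        }
        if (value instanceof String) {
            return "string \"" + value + "\"";
        }
        if (value instanceof byte[]) {
            return "byte array of length " + ((byte[]) value).length;
        }
        return "other " + value;
    }

    public static void main(String[] args) {
        // Stand-in for the values that successive readObject() calls might return.
        Object[] values = {42, true, "hello", new byte[] {1, 2}};
        for (Object value : values) {
            System.out.println(describe(value));
        }
    }
}
```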
The MapMessage interface provides the methods shown in Table 2-22 for reading the body of a map message. Each access method takes a name string as an argument and returns the value to which that name is mapped: for instance, under the example shown in Composing Map Messages, the statement
int meaningOfLife = inMsg.getInt("The Meaning of Life");
would set the variable meaningOfLife to the value 42.
Table 2-22 Map Message Access Methods
Name | Description |
---|---|
getInt | Get integer from message map by name |
getByte | Get byte value from message map by name |
getBytes | Get byte array from message map by name |
getShort | Get short integer from message map by name |
getLong | Get long integer from message map by name |
getFloat | Get floating-point value from message map by name |
getDouble | Get double-precision value from message map by name |
getBoolean | Get boolean value from message map by name |
getChar | Get character from message map by name |
getString | Get string from message map by name |
getObject | Get object from message map by name |
itemExists | Does map contain an item with specified name? |
getMapNames | Get enumeration of all names in map |
Like stream messages, map messages provide an access method, getObject, that returns a value from the map in objectified form, as a Java object of the class corresponding to the value's primitive data type: for instance, if the value is of type int, getObject returns an object of class Integer. The following statements are equivalent to the one shown above:
Integer meaningObject = (Integer) inMsg.getObject("The Meaning of Life");
int meaningOfLife = meaningObject.intValue();
The itemExists method returns a boolean value indicating whether the message map contains an association for a given name string:
if ( inMsg.itemExists("The Meaning of Life") ) {
    /* Life is meaningful */
} else {
    /* Life is meaningless */
}
The getMapNames method returns a Java enumeration object for iterating through all of the names defined in the map; you can then use getObject to retrieve the corresponding values, as shown in Example 2-6.
The ObjectMessage interface provides just one method, getObject (Table 2-23), for retrieving the serialized object that is the body of an object message:
Object messageBody = inMsg.getObject();
You can then typecast the result to a more specific class and process it in whatever way is appropriate.
The body of a bytes message simply consists of a stream of uninterpreted bytes; its interpretation is entirely a matter of agreement between sender and receiver. This type of message is intended primarily for decoding message formats used by other existing message systems; Message Queue clients should generally use one of the other, more specific message types instead.
Reading the body of a bytes message is similar to reading a stream message (see Processing Stream Messages): you use the methods shown in Table 2-24 to decode primitive values from the message's byte stream. For example, the statement
int intVal = inMsg.readInt();
retrieves an integer value from the byte stream. The getBodyLength method returns the length of the entire message body in bytes. It returns a long, so the result must be stored in a long variable:
long bodyLength = inMsg.getBodyLength();
Table 2-24 Bytes Message Access Methods
Name | Description |
---|---|
getBodyLength | Get length of message body in bytes |
readInt | Read integer from message stream |
readByte | Read signed byte value from message stream |
readUnsignedByte | Read unsigned byte value from message stream |
readBytes | Read byte array from message stream |
readShort | Read signed short integer from message stream |
readUnsignedShort | Read unsigned short integer from message stream |
readLong | Read long integer from message stream |
readFloat | Read floating-point value from message stream |
readDouble | Read double-precision value from message stream |
readBoolean | Read boolean value from message stream |
readChar | Read character from message stream |
readUTF | Read UTF-8 string from message stream |
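The read methods in Table 2-24 follow the naming of java.io.DataInputStream, on which the JMS BytesMessage interface is largely based, so the sender-receiver agreement that a bytes message depends on can be rehearsed with the standard library alone. The sketch below is an analogy, not Message Queue code: the class ByteStreamDemo and its encode and decode helpers are hypothetical, and it is an assumption that a given peer system uses this same field order and encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class ByteStreamDemo {

    // Writes an int, a boolean, and a string in the fixed order the reader expects.
    static byte[] encode(int count, boolean flag, String label) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(count);
            out.writeBoolean(flag);
            out.writeUTF(label);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in the same order; any other order would misread the bytes.
    static String decode(byte[] body) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(body))) {
            int count = in.readInt();
            boolean flag = in.readBoolean();
            String label = in.readUTF();
            return count + "," + flag + "," + label;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = encode(42, true, "ok");
        System.out.println("body length: " + body.length);
        System.out.println("decoded: " + decode(body));
    }
}
```

Because the bytes carry no type information, the reader must call the read methods in exactly the order and with exactly the types the writer used.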