The producer template supports a variety of different approaches to invoking
producer endpoints. There are methods that support different formats for the
request message (as an Exchange
object, as a message body, as a
message body with a single header setting, and so on) and there are methods to
support both the synchronous and the asynchronous style of invocation. Overall,
producer template methods can be grouped into the following categories:
The methods for invoking endpoints synchronously have names of the form
send
and
Suffix
()request
. For example,
the methods for invoking an endpoint using either the default message exchange
pattern (MEP) or an explicitly specified MEP are named Suffix
()send()
,
sendBody()
, and sendBodyAndHeader()
(where these
methods respectively send an Exchange
object, a message body, or a
message body and header value). If you want to force the MEP to be
InOut (request/reply semantics), you can call the
request()
, requestBody()
, and
requestBodyAndHeader()
methods instead.
The following example shows how to create a ProducerTemplate
instance and use it to send a message body to the activemq:MyQueue
endpoint. The example also shows how to send a message body and header value
using sendBodyAndHeader()
.
import org.apache.camel.ProducerTemplate import org.apache.camel.impl.DefaultProducerTemplate ... ProducerTemplate template = context.createProducerTemplate(); // Send to a specific queue template.sendBody("activemq:MyQueue", "<hello>world!</hello>"); // Send with a body and header template.sendBodyAndHeader( "activemq:MyQueue", "<hello>world!</hello>", "CustomerRating", "Gold" );
A special case of synchronous invocation is where you provide the
send()
method with a Processor
argument instead of
an Exchange
argument. In this case, the producer template
implicitly asks the specified endpoint to create an Exchange
instance (typically, but not always having the InOnly MEP
by default). This default exchange is then passed to the processor, which
initializes the contents of the exchange object.
The following example shows how to send an exchange initialized by the
MyProcessor
processor to the activemq:MyQueue
endpoint.
import org.apache.camel.ProducerTemplate import org.apache.camel.impl.DefaultProducerTemplate ... ProducerTemplate template = context.createProducerTemplate(); // Send to a specific queue, using a processor to initialize template.send("activemq:MyQueue", new MyProcessor());
The MyProcessor
class is implemented as shown in the following
example. In addition to setting the In message body (as
shown here), you could also initialize message heades and exchange
properties.
import org.apache.camel.Processor; import org.apache.camel.Exchange; ... public class MyProcessor implements Processor { public MyProcessor() { } public void process(Exchange ex) { ex.getIn().setBody("<hello>world!</hello>"); } }
The methods for invoking endpoints asynchronously have
names of the form asyncSend
and
Suffix
()asyncRequest
. For
example, the methods for invoking an endpoint using either the default message
exchange pattern (MEP) or an explicitly specified MEP are named
Suffix
()asyncSend()
and asyncSendBody()
(where these
methods respectively send an Exchange
object or a message body). If
you want to force the MEP to be InOut (request/reply
semantics), you can call the asyncRequestBody()
,
asyncRequestBodyAndHeader()
, and
asyncRequestBodyAndHeaders()
methods instead.
The following example shows how to send an exchange asynchronously to the
direct:start
endpoint. The asyncSend()
method
returns a java.util.concurrent.Future
object, which is used to
retrieve the invocation result at a later time.
import java.util.concurrent.Future; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultExchange; ... Exchange exchange = new DefaultExchange(context); exchange.getIn().setBody("Hello"); Future<Exchange> future = template.asyncSend("direct:start", exchange); // You can do other things, whilst waiting for the invocation to complete ... // Now, retrieve the resulting exchange from the Future Exchange result = future.get();
The producer template also provides methods to send a message body
asynchronously (for example, using asyncSendBody()
or
asyncRequestBody()
). In this case, you can use one of the
following helper methods to extract the returned message body from the
Future
object:
<T> T extractFutureBody(Future future, Class<T> type); <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;
The first version of the extractFutureBody()
method blocks until
the invocation completes and the reply message is available. The second version
of the extractFutureBody()
method allows you to specify a timeout.
Both methods have a type argument, type
, which casts the returned
message body to the specified type using a built-in type converter.
The following example shows how to use the asyncRequestBody()
method to send a message body to the direct:start
endpoint. The
blocking extractFutureBody()
method is then used to retrieve the
reply message body from the Future
object.
Future<Object> future = template.asyncRequestBody("direct:start", "Hello"); // You can do other things, whilst waiting for the invocation to complete ... // Now, retrieve the reply message body as a String type String result = template.extractFutureBody(future, String.class);
In the preceding asynchronous examples, the request message is dispatched in a
sub-thread, while the reply is retrieved and processed by the main thread. The
producer template also gives you the option, however, of processing replies in
the sub-thread, using one of the asyncCallback()
,
asyncCallbackSendBody()
, or
asyncCallbackRequestBody()
methods. In this case, you supply a
callback object (of org.apache.camel.impl.SynchronizationAdapter
type), which automatically gets invoked in the sub-thread as soon as a reply
message arrives.
The Synchronization
callback interface is defined as
follows:
package org.apache.camel.spi;
import org.apache.camel.Exchange;
public interface Synchronization {
void onComplete(Exchange exchange);
void onFailure(Exchange exchange);
}
Where the onComplete()
method is called on receipt of a normal
reply and the onFailure()
method is called on receipt of a fault
message reply. Only one of these methods gets called back, so you must override
both of them to ensure that all types of reply are processed.
The following example shows how to send an exchange to the
direct:start
endpoint, where the reply message is processed in
the sub-thread by the SynchronizationAdapter
callback
object.
import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.SynchronizationAdapter; ... Exchange exchange = context.getEndpoint("direct:start").createExchange(); exchange.getIn().setBody("Hello"); Future<Exchange> future = template.asyncCallback("direct:start", exchange, new SynchronizationAdapter() { @Override public void onComplete(Exchange exchange) { assertEquals("Hello World", exchange.getIn().getBody()); } });
Where the SynchronizationAdapter
class is a default
implementation of the Synchronization
interface, which you can
override to provide your own definitions of the onComplete()
and
onFailure()
callback methods.
You still have the option of accessing the reply from the main thread, because
the asyncCallback()
method also returns a Future
object—for example:
// Retrieve the reply from the main thread, specifying a timeout Exchange reply = future.get(10, TimeUnit.SECONDS);