In this section, we describe the Ice implementation of AMI and how to use it. We begin by discussing a way to (partially) simulate AMI using oneway invocations. This is not a technique that we recommend, but it is an informative exercise that highlights the benefits of AMI and illustrates how it works. Next, we explain the AMI mapping and illustrate its use with examples.
As we discussed at the beginning of the chapter, synchronous invocations are not appropriate for certain types of applications. For example, an application with a graphical user interface typically must avoid blocking the window system’s event dispatch thread because blocking makes the application unresponsive to user commands. In this situation, making a synchronous remote invocation is asking for trouble.
The application could avoid this situation using oneway invocations (see Section 28.13), which by definition cannot return a value or have any
out parameters. Since the Ice run time does not expect a reply, the invocation blocks only as long as it takes to marshal and copy the message into the local transport buffer. However, the use of oneway invocations may require unacceptable changes to the interface definitions. For example, a twoway invocation that returns results or raises user exceptions must be converted into at least two operations: one for the client to invoke with oneway semantics that contains only in parameters, and one (or more) for the server to invoke to notify the client of the results. Consider the following interface:
interface I {
int op(string s, out long l);
};
In its current form, the operation op is not suitable for a oneway invocation because it has an
out parameter and a non-
void return type. In order to accommodate a oneway invocation of
op, we can change the Slice definitions as shown below:
interface ICallback {
void opResults(int result, long l);
};
interface I {
void op(ICallback* cb, string s);
};
• We added interface ICallback, containing an operation
opResults whose arguments represent the results of the original twoway operation. The server invokes this operation to notify the client of the completion of the operation.
• We modified I::op to be compliant with oneway semantics: it now has a
void return type, and takes only in parameters.
As you can see, we have made significant changes to our interface definitions to accommodate the implementation requirements of the client. One ramification of these changes is that the client must now also be a server, because it must create an instance of
ICallback and register it with an object adapter in order to receive notifications of completed operations.
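To make this burden concrete, a client using the callback scheme has to do something along the following lines. This is only a sketch: the servant class ICallbackI and the variables communicator and proxy are hypothetical, and error handling is omitted.

// The client must host a servant just to receive the results of op.
class ICallbackI : public ICallback
{
public:
    virtual void opResults(Ice::Int result, Ice::Long l,
                           const Ice::Current&)
    {
        // Process the results of the original invocation of op.
    }
};

// Client-side setup: create an object adapter, register the callback
// servant, and pass its proxy along with the oneway invocation.
Ice::ObjectAdapterPtr adapter =
    communicator->createObjectAdapterWithEndpoints("Callback", "tcp");
ICallbackPrx cb = ICallbackPrx::uncheckedCast(
    adapter->addWithUUID(new ICallbackI));
adapter->activate();

IPrx oneway = IPrx::uncheckedCast(proxy->ice_oneway());
oneway->op(cb, "some string");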
A more severe ramification, however, is the impact these changes have on the type system, and therefore on the server. Whether a client invokes an operation synchronously or asynchronously should be irrelevant to the server; this is an artifact of behavior that should have no impact on the type system. By changing the type system as shown above, we have tightly coupled the server to the client, and eliminated the ability for
op to be invoked synchronously.
To make matters even worse, consider what would happen if op could raise user exceptions. In this case,
ICallback would have to be expanded with additional operations that allow the server to notify the client of the occurrence of each exception. Since exceptions cannot be used as parameter or member types in Slice, this quickly becomes a difficult endeavor, and the results are likely to be equally difficult to use.
At this point, you will hopefully agree that this technique is flawed in many ways, so why do we bother describing it in such detail? The reason is that the Ice implementation of AMI uses a strategy similar to the one described above, but with several important differences, which we describe below.
AMI operations have the same semantics in all of the language mappings that support asynchronous invocations. This section provides a language-independent introduction to the AMI model.
Annotating a Slice operation with the AMI metadata tag does not prevent an application from invoking that operation using the traditional synchronous model. Rather, the presence of the metadata extends the proxy with an asynchronous version of the operation, so that invocations can be made using either model.
The asynchronous operation never blocks the calling thread. If the message cannot be accepted into the local transport buffer without blocking, the Ice run time queues the request and immediately returns control to the calling thread.
The parameters of the asynchronous operation are modified in a manner similar to the example from
Section 29.3.1: the first argument is a callback object (described below), followed by any
in parameters in the order of declaration. The operation’s return value and
out parameters, if any, are passed to the callback object when the response is received.
The asynchronous operation raises only CommunicatorDestroyedException directly; all other exceptions are reported to the callback object. See
Section 29.3.9 for more information on error handling.
Finally, the return value of the asynchronous operation is a boolean that indicates whether the Ice run time was able to send the request synchronously; that is, whether the entire message was immediately accepted by the local transport buffer. An application can use this value to implement flow control (see
Section 29.3.6).
The asynchronous operation requires the application to supply a callback object as the first argument. This object is an instance of an application-defined class; in strongly-typed languages this class must inherit from a superclass generated by the Slice compiler. In contrast to the example in
Section 29.3.1, the callback object is a purely local object that is invoked by the Ice run time in the client, and not by the remote server.
The Ice run time always invokes methods of the callback object from a thread in an Ice thread pool, and never from the thread that is invoking the asynchronous operation. Exceptions raised by a callback object are ignored but may cause the Ice run time to log a warning message (see the description of
Ice.Warn.AMICallback in
Appendix C).
The Ice run time invokes ice_response to supply the results of a successful twoway invocation; this method is not invoked for oneway invocations. The arguments to
ice_response consist of the return value (if the operation returns a non-
void type) followed by any
out parameters in the order of declaration.
For an asynchronous invocation, the Ice run time calls ice_response or
ice_exception, but not both. It is possible for one of these methods to be called before control returns to the thread that is invoking the operation.
The ice_sent method is invoked when the entire message has been passed to the local transport buffer. The Ice run time does not invoke
ice_sent if the asynchronous operation returned true to indicate that the message was sent synchronously. An application must make no assumptions about the order of invocations on a callback object;
ice_sent can be called before, after, or concurrently with
ice_response or
ice_exception. Refer to
Section 29.3.6 for more information about the purpose of this method.
1. An abstract callback class whose name is formed using the pattern
AMI_class_op. For example, an operation named
foo defined in interface
I results in a class named
AMI_I_foo. The class is generated in the same scope as the interface or class containing the operation. Two methods, ice_response and ice_exception, must be defined by the subclass; their signatures appear in the generated code shown below.
2. An additional proxy method, having the mapped name of the operation with the suffix
_async. This method returns a boolean indicating whether the request was sent synchronously. The first parameter is a smart pointer to an instance of the callback class described above. The remaining parameters comprise the
in parameters of the operation, in the order of declaration.
interface I {
["ami"] int foo(short s, out long l);
};
class AMI_I_foo : public ... {
public:
virtual void ice_response(Ice::Int, Ice::Long) = 0;
virtual void ice_exception(const Ice::Exception&) = 0;
};
typedef IceUtil::Handle<AMI_I_foo> AMI_I_fooPtr;
bool foo_async(const AMI_I_fooPtr&, Ice::Short);
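To illustrate how these generated types might be used, the following sketch defines a concrete callback class and invokes the operation asynchronously. The class AMI_I_fooI and the proxy variable p are hypothetical names.

// Hypothetical concrete callback class for operation I::foo.
class AMI_I_fooI : public AMI_I_foo
{
public:
    virtual void ice_response(Ice::Int result, Ice::Long l)
    {
        // Called with the return value and the out-parameter
        // when the reply arrives.
    }

    virtual void ice_exception(const Ice::Exception& ex)
    {
        // Called if the invocation fails with a local or user exception.
    }
};

// Invoke foo asynchronously; the return value indicates whether the
// request was accepted by the local transport buffer immediately.
IPrx p = ...;
bool sentSynchronously = p->foo_async(new AMI_I_fooI, 5);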
Section 29.3.2 describes proxy methods and callback objects in greater detail.
1. An abstract callback class whose name is formed using the pattern
AMI_class_op. For example, an operation named
foo defined in interface
I results in a class named
AMI_I_foo. The class is generated in the same scope as the interface or class containing the operation. Three methods must be defined by the subclass:
public void ice_response(<params>);
public void ice_exception(Ice.LocalException ex);
public void ice_exception(Ice.UserException ex);
2. An additional proxy method, having the mapped name of the operation with the suffix
_async. This method returns a boolean indicating whether the request was sent synchronously. The first parameter is a reference to an instance of the callback class described above. The remaining parameters comprise the
in parameters of the operation, in the order of declaration.
interface I {
["ami"] int foo(short s, out long l);
};
public abstract class AMI_I_foo extends ... {
public abstract void ice_response(int __ret, long l);
public abstract void ice_exception(Ice.LocalException ex);
public abstract void ice_exception(Ice.UserException ex);
}
public boolean foo_async(AMI_I_foo __cb, short s);
public boolean foo_async(AMI_I_foo __cb, short s,
java.util.Map<String, String> __ctx);
Section 29.3.2 describes proxy methods and callback objects in greater detail.
1. An abstract callback class whose name is formed using the pattern
AMI_class_op. For example, an operation named
foo defined in interface
I results in a class named
AMI_I_foo. The class is generated in the same scope as the interface or class containing the operation. Two methods, ice_response and ice_exception, must be defined by the subclass; their signatures appear in the generated code shown below.
2. An additional proxy method, having the mapped name of the operation with the suffix
_async. This method returns a boolean indicating whether the request was sent synchronously. The first parameter is a reference to an instance of the callback class described above. The remaining parameters comprise the in parameters of the operation, in the order of declaration.
interface I {
["ami"] int foo(short s, out long l);
};
public abstract class AMI_I_foo : ...
{
public abstract void ice_response(int __ret, long l);
public abstract void ice_exception(Ice.Exception ex);
}
bool foo_async(AMI_I_foo __cb, short s);
bool foo_async(AMI_I_foo __cb, short s,
Dictionary<string, string> __ctx);
Section 29.3.2 describes proxy methods and callback objects in greater detail.
For each AMI operation, the Python mapping emits an additional proxy method having the mapped name of the operation with the suffix
_async. This method returns a boolean indicating whether the request was sent synchronously. The first parameter is a reference to a callback object; the remaining parameters comprise the
in parameters of the operation, in the order of declaration.
Unlike the mappings for strongly-typed languages, the Python mapping does not generate a callback class for asynchronous operations. In fact, the callback object’s type is irrelevant; the Ice run time simply requires that it define the
ice_response and
ice_exception methods:
interface I {
["ami"] int foo(short s, out long l);
};
class ...
#
# Operation signatures:
#
# def ice_response(self, _result, l)
# def ice_exception(self, ex)
def foo_async(self, __cb, s)
Section 29.3.2 describes proxy methods and callback objects in greater detail.
module Demo {
sequence<float> Row;
sequence<Row> Grid;
exception RangeError {};
interface Model {
["ami"] Grid interpolate(Grid data, float factor)
throws RangeError;
};
};
Given a two-dimensional grid of floating point values and a factor, the interpolate operation returns a new grid of the same size with the values interpolated in some interesting (but unspecified) way. In the sections below, we present C++, Java, C#, and Python clients that invoke
interpolate using AMI.
class AMI_Model_interpolateI : public Demo::AMI_Model_interpolate
{
public:
virtual void ice_response(const Demo::Grid& result)
{
cout << "received the grid" << endl;
// ... postprocessing ...
}
virtual void ice_exception(const Ice::Exception& ex)
{
try {
ex.ice_throw();
} catch (const Demo::RangeError& e) {
cerr << "interpolate failed: range error" << endl;
} catch (const Ice::LocalException& e) {
cerr << "interpolate failed: " << e << endl;
}
}
};
The implementation of ice_response reports a successful result, and
ice_exception displays a diagnostic if an exception occurs.
The code to invoke interpolate is equally straightforward:
Demo::ModelPrx model = ...;
AMI_Model_interpolatePtr cb = new AMI_Model_interpolateI;
Demo::Grid grid;
initializeGrid(grid);
model->interpolate_async(cb, grid, 0.5);
After obtaining a proxy for a Model object, the client instantiates a callback object, initializes a grid and invokes the asynchronous version of
interpolate. When the Ice run time receives the response to this request, it invokes the callback object supplied by the client.
class AMI_Model_interpolateI extends Demo.AMI_Model_interpolate {
public void ice_response(float[][] result)
{
System.out.println("received the grid");
// ... postprocessing ...
}
public void ice_exception(Ice.UserException ex)
{
assert(ex instanceof Demo.RangeError);
System.err.println("interpolate failed: range error");
}
public void ice_exception(Ice.LocalException ex)
{
System.err.println("interpolate failed: " + ex);
}
}
The implementation of ice_response reports a successful result, and the
ice_exception methods display a diagnostic if an exception occurs.
The code to invoke interpolate is equally straightforward:
Demo.ModelPrx model = ...;
AMI_Model_interpolate cb = new AMI_Model_interpolateI();
float[][] grid = ...;
initializeGrid(grid);
model.interpolate_async(cb, grid, 0.5);
After obtaining a proxy for a Model object, the client instantiates a callback object, initializes a grid and invokes the asynchronous version of
interpolate. When the Ice run time receives the response to this request, it invokes the callback object supplied by the client.
using System;
class AMI_Model_interpolateI : Demo.AMI_Model_interpolate {
public override void ice_response(float[][] result)
{
Console.WriteLine("received the grid");
// ... postprocessing ...
}
public override void ice_exception(Ice.Exception ex)
{
Console.Error.WriteLine("interpolate failed: " + ex);
}
}
The implementation of ice_response reports a successful result, and the
ice_exception method displays a diagnostic if an exception occurs.
The code to invoke interpolate is equally straightforward:
Demo.ModelPrx model = ...;
AMI_Model_interpolate cb = new AMI_Model_interpolateI();
float[][] grid = ...;
initializeGrid(grid);
model.interpolate_async(cb, grid, 0.5);
class AMI_Model_interpolateI(object):
def ice_response(self, result):
print "received the grid"
# ... postprocessing ...
def ice_exception(self, ex):
try:
raise ex
except Demo.RangeError, e:
print "interpolate failed: range error"
except Ice.LocalException, e:
print "interpolate failed: " + str(e)
The implementation of ice_response reports a successful result, and the
ice_exception method displays a diagnostic if an exception occurs.
The code to invoke interpolate is equally straightforward:
model = ...
cb = AMI_Model_interpolateI()
grid = ...
initializeGrid(grid)
model.interpolate_async(cb, grid, 0.5)
Support for asynchronous invocations in Ice is enabled by the client thread pool (see
Section 28.9), whose threads are primarily responsible for processing reply messages. It is important to understand the concurrency issues associated with asynchronous invocations:
• Calls to the callback object are always made by threads from an Ice thread pool, so synchronization may be necessary if the application might interact with the callback object at the same time as the reply arrives. Furthermore, since the Ice run time never invokes callback methods from the client’s calling thread, the client can safely make AMI invocations while holding a lock without risk of deadlock.
• The number of threads in the client thread pool determines the maximum number of simultaneous callbacks possible for asynchronous invocations. The default size of the client thread pool is one, meaning invocations on callback objects are serialized. If the size of the thread pool is increased, the application may require synchronization, and replies can be dispatched out of order. The client thread pool can also be configured to serialize messages received over a connection, so that AMI replies from a connection are dispatched in the order they are received (see Section 28.9.4 and the configuration sketch below).
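For example, an application could enlarge the client thread pool and request serialized dispatch when it initializes its communicator. This is only a configuration sketch; the property values shown are illustrative.

// Configure the client thread pool before creating the communicator.
Ice::InitializationData id;
id.properties = Ice::createProperties();
// Allow up to four concurrent AMI callbacks.
id.properties->setProperty("Ice.ThreadPool.Client.Size", "4");
// Dispatch replies received over a connection in the order received.
id.properties->setProperty("Ice.ThreadPool.Client.Serialize", "1");
Ice::CommunicatorPtr communicator = Ice::initialize(id);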
The Ice run time queues asynchronous requests when necessary to avoid blocking the calling thread, but it places no upper limit on the number of queued requests or the amount of memory they can consume. To prevent unbounded memory utilization, Ice provides the infrastructure an application needs to implement its own flow-control logic. This support has two components:
• The return value of the proxy method
• The ice_sent method in the AMI callback object
The return value of the proxy method determines whether the request was queued. If the proxy method returns true, no flow control is necessary because the request was accepted by the local transport buffer and therefore the Ice run time did not need to queue it. In this situation, the Ice run time does not invoke the
ice_sent method on the callback object; the return value of the proxy method is sufficient notification that the request was sent.
If the proxy method returns false, the Ice run time has queued the request. The application must then decide how to proceed with subsequent invocations; the ice_sent callback, described next, notifies the application once the queued request has actually been sent (a flow-control sketch follows the C++ example below).
To indicate its interest in receiving ice_sent invocations, an AMI callback object must also derive from the C++ class
Ice::AMISentCallback:
namespace Ice {
class AMISentCallback {
public:
virtual ~AMISentCallback();
virtual void ice_sent() = 0;
};
}
We can modify the example from Section 29.3.4 to include an
ice_sent callback as shown below:
class AMI_Model_interpolateI :
public Demo::AMI_Model_interpolate,
public Ice::AMISentCallback
{
public:
// ...
virtual void ice_sent()
{
cout << "request sent successfully" << endl;
}
};
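Combining the proxy method’s return value with ice_sent yields a simple form of flow control. The following sketch blocks the sender only when the Ice run time had to queue the request; the class FlowControlledCallbackI and its waitSent method are hypothetical, and only a single outstanding request is managed.

// Callback that lets the sender wait until a queued request has
// actually been sent.
class FlowControlledCallbackI : public Demo::AMI_Model_interpolate,
                                public Ice::AMISentCallback,
                                public IceUtil::Monitor<IceUtil::Mutex>
{
public:
    FlowControlledCallbackI() : _sent(false) {}

    virtual void ice_response(const Demo::Grid& result)
    {
        // ... postprocessing ...
    }

    virtual void ice_exception(const Ice::Exception& ex)
    {
        // Treat a failed request as "sent" so that a waiting sender
        // is not blocked forever.
        IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
        _sent = true;
        notify();
    }

    virtual void ice_sent()
    {
        IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
        _sent = true;
        notify();
    }

    void waitSent()
    {
        IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
        while(!_sent)
            wait();
    }

private:
    bool _sent;
};
typedef IceUtil::Handle<FlowControlledCallbackI> FlowControlledCallbackIPtr;

// Only wait if the run time had to queue the request; if the proxy
// method returns true, ice_sent is never invoked.
FlowControlledCallbackIPtr cb = new FlowControlledCallbackI;
if(!model->interpolate_async(cb, grid, 0.5))
    cb->waitSent();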
To indicate its interest in receiving ice_sent invocations, an AMI callback object must also implement the Java interface
Ice.AMISentCallback:
package Ice;
public interface AMISentCallback {
void ice_sent();
}
We can modify the example from Section 29.3.4 to include an
ice_sent callback as shown below:
class AMI_Model_interpolateI
extends Demo.AMI_Model_interpolate
implements Ice.AMISentCallback {
// ...
public void ice_sent()
{
System.out.println("request sent successfully");
}
}
To indicate its interest in receiving ice_sent invocations, an AMI callback object must also implement the C# interface
Ice.AMISentCallback:
namespace Ice {
public interface AMISentCallback
{
void ice_sent();
}
}
We can modify the example from Section 29.3.4 to include an
ice_sent callback as shown below:
class AMI_Model_interpolateI :
Demo.AMI_Model_interpolate,
Ice.AMISentCallback {
// ...
public void ice_sent()
{
Console.Out.WriteLine("request sent successfully");
}
}
To indicate its interest in receiving ice_sent invocations, an AMI callback object need only define the
ice_sent method.
We can modify the example from Section 29.3.4 to include an
ice_sent callback as shown below:
class AMI_Model_interpolateI(object):
# ...
def ice_sent(self):
print "request sent successfully"
Applications that send batched requests (see Section 28.15) can either flush a batch explicitly or allow the Ice run time to flush automatically. The proxy method
ice_flushBatchRequests performs an immediate flush using the synchronous invocation model and may block the calling thread until the entire message can be sent. Ice also provides an asynchronous version of this method for applications that wish to flush batch requests without the risk of blocking.
The proxy method ice_flushBatchRequests_async initiates an asynchronous flush. Its only argument is a callback object; this object must define an
ice_exception method for receiving a notification if an error occurs before the message is sent.
If the application is interested in flow control (see
Section 29.3.6), the return value of
ice_flushBatchRequests_async is a boolean indicating whether the message was sent synchronously. Furthermore, the callback object can define an
ice_sent method that is invoked when an asynchronous flush completes.
The base proxy class ObjectPrx defines the asynchronous flush operation as shown below:
namespace Ice {
class ObjectPrx : ... {
public:
// ...
bool ice_flushBatchRequests_async(
const Ice::AMI_Object_ice_flushBatchRequestsPtr& cb);
};
}
namespace Ice {
class AMI_Object_ice_flushBatchRequests : ... {
public:
virtual void ice_exception(const Ice::Exception& ex) = 0;
};
}
class MyFlushCallbackI :
public Ice::AMI_Object_ice_flushBatchRequests,
public Ice::AMISentCallback
{
public:
virtual void ice_exception(const Ice::Exception& ex);
virtual void ice_sent();
};
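A client could then initiate the asynchronous flush as shown in the sketch below; the proxy variable batchProxy is hypothetical and is assumed to have batch requests queued.

Ice::ObjectPrx batchProxy = ...;
// Flush any queued batch requests without risking a block in the
// calling thread. If the return value is true, the batch was written
// immediately and ice_sent will not be invoked on the callback.
bool sentSynchronously =
    batchProxy->ice_flushBatchRequests_async(new MyFlushCallbackI);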
The base proxy class ObjectPrx defines the asynchronous flush operation as shown below:
package Ice;
public class ObjectPrx ... {
// ...
boolean ice_flushBatchRequests_async(
AMI_Object_ice_flushBatchRequests cb);
}
package Ice;
public abstract class AMI_Object_ice_flushBatchRequests ...
{
public abstract void ice_exception(LocalException ex);
}
class MyFlushCallbackI
extends Ice.AMI_Object_ice_flushBatchRequests
implements Ice.AMISentCallback
{
public void ice_exception(Ice.LocalException ex) { ... }
public void ice_sent() { ... }
}
The base proxy class ObjectPrx defines the asynchronous flush operation as shown below:
namespace Ice {
public class ObjectPrx : ... {
// ...
bool ice_flushBatchRequests_async(
AMI_Object_ice_flushBatchRequests cb);
}
}
namespace Ice {
public abstract class AMI_Object_ice_flushBatchRequests ... {
public abstract void ice_exception(Ice.Exception ex);
}
}
class MyFlushCallbackI : Ice.AMI_Object_ice_flushBatchRequests,
Ice.AMISentCallback
{
public override void ice_exception(Ice.Exception ex) { ... }
public void ice_sent() { ... }
}
In Python, the base proxy class defines the asynchronous flush operation as shown below:
def ice_flushBatchRequests_async(self, cb)
The cb argument represents a callback object that must implement an
ice_exception method. As an example, the class below demonstrates how to define a callback class that also receives a notification when the asynchronous flush completes:
class MyFlushCallbackI(object):
def ice_exception(self, ex):
# handle an exception
def ice_sent(self):
# flush has completed
Timeouts for asynchronous invocations behave like those for synchronous invocations: an
Ice::TimeoutException is raised if the response is not received within the given time period. In the case of an asynchronous invocation, however, the exception is reported to the
ice_exception method of the invocation’s callback object. For example, we can handle this exception in C++ as shown below:
class AMI_Model_interpolateI : public Demo::AMI_Model_interpolate
{
public:
// ...
virtual void ice_exception(const Ice::Exception& ex)
{
try {
ex.ice_throw();
} catch (const Demo::RangeError& e) {
cerr << "interpolate failed: range error" << endl;
} catch (const Ice::TimeoutException&) {
cerr << "interpolate failed: timeout" << endl;
} catch (const Ice::LocalException& e) {
cerr << "interpolate failed: " << e << endl;
}
}
};
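Of course, a timeout must be configured on the proxy for this exception to arise. A minimal sketch, assuming a base proxy variable base and the grid from the earlier example, uses the ice_timeout proxy factory method:

// Create a proxy whose invocations time out after five seconds
// (5000 milliseconds), then invoke interpolate asynchronously.
Demo::ModelPrx model =
    Demo::ModelPrx::uncheckedCast(base->ice_timeout(5000));
model->interpolate_async(new AMI_Model_interpolateI, grid, 0.5);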
It is important to remember that all errors encountered by an AMI invocation (except
CommunicatorDestroyedException) are reported back via the
ice_exception callback, even if the error condition is encountered "on the way out", when the operation is invoked. The reason for this is consistency: if an invocation, such as
foo_async, could throw exceptions, you would have to handle exceptions in two places in your code: at the point of call for exceptions that are encountered "on the way out", and in
ice_exception for error conditions that are detected after the call is initiated.
For example, consider the following pair of asynchronous invocations:
p1->foo_async(cb1);
p2->bar_async(cb2);
If bar depends on the successful completion of foo for its correct operation, this code will not work, because the bar invocation is sent regardless of whether foo succeeded or failed.
In such cases, where you need to be sure that one call is dispatched only if a preceding call succeeds, you must invoke bar from within foo’s ice_response implementation rather than from the main-line code, as the following sketch illustrates.
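As a concrete sketch, reusing the interpolate example from earlier in this section, the dependent request is issued only from ice_response. The class name ChainedCallbackI and the second grid are hypothetical.

// Callback that issues a dependent invocation only after the first
// interpolate request has completed successfully.
class ChainedCallbackI : public Demo::AMI_Model_interpolate
{
public:
    ChainedCallbackI(const Demo::ModelPrx& model, const Demo::Grid& nextGrid) :
        _model(model), _nextGrid(nextGrid)
    {
    }

    virtual void ice_response(const Demo::Grid& result)
    {
        // The first request succeeded, so the dependent request can
        // now be sent.
        _model->interpolate_async(new AMI_Model_interpolateI, _nextGrid, 0.5);
    }

    virtual void ice_exception(const Ice::Exception& ex)
    {
        // The first request failed; the dependent request is not sent.
        cerr << "interpolate failed: " << ex << endl;
    }

private:
    Demo::ModelPrx _model;
    Demo::Grid _nextGrid;
};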
AMI invocations cannot be sent using collocation optimization. If you attempt to invoke an AMI operation using a proxy that is configured to use collocation optimization, the Ice run time will raise
CollocationOptimizationException if the servant happens to be collocated; the request is sent normally if the servant is not collocated.
Section 28.21 provides more information about this optimization and describes how to disable it when necessary.
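For reference, one way to disable the optimization for a particular proxy, assuming the ice_collocationOptimized proxy factory method available in Ice 3.2 and later, is sketched below; base is a hypothetical proxy variable.

// Disable collocation optimization so that AMI invocations on this
// proxy are always dispatched as if the servant were remote.
Demo::ModelPrx model = Demo::ModelPrx::uncheckedCast(
    base->ice_collocationOptimized(false));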