Active objects are created on a per-object basis: an application can contain active as well as passive instances of a given class. In this section, we present the three methods for creating active instances of classes and show how the arguments passed to these methods control creation. Although almost any object can be turned into an active object, there are some restrictions that are detailed below. We also present an in-depth explanation of the structure and behaviour of an active object. The examples in this chapter contain only the code necessary for creating active objects. To see how to start the nodes, JVMs, and virtual nodes on which objects can be instantiated, read Chapter 17, ProActive Basic Configuration.
In ProActive there are two ways to create active objects: instantiation based creation and object based creation, which uses an existing object. Instantiation based creation is done through PAActiveObject.newActive(...) and PAActiveObject.newActiveInParallel(...), and object based creation is done through PAActiveObject.turnActive(...). The methods are part of the org.objectweb.proactive.api package. Each method takes several parameters that control how the active object is created, and most of the parameters are common to all creation methods. All the available creation methods and their parameters are presented in the PAActiveObject Javadoc.
For the examples in this chapter we will be using the following class that can be instantiated as an active object:
import java.io.Serializable;

public class Worker implements Serializable {
    // empty no-arg constructor needed by ProActive
    public Worker() {
    }

    public Worker(Long age, String name) {
    }

    public void doNothingGracefully() {
    }
}
Not all classes can be used to instantiate active objects. There are some restrictions, most of them caused by the 100% Java compliance, which forbids modifying the Java Virtual Machine or the compiler.
Some of these restrictions apply at class level:
final classes cannot be used to instantiate active objects
non-public classes cannot be used to instantiate active objects
classes without a no-argument constructor cannot be reified
There are also restrictions at method level within a class. Final methods cannot be used because the stub is created from the object and having methods final prevents the stub from overriding the methods.
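The restrictions above can be checked with plain reflection before attempting to create an active object. The following is a minimal sketch under stated assumptions: ReifiableCheck, its problems method, and the four sample classes are hypothetical names introduced for illustration and are not part of ProActive. It only mirrors the rules listed in this section and does not reproduce the checks that the ProActive runtime performs itself.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a ProActive class: it only mirrors the rules listed above.
final class ReifiableCheck {

    // Sample classes used to exercise the check.
    public static class PlainWorker {
        public PlainWorker() { }
        public void work() { }
    }

    public static final class FinalWorker {
        public FinalWorker() { }
    }

    public static class NoDefaultConstructor {
        public NoDefaultConstructor(String name) { }
    }

    public static class FinalMethodWorker {
        public FinalMethodWorker() { }
        public final void work() { }
    }

    // Returns the reasons why a class breaks one of the rules; an empty list means none is broken.
    static List<String> problems(Class<?> type) {
        List<String> reasons = new ArrayList<>();
        int classModifiers = type.getModifiers();
        if (Modifier.isFinal(classModifiers)) {
            reasons.add("class is final");
        }
        if (!Modifier.isPublic(classModifiers)) {
            reasons.add("class is not public");
        }
        try {
            type.getDeclaredConstructor(); // looks up the no-argument constructor
        } catch (NoSuchMethodException e) {
            reasons.add("no no-argument constructor");
        }
        for (Method method : type.getDeclaredMethods()) {
            int m = method.getModifiers();
            // a final instance method cannot be overridden by a generated subclass
            if (Modifier.isPublic(m) && Modifier.isFinal(m) && !Modifier.isStatic(m)) {
                reasons.add("final method: " + method.getName());
            }
        }
        return reasons;
    }
}
```

Calling ReifiableCheck.problems(SomeClass.class) early, for instance in a unit test, reports a broken rule with a readable message instead of leaving it to surface as a failure at creation time.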
When creating new instances of active objects we can use PAActiveObject.newActive(...) or PAActiveObject.newActiveInParallel(...). The first creates a single active object. The second creates a number of active objects in parallel, deployed on one or more nodes, and the object creation is optimized by a thread pool.
When using instantiation based creation, any argument passed to the constructor of the reified object through PAActiveObject.newActive(...) or PAActiveObject.newActiveInParallel(...) is serialized and passed by copy to the object. That is because the model behind ProActive is uniform whether the active object is instantiated locally or remotely. The parameters are therefore guaranteed to be passed by copy to the constructor. When using PAActiveObject.newActive(...), one needs to make sure that the constructor arguments are Serializable. On the other hand, the class used to create the active object does not need to be Serializable, even in the case of remotely created active objects. Bear in mind also that a reified object must have a declared empty no-args constructor in order to be properly created.
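The effect of passing constructor arguments by serialized copy can be reproduced with standard Java serialization. The sketch below is an illustration only: CopySemanticsSketch and copyBySerialization are hypothetical names, and the code does not call ProActive. It shows that the receiver gets a deep copy, so later changes to the caller's objects are not visible to it, and that a non-serializable argument is rejected.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Plain-Java illustration of pass-by-copy through serialization; not a ProActive class.
final class CopySemanticsSketch {

    // Serializes the argument array and reads it back, which yields a deep copy.
    static Object[] copyBySerialization(Object[] args) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(args);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Object[]) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            // NotSerializableException is an IOException, so it ends up here
            throw new IllegalStateException("argument could not be copied by serialization", e);
        }
    }
}
```

For example, if a java.util.ArrayList is placed in the array, copied, and then modified by the caller, the list held in the copy keeps its original content.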
To create a single active object from the class Worker in the local JVM we use the following code. If the invocation of the constructor of class Worker throws an exception, it is placed inside an exception of type ActiveObjectCreationException. When the call to newActive returns, the active object has been created and its active thread is started.
Worker charlie;
// set the constructor values to be used; they must match Worker(Long, String)
Object[] params = new Object[] { Long.valueOf(26L), "Charlie" };
try {
    charlie = (Worker) PAActiveObject.newActive(Worker.class.getName(), params);
} catch (ActiveObjectCreationException aoExcep) {
    // creation of ActiveObject failed
    System.err.println(aoExcep.getMessage());
} catch (NodeException nodeExcep) {
    System.err.println(nodeExcep.getMessage());
}
The following code deploys an active object on each node contained in the virtual node someVirtualNode. In this case the Worker constructor doesn't take any arguments. However, we have to create an Object array whose length equals the number of nodes. The creation of active objects is optimized by a thread pool. When the call to newActiveInParallel returns, the active objects have been created and their threads have been started.
Worker[] workers;
Node[] nodes;
try {
    nodes = someVirtualNode.getNodes();
    // one (empty) set of constructor arguments per node
    workers = (Worker[]) PAActiveObject.newActiveInParallel(Worker.class.getName(),
            new Object[nodes.length][], nodes);
} catch (ActiveObjectCreationException aoExcep) {
    System.err.println(aoExcep.getMessage());
} catch (ClassNotFoundException classExcep) {
    System.err.println(classExcep.getMessage());
}
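To make the phrase "optimized by a thread pool" concrete, here is a rough plain-Java sketch of the general idea: one creation task is submitted per node, and the call returns only once every task has finished. ParallelCreationSketch, createOn, and createAll are hypothetical names, node names are plain strings, and the "created object" is just a label. This is an assumption about the shape of the technique, not a description of how ProActive implements it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java illustration only; no ProActive classes are involved.
final class ParallelCreationSketch {

    // Stand-in for "create one object on this node"; returns a label instead of an active object.
    static String createOn(String nodeName) {
        return "worker@" + nodeName;
    }

    // Submits one creation task per node and returns the results in node order.
    static List<String> createAll(List<String> nodeNames, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String node : nodeNames) {
                Callable<String> task = () -> createOn(node);
                pending.add(pool.submit(task));
            }
            List<String> created = new ArrayList<>();
            for (Future<String> future : pending) {
                created.add(future.get()); // blocks until that creation has finished
            }
            return created;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the results are collected in submission order, the i-th element of the returned list corresponds to the i-th node, which matches the way the example above pairs nodes with creation arguments.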
Object based creation is used for turning an existing passive object instance into an active one. It has been introduced in ProActive as an answer to the problem of creating active objects from already existing objects for which we do not have access to the source code.
Because the object already exists before turning it active, there is no serialization involved when we create the object. When we invoke PAActiveObject.turnActive on the object, two cases are possible. If we create the active object locally (on a local node), it will not be serialized. If we create the active object remotely (on a remote node), the reified object will be serialized. Therefore, if the turnActive is done on a remote node, the class used to create the active object this way has to be Serializable. In addition, when using turnActive, care must be taken that no other references to the originating object are kept by other objects after the call to turnActive. A direct call to a method of the originating object without passing by a ProActive stub on this object will break the ProActive model.
The simplest code for object based creation looks like this:
Worker charlie = new Worker();
try {
    charlie = (Worker) PAActiveObject.turnActive(charlie, null);
} catch (ActiveObjectCreationException aoExcep) {
    // creation of ActiveObject failed
    System.err.println(aoExcep.getMessage());
} catch (NodeException nodeExcep) {
    System.err.println(nodeExcep.getMessage());
}
The second parameter of turnActive is the location where the active object will be created. No parameter or null means that the active object is created locally in the current node.
When using this method, the programmer has to make sure that no other references to the passive object exist after the call to PAActiveObject.turnActive(...). If such references are used for calling methods directly on the passive object (without going through its stub, proxy, and body), the model will not be consistent and specialization of synchronization will not be guaranteed.
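The hazard described above can be illustrated without ProActive by using a standard java.lang.reflect.Proxy as a stand-in for the stub. In this sketch, StubBypassSketch, Counter, PlainCounter, and stubFor are hypothetical names; the proxy merely counts the calls it intercepts, whereas a real stub would turn them into requests. The point is only that a call made through a leftover direct reference never reaches the intercepting layer.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java stand-in for a stub: calls made through it are counted, direct calls are not.
final class StubBypassSketch {

    interface Counter {
        void increment();
        int value();
    }

    static final class PlainCounter implements Counter {
        private int count;
        public void increment() { count++; }
        public int value() { return count; }
    }

    // Wraps the target in a dynamic proxy that records every intercepted call.
    static Counter stubFor(Counter target, AtomicInteger interceptedCalls) {
        InvocationHandler handler = (proxy, method, args) -> {
            interceptedCalls.incrementAndGet();
            return method.invoke(target, args);
        };
        return (Counter) Proxy.newProxyInstance(
                Counter.class.getClassLoader(), new Class<?>[] { Counter.class }, handler);
    }
}
```

If a program keeps both the proxy and the original PlainCounter, one call through each leaves the interception counter at 1 while the object's state shows 2 increments: the second call changed the object without the intercepting layer ever seeing it.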
PAActiveObject.newActive(...) and PAActiveObject.newActiveInParallel(...) always take as a first argument the class name from which the active object will be instantiated. The classname argument must be of type java.lang.String and its value is usually obtained by calling ClassToInstantiateFrom.class.getName().
PAActiveObject.turnActive(...) does not create an active object from a class but from an existing object; therefore it takes as a first argument the object to be turned active.
In order to create the active object, PAActiveObject.newActive(...) and PAActiveObject.newActiveInParallel(...) must take a list of constructor arguments to be passed to the constructor of the class to be instantiated as an active object. Arguments to the constructor of the class have to be passed as an array of Object. Also, we have to make sure that the constructor arguments are Serializable since they are passed by a serialized copy to the object. The ProActive runtime determines which constructor of the class to call according to the type of the elements of this array. Nevertheless, there is still room for some ambiguity in resolving the constructor because the arguments of the constructor are stored in an array of type Object[] or Object[][]. If one argument is null, the runtime obviously cannot determine its type. In this case an exception is thrown specifying that ProActive cannot determine the constructor. In the example below, an ambiguity exists between the two constructors if the corresponding element of the Object array is null.
import java.util.Vector;

// class for which passing null as
// an argument for the constructor
// leads to the generation of an exception
class DontPassNullForTheConstructor {
    public DontPassNullForTheConstructor(String s) {
    }

    public DontPassNullForTheConstructor(Vector v) {
    }
}
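The ambiguity can be reproduced with a small reflection-based resolver. The sketch below is a simplified assumption about how constructor selection from an Object array can work, not the algorithm used by the ProActive runtime: ConstructorResolver and resolve are hypothetical names, and the resolver simply requires exactly one constructor whose parameter types accept the runtime types of the arguments. The sample class is repeated as a nested class so that the block is self-contained.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

// Hypothetical resolver, for illustration only; ProActive's own selection logic may differ.
final class ConstructorResolver {

    // Same shape as the class shown above.
    static class DontPassNullForTheConstructor {
        public DontPassNullForTheConstructor(String s) { }
        public DontPassNullForTheConstructor(Vector<?> v) { }
    }

    // Returns the single constructor compatible with the arguments, or fails if none or several match.
    static Constructor<?> resolve(Class<?> type, Object[] args) {
        List<Constructor<?>> matches = new ArrayList<>();
        for (Constructor<?> candidate : type.getDeclaredConstructors()) {
            Class<?>[] parameterTypes = candidate.getParameterTypes();
            if (parameterTypes.length != args.length) {
                continue;
            }
            boolean compatible = true;
            for (int i = 0; i < parameterTypes.length; i++) {
                if (args[i] == null) {
                    // null carries no type information: it fits every reference parameter
                    compatible &= !parameterTypes[i].isPrimitive();
                } else {
                    compatible &= parameterTypes[i].isInstance(args[i]);
                }
            }
            if (compatible) {
                matches.add(candidate);
            }
        }
        if (matches.size() != 1) {
            throw new IllegalArgumentException(
                    "cannot determine the constructor: " + matches.size() + " candidates");
        }
        return matches.get(0);
    }
}
```

With new Object[] { "hello" } only the String constructor matches. With new Object[] { null } both constructors match and the resolver throws, which is the situation the text warns about.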
If we use PAActiveObject.newActiveInParallel(..., Node[] nodes) we have to make sure the length of the first dimension of java.lang.Object[][] is equal to the number of nodes since an active object will be deployed on each node. The second dimension of the array contains the constructor arguments for each deployed active object. Different active objects can have different constructor arguments.
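The shape of the two-dimensional argument array is easy to get wrong, so here is a small sketch of how it can be built and checked. PerNodeArguments, forNodes, and checkShape are hypothetical helpers, nodes are represented by plain strings, and the argument values are made up for illustration. Each row follows the Worker(Long, String) constructor used in this chapter.

```java
// Hypothetical helper showing the expected shape of the Object[][] argument; not a ProActive class.
final class PerNodeArguments {

    // Builds one row of constructor arguments per node; rows may differ from node to node.
    static Object[][] forNodes(String[] nodeNames) {
        Object[][] params = new Object[nodeNames.length][];
        for (int i = 0; i < nodeNames.length; i++) {
            params[i] = new Object[] { Long.valueOf(20L + i), "worker-on-" + nodeNames[i] };
        }
        return params;
    }

    // Rejects argument arrays whose first dimension does not match the number of nodes.
    static void checkShape(Object[][] params, int nodeCount) {
        if (params.length != nodeCount) {
            throw new IllegalArgumentException(
                    "expected " + nodeCount + " argument rows but got " + params.length);
        }
    }
}
```

Running checkShape just before the creation call turns a mismatch between nodes and argument rows into an immediate, explicit error.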
If we use PAActiveObject.newActiveInParallel(..., VirtualNode virtualNode) the virtual node will be activated if it is not active and one active object will be started on each node contained in the virtual node. In this case all the active objects take the same constructor arguments as we use a one-dimensional array java.lang.Object[].
It is possible to pass an argument to the call to newActive in order to create the new active object on a specific JVM, possibly remote. The JVM is identified using a Node object, an array of Node or a String that contains a URL pointing to the node. If the parameter is not given, the active object is created in the current JVM and is attached to a default Node. The node argument and the URL string are used with PAActiveObject.newActive(...) and PAActiveObject.turnActive(...).
The array of Node and the virtual node argument are used with PAActiveObject.newActiveInParallel(...). The virtual node will be activated if it is not active and one active object will be started on each node contained in the virtual node.
Worker charlie;
Node node;
// set the constructor values to be used
Object[] params = new Object[] { new Long(26), "Charlie" };
try {
    // get the node from a virtual node
    node = someVirtualNode.getNode();
    charlie = (Worker) PAActiveObject.newActive(Worker.class.getName(), params, node);
} catch (ActiveObjectCreationException aoExcep) {
    // creation of ActiveObject failed
    System.err.println(aoExcep.getMessage());
} catch (NodeException nodeExcep) {
    System.err.println(nodeExcep.getMessage());
}
To deploy using the node's URL we just have to use the URL instead of the Node as argument:
Worker charlie;
String nodeURL;
Node node;
// set the constructor values to be used; they must match Worker(Long, String)
Object[] params = new Object[] { Long.valueOf(26L), "Charlie" };
try {
    // get the node from a virtual node
    node = someVirtualNode.getNode();
    // get the node URL
    nodeURL = node.getNodeInformation().getURL();
    charlie = (Worker) PAActiveObject.newActive(Worker.class.getName(), params, nodeURL);
} catch (ActiveObjectCreationException aoExcep) {
    // creation of ActiveObject failed
    System.err.println(aoExcep.getMessage());
} catch (NodeException nodeExcep) {
    System.err.println(nodeExcep.getMessage());
}
The usage is similar for PAActiveObject.turnActive(...) as in the examples above.
In order to deploy several active objects with PAActiveObject.newActiveInParallel(...) we can just pass an array of Node (see the example in Section 9.3.2, “Using PAActiveObject.newActiveInParallel(...)”) or a VirtualNode.
Worker[] workers;
// the virtual node needs to be initialized before using it
try {
    // an empty argument array: the no-arg constructor is used on every node
    workers = (Worker[]) PAActiveObject.newActiveInParallel(Worker.class.getName(),
            new Object[0], someVirtualNode);
} catch (ActiveObjectCreationException aoExcep) {
    System.err.println(aoExcep.getMessage());
} catch (ClassNotFoundException classExcep) {
    System.err.println(classExcep.getMessage());
}
Customizing the activity of the active object is at the core of ProActive because it allows the behavior of an active object to be fully specified. By default, an object turned into an active object serves its incoming requests in a FIFO manner. In order to specify another policy for serving the requests, or to specify any other behavior, one can implement interfaces defining methods that will be automatically called by ProActive.
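The difference between the default FIFO policy and an alternative such as LIFO comes down to which end of the pending-request queue is served next. The following plain-Java sketch models the queue with a java.util.ArrayDeque. ServingOrderSketch and its two methods are hypothetical names, requests are plain strings, and all requests are assumed to be pending before serving starts, which is a simplification of a live active object.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of a request queue; it does not use ProActive's Service or Body classes.
final class ServingOrderSketch {

    // FIFO: the oldest pending request is always served first.
    static List<String> serveOldestFirst(List<String> pendingRequests) {
        Deque<String> queue = new ArrayDeque<>(pendingRequests);
        List<String> served = new ArrayList<>();
        while (!queue.isEmpty()) {
            served.add(queue.pollFirst());
        }
        return served;
    }

    // LIFO: the most recently received request is always served first.
    static List<String> serveYoungestFirst(List<String> pendingRequests) {
        Deque<String> queue = new ArrayDeque<>(pendingRequests);
        List<String> served = new ArrayList<>();
        while (!queue.isEmpty()) {
            served.add(queue.pollLast());
        }
        return served;
    }
}
```

For three pending requests a, b, c received in that order, the first method serves them as a, b, c and the second as c, b, a.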
It is possible to specify what to do before the activity starts, what the activity is and what to do after it ends. The three steps are:
the initialization of the activity (done only once)
the activity itself
the end of the activity (done only once)
Three interfaces are used to define and implement each step:
InitActive
RunActive
EndActive
In case of a migration, an active object stops and restarts its activity automatically without invoking the initialization or ending phases. Only the activity itself is restarted.
There are two ways to define each of the three phases of an active object:
implementing one or more of the three interfaces directly in the class used to create the active object
passing an object implementing one or more of the three interfaces as a parameter to the method newActive or turnActive (parameter active in those methods)
Note that the methods defined by those 3 interfaces are guaranteed to be called by the active thread of the active object.
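The guarantee that all three phases run on the active thread can be pictured with a small plain-Java model. In this sketch, LifecycleSketch, Phases, and runOnOwnThread are hypothetical names that only imitate the initialization, activity, and end steps; they are not the InitActive, RunActive, and EndActive interfaces themselves. One dedicated thread calls the three steps in order and each step records the name of the thread that executed it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of the three phases; not ProActive's InitActive/RunActive/EndActive.
final class LifecycleSketch {

    interface Phases {
        void init(); // done once, before the activity
        void run();  // the activity itself
        void end();  // done once, after the activity
    }

    // Runs the three phases in order on one dedicated thread and returns what each phase logged.
    static List<String> runOnOwnThread(String threadName) {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        Phases phases = new Phases() {
            public void init() { log.add("init on " + Thread.currentThread().getName()); }
            public void run() { log.add("run on " + Thread.currentThread().getName()); }
            public void end() { log.add("end on " + Thread.currentThread().getName()); }
        };
        Thread active = new Thread(() -> {
            phases.init();
            phases.run();
            phases.end();
        }, threadName);
        active.start();
        try {
            active.join(); // wait until the whole life cycle has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(log);
    }
}
```

Since only one thread ever touches the phases, code written inside them needs no extra synchronization against the other phases, which is the practical benefit of the guarantee stated above.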
The algorithms for each running phase are the following (activity is the object passed as a parameter to newActive or turnActive):
Implementing the interfaces directly in the class used to create the active object is the easiest solution when you control the class that you make active. Depending on which phase in the life of the active object you want to customize, you implement the corresponding interface (one or more): InitActive, RunActive and EndActive. Here is an example that has a custom initialization and activity.
import org.objectweb.proactive.Service;
import org.objectweb.proactive.RunActive;
import org.objectweb.proactive.InitActive;
import org.objectweb.proactive.Body;
import org.objectweb.proactive.api.PAActiveObject;

public class CustomActivity implements InitActive, RunActive {
    private String myName;

    public String getName() {
        return myName;
    }

    // -- implements InitActive
    public void initActivity(Body body) {
        myName = body.getName();
    }

    // -- implements RunActive for serving request in a LIFO fashion
    public void runActivity(Body body) {
        Service service = new Service(body);
        while (body.isActive()) {
            // LIFO order
            service.blockingServeYoungest();
        }
    }

    public static void main(String[] args) throws Exception {
        CustomActivity a = (CustomActivity) PAActiveObject.newActive(CustomActivity.class.getName(), null);
        String call1 = a.getName();
    }
}
Below is the skeleton code for a class that can run, suspend, restart and stop a simulation. It uses an implementation of RunActive to provide the necessary control.
import org.objectweb.proactive.ProActive;
import org.objectweb.proactive.Service;
import org.objectweb.proactive.RunActive;
import org.objectweb.proactive.Body;
import org.objectweb.proactive.api.PAActiveObject;

public class Simulation implements RunActive {
    private boolean stoppedSimulation = false;
    private boolean startedSimulation = false;
    private boolean suspendedSimulation = false;
    private boolean notStarted = true;

    public void startSimulation() {
        // Simulation starts
        System.out.println("Simulation started...");
        notStarted = false;
        startedSimulation = true;
    }

    public void restartSimulation() {
        // Simulation is restarted
        System.out.println("Simulation restarted...");
        startedSimulation = true;
        suspendedSimulation = false;
    }

    public void suspendSimulation() {
        // Simulation is suspended
        System.out.println("Simulation suspended...");
        suspendedSimulation = true;
        startedSimulation = false;
    }

    public void stopSimulation() {
        // Simulation is stopped
        System.out.println("Simulation stopped...");
        stoppedSimulation = true;
    }

    public void runActivity(Body body) {
        Service service = new Service(body);
        while (body.isActive()) {
            // If the simulation is not yet started, wait until the startSimulation method is called
            if (notStarted)
                service.blockingServeOldest("startSimulation");
            // If the simulation is started, serve requests in FIFO order
            if (startedSimulation)
                service.blockingServeOldest();
            // If the simulation is suspended, wait until the restartSimulation method is called
            if (suspendedSimulation)
                service.blockingServeOldest("restartSimulation");
            // If the simulation is stopped, exit
            if (stoppedSimulation) {
                body.terminate();
                ProActive.exitSuccess();
            }
        }
    }
}
Example 9.1. Start, stop, suspend, restart a simulation algorithm in runActivity method
Passing an object implementing the interfaces is the solution to use when we do not control the class that we make active or when we want to write a generic activity policy and reuse it with several active objects. Depending on which phase in the life of the active object we want to customize, we implement the corresponding interface (one or more) from InitActive, RunActive and EndActive. Following is an example that has a custom activity.
First we need to implement the activity that will be passed to the active object. We do so by implementing one or more of the interfaces.
import org.objectweb.proactive.Body;
import org.objectweb.proactive.RunActive;
import org.objectweb.proactive.Service;

public class LIFOActivity implements RunActive {
    // -- implements RunActive for serving request in a LIFO fashion
    public void runActivity(Body body) {
        Service service = new Service(body);
        while (body.isActive()) {
            System.out.println("Serving...");
            service.blockingServeYoungest();
        }
    }
}
An instance of the class implementing the interface can then be passed to PAActiveObject.newActive(...) or PAActiveObject.turnActive(...).
import org.objectweb.proactive.ActiveObjectCreationException;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.core.node.NodeException;

public class Main {
    public static void main(String[] args) {
        Worker w = new Worker();
        try {
            // the third argument is the object that defines the activity
            w = (Worker) PAActiveObject.turnActive(w, null, new LIFOActivity(), null);
            w.doNothingGracefully();
            w.doNothingGracefully();
            w.doNothingGracefully();
            w.doNothingGracefully();
        } catch (ActiveObjectCreationException aoExcep) {
            System.err.println(aoExcep.getMessage());
        } catch (NodeException nodeExcep) {
            System.err.println(nodeExcep.getMessage());
        }
    }
}
Creating an active object using ProActive might be a little bit cumbersome and requires more lines of code than creating a regular object. A nice solution to this problem is the use of the factory pattern. This mainly applies to class based creation. It consists of adding a static method to the class WorkerFactory that takes care of instantiating the active object and returns it. The code is:
import org.objectweb.proactive.ActiveObjectCreationException;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.core.node.Node;
import org.objectweb.proactive.core.node.NodeException;

public class WorkerFactory extends Worker {
    public static Worker createActiveWorker(int age, String name, Node node) {
        // the argument types must match the Worker(Long, String) constructor
        Object[] params = new Object[] { Long.valueOf(age), name };
        try {
            return (Worker) PAActiveObject.newActive(Worker.class.getName(), params, node);
        } catch (ActiveObjectCreationException e) {
            e.printStackTrace();
            return null;
        } catch (NodeException e) {
            e.printStackTrace();
            return null;
        }
    }
}
The static method in the factory class is then used to create active objects:
import org.objectweb.proactive.core.node.Node;

public class Main {
    public static void main(String[] args) {
        Worker w = new Worker();
        Node node = null; // deploy locally
        w = WorkerFactory.createActiveWorker(26, "Charlie", node);
        w.doNothingGracefully();
    }
}
It is up to the programmer to decide whether this method has to throw exceptions or not. We recommend that this method only throw exceptions that appear in the signature of the reified constructor (none here as the constructor of Worker that we call doesn't throw any exception). However, the non functional exceptions induced by the creation of the active object have to be dealt with somewhere in the code.
There are many cases where you may want to customize the body used when creating an active object. For instance, one may want to add some debug messages or some timing behavior when sending or receiving requests. The body is a non changeable object that delegates most of its tasks to helper objects called MetaObjects. Standard MetaObjects are already used by default in ProActive but one can easily replace any of those MetaObjects by a custom one.
We have defined a MetaObjectFactory interface that is able to create factories for each of those MetaObjects. This interface is implemented by ProActiveMetaObjectFactory which provides all the default factories used in ProActive.
When creating an active object, it is possible to specify which MetaObjectFactory to use for that particular instance of active object being created.
First you have to write a new MetaObject factory that inherits from ProActiveMetaObjectFactory or directly implements the MetaObjectFactory interface. Inheriting from ProActiveMetaObjectFactory is a great time saver as you only redefine what you really need to, as opposed to redefining everything when implementing MetaObjectFactory. Here is an example:
import java.io.Serializable;

import org.objectweb.proactive.core.body.MetaObjectFactory;
import org.objectweb.proactive.core.body.ProActiveMetaObjectFactory;
import org.objectweb.proactive.core.body.UniversalBody;
import org.objectweb.proactive.core.body.request.Request;
import org.objectweb.proactive.core.body.request.RequestFactory;
import org.objectweb.proactive.core.body.request.RequestImpl;
import org.objectweb.proactive.core.mop.MethodCall;

public class CustomMetaObjectFactory extends ProActiveMetaObjectFactory {
    private static final MetaObjectFactory instance = new CustomMetaObjectFactory();

    // return the shared factory instance
    public static MetaObjectFactory newInstance() {
        return instance;
    }

    private CustomMetaObjectFactory() {
        super();
    }

    protected RequestFactory newRequestFactorySingleton() {
        System.out.println("Creating the custom metaobject factory...");
        return new CustomRequestFactory();
    }

    protected class CustomRequestFactory extends RequestFactoryImpl implements Serializable {
        public Request newRequest(MethodCall methodCall, UniversalBody sourceBody, boolean isOneWay,
                long sequenceID) {
            System.out.println("Received a new request...");
            return new CustomRequest(methodCall, sourceBody, isOneWay, sequenceID);
        }

        protected class CustomRequest extends RequestImpl {
            public CustomRequest(MethodCall methodCall, UniversalBody sourceBody, boolean isOneWay,
                    long sequenceID) {
                super(methodCall, sourceBody, isOneWay, sequenceID);
                System.out.println("I am a custom request handler");
            }
        } // end inner class CustomRequest
    } // end inner class CustomRequestFactory
}
The factory above simply redefines the RequestFactory in order to make the body use a new type of request. The method protected RequestFactory newRequestFactorySingleton() is one convenience method that ProActiveMetaObjectFactory provides to simplify the creation of factories as singleton.
public class ProActiveMetaObjectFactory implements MetaObjectFactory, java.io.Serializable, Cloneable { public static final String COMPONENT_PARAMETERS_KEY = "component-parameters"; public static final String SYNCHRONOUS_COMPOSITE_COMPONENT_KEY = "synchronous-composite"; protected static Logger logger = ProActiveLogger.getLogger(Loggers.MOP); // // -- PRIVATE MEMBERS ----------------------------------------------- // // private static final MetaObjectFactory instance = new ProActiveMetaObjectFactory(); private static MetaObjectFactory instance = new ProActiveMetaObjectFactory(); public Map<String, Object> parameters = new HashMap<String, Object>(); // // -- PROTECTED MEMBERS ----------------------------------------------- // protected RequestFactory requestFactoryInstance; protected ReplyReceiverFactory replyReceiverFactoryInstance; protected RequestReceiverFactory requestReceiverFactoryInstance; protected RequestQueueFactory requestQueueFactoryInstance; protected MigrationManagerFactory migrationManagerFactoryInstance; // protected RemoteBodyFactory remoteBodyFactoryInstance; protected ThreadStoreFactory threadStoreFactoryInstance; protected ProActiveSPMDGroupManagerFactory proActiveSPMDGroupManagerFactoryInstance; protected ProActiveComponentFactory componentFactoryInstance; protected ProActiveSecurityManager proActiveSecurityManager; protected FTManagerFactory ftmanagerFactoryInstance; protected DebuggerFactory debuggerFactoryInstance; protected Object timItReductor; // // -- CONSTRUCTORS ----------------------------------------------- // protected ProActiveMetaObjectFactory() { this.requestFactoryInstance = newRequestFactorySingleton(); this.replyReceiverFactoryInstance = newReplyReceiverFactorySingleton(); this.requestReceiverFactoryInstance = newRequestReceiverFactorySingleton(); this.requestQueueFactoryInstance = newRequestQueueFactorySingleton(); this.migrationManagerFactoryInstance = newMigrationManagerFactorySingleton(); // this.remoteBodyFactoryInstance 
= newRemoteBodyFactorySingleton(); this.threadStoreFactoryInstance = newThreadStoreFactorySingleton(); this.proActiveSPMDGroupManagerFactoryInstance = newProActiveSPMDGroupManagerFactorySingleton(); this.ftmanagerFactoryInstance = newFTManagerFactorySingleton(); this.debuggerFactoryInstance = newDebuggerFactorySingleton(); } /** * Constructor with parameters * It is used for per-active-object configurations of ProActive factories * @param parameters the parameters of the factories; these parameters can be of any type */ public ProActiveMetaObjectFactory(Map<String, Object> parameters) { this.parameters = parameters; if (parameters.containsKey(COMPONENT_PARAMETERS_KEY)) { ComponentParameters initialComponentParameters = (ComponentParameters) parameters .get(COMPONENT_PARAMETERS_KEY); this.componentFactoryInstance = newComponentFactorySingleton(initialComponentParameters); this.requestFactoryInstance = newRequestFactorySingleton(); this.replyReceiverFactoryInstance = newReplyReceiverFactorySingleton(); this.requestReceiverFactoryInstance = newRequestReceiverFactorySingleton(); this.requestQueueFactoryInstance = newRequestQueueFactorySingleton(); this.migrationManagerFactoryInstance = newMigrationManagerFactorySingleton(); // this.remoteBodyFactoryInstance = newRemoteBodyFactorySingleton(); this.threadStoreFactoryInstance = newThreadStoreFactorySingleton(); this.proActiveSPMDGroupManagerFactoryInstance = newProActiveSPMDGroupManagerFactorySingleton(); this.ftmanagerFactoryInstance = newFTManagerFactorySingleton(); this.debuggerFactoryInstance = newDebuggerFactorySingleton(); } } // // -- PUBLICS METHODS ----------------------------------------------- // public static MetaObjectFactory newInstance() { return instance; } public static void setNewInstance(MetaObjectFactory mo) { instance = mo; } /** * getter for the parameters of the factory (per-active-object config) * @return the parameters of the factory */ public Map<String, Object> getParameters() { return 
this.parameters; } // // -- implements MetaObjectFactory ----------------------------------------------- // public RequestFactory newRequestFactory() { return this.requestFactoryInstance; } public ReplyReceiverFactory newReplyReceiverFactory() { return this.replyReceiverFactoryInstance; } public RequestReceiverFactory newRequestReceiverFactory() { return this.requestReceiverFactoryInstance; } public RequestQueueFactory newRequestQueueFactory() { return this.requestQueueFactoryInstance; } public MigrationManagerFactory newMigrationManagerFactory() { return this.migrationManagerFactoryInstance; } // public RemoteBodyFactory newRemoteBodyFactory() { // return this.remoteBodyFactoryInstance; // } public ThreadStoreFactory newThreadStoreFactory() { return this.threadStoreFactoryInstance; } public ProActiveSPMDGroupManagerFactory newProActiveSPMDGroupManagerFactory() { return this.proActiveSPMDGroupManagerFactoryInstance; } public ProActiveComponentFactory newComponentFactory() { return this.componentFactoryInstance; } public FTManagerFactory newFTManagerFactory() { return this.ftmanagerFactoryInstance; } public DebuggerFactory newDebuggerFactory() { return this.debuggerFactoryInstance; } // // -- PROTECTED METHODS ----------------------------------------------- // protected RequestFactory newRequestFactorySingleton() { return new RequestFactoryImpl(); } protected ReplyReceiverFactory newReplyReceiverFactorySingleton() { return new ReplyReceiverFactoryImpl(); } protected RequestReceiverFactory newRequestReceiverFactorySingleton() { return new RequestReceiverFactoryImpl(); } protected RequestQueueFactory newRequestQueueFactorySingleton() { return new RequestQueueFactoryImpl(); } protected MigrationManagerFactory newMigrationManagerFactorySingleton() { return new MigrationManagerFactoryImpl(); } // protected RemoteBodyFactory newRemoteBodyFactorySingleton() { // return new RemoteBodyFactoryImpl(); // } protected ThreadStoreFactory newThreadStoreFactorySingleton() { return 
new ThreadStoreFactoryImpl(); } protected ProActiveSPMDGroupManagerFactory newProActiveSPMDGroupManagerFactorySingleton() { return new ProActiveSPMDGroupManagerFactoryImpl(); } protected ProActiveComponentFactory newComponentFactorySingleton( ComponentParameters initialComponentParameters) { return new ProActiveComponentFactoryImpl(initialComponentParameters); } protected FTManagerFactory newFTManagerFactorySingleton() { return new FTManagerFactoryImpl(); } protected DebuggerFactory newDebuggerFactorySingleton() { return new DebuggerFactoryImpl(); } // // // // -- INNER CLASSES ----------------------------------------------- // // protected static class RequestFactoryImpl implements RequestFactory, java.io.Serializable { public Request newRequest(MethodCall methodCall, UniversalBody sourceBody, boolean isOneWay, long sequenceID) { //########### exemple de code pour les nouvelles factories // if(System.getProperty("migration.stategy").equals("locationserver")){ // return new RequestWithLocationServer(methodCall, sourceBody, // isOneWay, sequenceID, LocationServerFactory.getLocationServer()); // }else{ return new org.objectweb.proactive.core.body.request.RequestImpl(methodCall, sourceBody, isOneWay, sequenceID); //} } } // end inner class RequestFactoryImpl protected static class ReplyReceiverFactoryImpl implements ReplyReceiverFactory, java.io.Serializable { public ReplyReceiver newReplyReceiver() { return new org.objectweb.proactive.core.body.reply.ReplyReceiverImpl(); } } // end inner class ReplyReceiverFactoryImpl protected class RequestReceiverFactoryImpl implements RequestReceiverFactory, java.io.Serializable { public RequestReceiver newRequestReceiver() { if (ProActiveMetaObjectFactory.this .parameters.containsKey(SYNCHRONOUS_COMPOSITE_COMPONENT_KEY) && ((Boolean) ProActiveMetaObjectFactory.this.parameters .get(ProActiveMetaObjectFactory.SYNCHRONOUS_COMPOSITE_COMPONENT_KEY)).booleanValue()) { return new SynchronousComponentRequestReceiver(); } return new 
org.objectweb.proactive.core.body.request.RequestReceiverImpl(); } } // end inner class RequestReceiverFactoryImpl protected class RequestQueueFactoryImpl implements RequestQueueFactory, java.io.Serializable { public BlockingRequestQueue newRequestQueue(UniqueID ownerID) { if ("true".equals(ProActiveMetaObjectFactory.this.parameters .get(SYNCHRONOUS_COMPOSITE_COMPONENT_KEY))) { return null; } //if (componentFactoryInstance != null) { // COMPONENTS // we need a request queue for components //return new ComponentRequestQueueImpl(ownerID); //} else { return new org.objectweb.proactive.core.body.request.BlockingRequestQueueImpl(ownerID); //} } } // end inner class RequestQueueFactoryImpl protected static class MigrationManagerFactoryImpl implements MigrationManagerFactory, java.io.Serializable { public MigrationManager newMigrationManager() { //########### example de code pour les nouvelles factories // if(System.getProperty("migration.stategy").equals("locationserver")){ // return new MigrationManagerWithLocationServer(LocationServerFactory.getLocationServer()); // }else{ return new org.objectweb.proactive.core.body.migration.MigrationManagerImpl(); //} } } // end inner class MigrationManagerFactoryImpl // protected static class RemoteBodyFactoryImpl implements RemoteBodyFactory, // java.io.Serializable { // public UniversalBody newRemoteBody(UniversalBody body) { // try { // if (Constants.IBIS_PROTOCOL_IDENTIFIER.equals( // ProActiveConfiguration.getInstance() // .getProperty(Constants.PROPERTY_PA_COMMUNICATION_PROTOCOL))) { // if (logger.isDebugEnabled()) { // logger.debug( // "Using ibis factory for creating remote body"); // } // return new org.objectweb.proactive.core.body.ibis.IbisBodyAdapter(body); // } else if (Constants.XMLHTTP_PROTOCOL_IDENTIFIER.equals( // ProActiveConfiguration.getInstance() // .getProperty(Constants.PROPERTY_PA_COMMUNICATION_PROTOCOL))) { // if (logger.isDebugEnabled()) { // logger.debug( // "Using http factory for creating remote body"); 
// } // // return new org.objectweb.proactive.core.body.http.HttpBodyAdapter(body); // } else if (Constants.RMISSH_PROTOCOL_IDENTIFIER.equals( // ProActiveConfiguration.getInstance() // .getProperty(Constants.PROPERTY_PA_COMMUNICATION_PROTOCOL))) { // if (logger.isDebugEnabled()) { // logger.debug( // "Using rmissh factory for creating remote body"); // } // return new org.objectweb.proactive.core.body.rmi.SshRmiBodyAdapter(body); // } else { // if (logger.isDebugEnabled()) { // logger.debug( // "Using rmi factory for creating remote body"); // } // return new org.objectweb.proactive.core.body.rmi.RmiBodyAdapter(body); // } // } catch (ProActiveException e) { // throw new ProActiveRuntimeException("Cannot create Remote body adapter ", // e); // } // } // } // end inner class RemoteBodyFactoryImpl protected static class ThreadStoreFactoryImpl implements ThreadStoreFactory, java.io.Serializable { public ThreadStore newThreadStore() { return new org.objectweb.proactive.core.util.ThreadStoreImpl(); } } // end inner class ThreadStoreFactoryImpl protected static class ProActiveSPMDGroupManagerFactoryImpl implements ProActiveSPMDGroupManagerFactory, java.io.Serializable { public ProActiveSPMDGroupManager newProActiveSPMDGroupManager() { return new ProActiveSPMDGroupManager(); } } // end inner class ProActiveGroupManagerFactoryImpl protected class ProActiveComponentFactoryImpl implements ProActiveComponentFactory, java.io.Serializable { // COMPONENTS private ComponentParameters componentParameters; public ProActiveComponentFactoryImpl(ComponentParameters initialComponentParameters) { this.componentParameters = initialComponentParameters; } public ProActiveComponent newProActiveComponent(Body myBody) { return new ProActiveComponentImpl(this.componentParameters, myBody); } } // FAULT-TOLERANCE protected class FTManagerFactoryImpl implements FTManagerFactory, Serializable { public FTManager newFTManager(int protocolSelector) { switch (protocolSelector) { case 
FTManagerFactory.PROTO_CIC_ID: return new FTManagerCIC(); case FTManagerFactory.PROTO_PML_ID: return new FTManagerPMLRB(); default: logger.error("Error while creating fault-tolerance manager : " + "no protocol is associated to selector value " + protocolSelector); return null; } } public FTManager newHalfFTManager(int protocolSelector) { switch (protocolSelector) { case FTManagerFactory.PROTO_CIC_ID: return new HalfFTManagerCIC(); case FTManagerFactory.PROTO_PML_ID: return new HalfFTManagerPMLRB(); default: logger.error("Error while creating fault-tolerance manager : " + "no protocol is associated to selector value " + protocolSelector); return null; } } } protected static class DebuggerFactoryImpl implements DebuggerFactory, java.io.Serializable { public Debugger newDebugger() { return new DebuggerImpl(); } } // SECURITY public void setProActiveSecurityManager(ProActiveSecurityManager psm) { this.proActiveSecurityManager = psm; } public ProActiveSecurityManager getProActiveSecurityManager() { return this.proActiveSecurityManager; } @Override public Object clone() throws CloneNotSupportedException { ProActiveMetaObjectFactory clone = null; try { return MakeDeepCopy.WithObjectStream.makeDeepCopy(this); } catch (IOException e) { //TODO replace by CloneNotSupportedException(Throwable e) java 1.6 throw (CloneNotSupportedException) new CloneNotSupportedException(e.getMessage()).initCause(e); } catch (ClassNotFoundException e) { throw (CloneNotSupportedException) new CloneNotSupportedException(e.getMessage()).initCause(e); } } public void setTimItReductor(Object timItReductor) { this.timItReductor = timItReductor; } public Object getTimItReductor() { return this.timItReductor; } }
More explanations can be found in the javadoc of that class. Using the factory is fairly simple: pass an instance of the factory when creating a new active object. Taking the same example as before, we have:
Object[] params = new Object[] { new IntWrapper(26), "Charlie" };
Worker w;
try {
    // params are the constructor arguments; the custom factory is the last argument
    w = (Worker) PAActiveObject.newActive(Worker.class.getName(), null, params, null, null,
            CustomMetaObjectFactory.newInstance());
} catch (ActiveObjectCreationException e) {
    e.printStackTrace();
} catch (NodeException e) {
    e.printStackTrace();
}
In the case of a
turnActive
we would have:
Worker w = new Worker(26L, "Charlie");
Worker charlie;
try {
    // turnActive takes the existing object, not a class name
    charlie = (Worker) PAActiveObject.turnActive(w, null, null, CustomMetaObjectFactory.newInstance());
} catch (ActiveObjectCreationException aoExcep) {
    // creation of ActiveObject failed
    System.err.println(aoExcep.getMessage());
} catch (NodeException nodeExcep) {
    System.err.println(nodeExcep.getMessage());
}
In this section, we'll have a very close look at what happens when an active object is created. This section aims at providing a better understanding of how the library works and where the restrictions of ProActive come from.
Consider that some code in an instance of class
A
creates an active object of class
Worker
using a piece of code like this:
Worker charlie;
// set the constructor values to be used
Object[] params = new Object[] { new IntWrapper(26), "Charlie" };
try {
    charlie = (Worker) PAActiveObject.newActive(Worker.class.getName(), params);
} catch (ActiveObjectCreationException aoExcep) {
    // creation of ActiveObject failed
    System.err.println(aoExcep.getMessage());
} catch (NodeException nodeExcep) {
    System.err.println(nodeExcep.getMessage());
}
If the creation of the active instance of Worker is successful, the graph of objects is as described in the figure below (with arrows denoting references).
The active instance of Worker is composed of 2 objects:
a body (
Body
)
an instance of
Worker
Besides the active object composed of a body and an object there is also a proxy and a stub for each active or passive object referencing the active object.
The role of the class
Stub_Worker
is to reify all method calls that can be performed
through a reference of type
Worker
.
Reifying a call simply means constructing an object that represents the call (in our case, all reified calls are instances of class org.objectweb.proactive.core.mop.MethodCall), so it can be manipulated as any other object. The reified call is then processed by the other components of the active object in order to achieve the behavior we expect from an active object.
Using a standard object for representing elements of the language that are not normally objects (such as method calls, constructor calls, references, types,...) is at the core of metaobject programming. The metaobject protocol (MOP) ProActive is built on is described in Chapter 43, MOP: Metaobject Protocol but it is not a prerequisite for understanding and using ProActive.
As one of our objectives is to provide transparent
active objects, references to active objects of class
Worker
need to be of the same type as references to passive
instances of
Worker
(this feature is called
polymorphism
between passive and active instances of the same class).
This is why
Stub_Worker
is a subclass of class
Worker
by construction,
therefore allowing instances of class
Stub_Worker
to be assigned to variables of type
Worker
.
Class
Stub_Worker
redefines each of the methods inherited from its
superclasses. The code of each method of class
Stub_Worker
builds an instance of class
org.objectweb.proactive.core.mop.MethodCall
in order to represent the call to this method. This object is then passed to the BodyProxy, which returns an object that is returned as the result of the method call. From the caller's point of view, everything looks as if the call had been performed on an instance of Worker.
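The stub mechanism described above can be sketched in plain Java. This is only an illustration under stated assumptions: `ReifiedCall`, `RecordingProxy` and `StubWorker` are hypothetical stand-ins written for this example, not the classes ProActive generates, and the proxy here merely records the call instead of forwarding it to a body.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a reified method call (not ProActive's MethodCall).
class ReifiedCall {
    final String methodName;
    final Object[] args;

    ReifiedCall(String methodName, Object... args) {
        this.methodName = methodName;
        this.args = args;
    }
}

// Hypothetical proxy: it only records the calls it receives.
// A real proxy would forward them to the body of the active object.
class RecordingProxy {
    final List<ReifiedCall> received = new ArrayList<>();

    Object reify(ReifiedCall call) {
        received.add(call);
        return null; // doNothingGracefully() is void, so there is no result
    }
}

class Worker {
    public Worker() {}

    public void doNothingGracefully() {}
}

// Subclass of Worker, so it can be assigned to a variable of type Worker.
class StubWorker extends Worker {
    private final RecordingProxy proxy;

    StubWorker(RecordingProxy proxy) {
        this.proxy = proxy;
    }

    @Override
    public void doNothingGracefully() {
        // Build an object describing the call instead of executing it here.
        proxy.reify(new ReifiedCall("doNothingGracefully"));
    }
}
```

A caller holding a `Worker` reference cannot tell from the type alone whether it points to a plain `Worker` or to a `StubWorker`; only the stub turns the call into data.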
Now that we know how stubs work, we can understand some of the limitations of ProActive:
Obviously,
Stub_Worker
cannot redefine
final
methods inherited from class
Worker
.
If the methods are final, calls to these methods are not
reified but are executed on the stub, which may
lead to unpredictable behavior.
As there are 6 final methods in the base class Object, there is the question of how ProActive deals with these methods. In fact, 5 of these 6 methods deal with thread synchronization (notify(), notifyAll() and the 3 versions of wait()). Those methods should not be used, since an active object already provides thread synchronization. Using the standard thread synchronization mechanism and the ProActive thread synchronization mechanism at the same time might conflict and result in behaviour that is very hard to debug.
The last final method in the class
Object
is
getClass()
. When invoked on an active object,
getClass()
is not reified and therefore performed on the
stub object, which returns an object of class
Class
that represents the class of the stub (
Stub_Worker
in our example) and not the class of the active
object itself (
Worker
in our example). However, this method is seldom
used in standard applications and it doesn't
prevent the operator
instanceof
from working thanks to its polymorphic behavior.
Therefore the expression
(foo instanceof Worker)
has the same value whether Worker is active or not.
Getting or setting instance variables directly
(not through a get or a set method) must be
avoided for active objects because it
results in getting or setting the value on the
stub object and not on the instance of the class
Worker
.
This problem is usually worked around by using
get/set methods for setting or reading
attributes. This rule of strict encapsulation
can also be found in JavaBeans or in most
distributed object systems like RMI or CORBA.
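The limitations listed above follow from ordinary Java subclassing rules, which the following sketch tries to show. It assumes a hand-written `StubWorker` that forwards calls to a local `target` object; both the class and the forwarding are hypothetical simplifications of what a generated stub does.

```java
class Worker {
    public int age = 26; // public field, used only to show the direct-access problem

    public Worker() {}

    public final String finalName() {
        return "executed locally";
    }

    public int getAge() {
        return age;
    }
}

class StubWorker extends Worker {
    private final Worker target; // stands in for the real object behind the body

    StubWorker(Worker target) {
        this.target = target;
    }

    @Override
    public int getAge() {
        return target.getAge(); // overridable method: forwarded to the real object
    }

    // finalName() cannot be overridden, so it always runs on the stub itself.
}
```

With `real.age = 40` and `Worker ref = new StubWorker(real)`, the expression `ref.getAge()` yields 40 while `ref.age` reads the stub's own field and yields 26. Likewise `ref.getClass()` is `StubWorker.class`, yet `ref instanceof Worker` is still true.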
The role of the proxy is to handle asynchronism in calls
to active objects. More specifically, it creates future
objects if possible and needed, forwards calls to bodies,
and returns future objects to the stubs. As this class
operates on
MethodCall
objects, it is absolutely generic and does not depend at
all on the type of the stub that feeds calls in through
its
reify
method.
The
body
is responsible for storing calls (actually,
Request
objects) in a queue of pending requests and processing these requests according to a given synchronization policy. The default synchronization policy is FIFO. The Body has its own thread, which picks requests from the queue and executes the associated calls.
There is also a standard instance of class
Worker
. It may contain some synchronized information in its
live
method, if any. As the body executes calls one by one,
there cannot be any concurrent execution of two portions
of code for this object by two different threads. This
enables the use of pre and post-conditions and class
invariants. As a consequence, the use of the keyword
synchronized
in class
Worker
should not be necessary. Any synchronization scheme that
can be expressed through monitors and
synchronized
statements can be expressed using ProActive's high-level
synchronization mechanism in a much more natural and
user-friendly way.
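The body's role (a queue of pending requests served one by one by a dedicated thread, in FIFO order by default) can be approximated with standard `java.util.concurrent` classes. `MiniBody` is a hypothetical name used for this sketch; it models requests as `Runnable` objects and is not how ProActive implements its body.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MiniBody {
    // Marker request used to stop the serving thread.
    private static final Runnable STOP = () -> {};

    private final BlockingQueue<Runnable> requests = new LinkedBlockingQueue<>();
    private final Thread activity;

    MiniBody() {
        activity = new Thread(() -> {
            try {
                while (true) {
                    Runnable next = requests.take(); // oldest pending request first
                    if (next == STOP) {
                        return;
                    }
                    next.run(); // one request at a time, always on this thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        activity.start();
    }

    // Called by any thread: appends the request and returns immediately.
    void receive(Runnable request) {
        requests.add(request);
    }

    // Serves everything already queued, then stops the serving thread.
    void terminateAndWait() {
        requests.add(STOP);
        try {
            activity.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because only the body's thread ever runs the requests, the state they touch needs no `synchronized` blocks, which is the property the paragraph above relies on.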
Whenever possible a method call on an active object is reified as an asynchronous request. If not possible the call is synchronous and blocks until the reply is received. In case the request is asynchronous, it immediately returns a future object.
This object acts as a placeholder for the result of the not-yet-performed method invocation. As a consequence, the calling thread can go on executing its code as long as it doesn't need to invoke methods on the returned object; if it does, the calling thread is automatically blocked until the result of the method invocation is available. The table below shows the different cases that can lead to an asynchronous call. Note that this table does not apply when using the tryWithCatch annotators to deal with asynchronous exceptions. See Chapter 12, Exception Handling.
| Method signature: return type | Method signature: can throw checked exception | Creation of a future | Asynchronous |
|---|---|---|---|
| Void | No | No | Yes |
| Void | Yes | No | No |
| Non Reifiable Object | No | No | No |
| Non Reifiable Object | Yes | No | No |
| Reifiable Object | No | Yes | Yes |
| Reifiable Object | Yes | No | No |
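The rules in the table can be expressed as a small classifier over `java.lang.reflect.Method`. This is a sketch, not ProActive's actual check: `CallClassifier` and `Sample` are hypothetical, and the reifiability test used here (not primitive, not an array, not final, has a public no-arg constructor) is a simplifying assumption.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class Value {
    public Value() {}
}

// Example methods covering the rows of the table.
class Sample {
    public void oneWay() {}
    public void oneWayChecked() throws java.io.IOException {}
    public int primitiveResult() { return 0; }
    public String finalResult() { return ""; }
    public Value reifiableResult() { return new Value(); }
    public Value reifiableChecked() throws java.io.IOException { return new Value(); }
}

class CallClassifier {
    // Simplified assumption of what "reifiable" means for a return type.
    static boolean isReifiable(Class<?> type) {
        if (type.isPrimitive() || type.isArray() || Modifier.isFinal(type.getModifiers())) {
            return false;
        }
        try {
            type.getConstructor(); // public no-arg constructor required
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    static boolean throwsChecked(Method m) {
        for (Class<?> ex : m.getExceptionTypes()) {
            boolean unchecked = RuntimeException.class.isAssignableFrom(ex)
                    || Error.class.isAssignableFrom(ex);
            if (!unchecked) {
                return true;
            }
        }
        return false;
    }

    static boolean createsFuture(Method m) {
        return m.getReturnType() != void.class
                && isReifiable(m.getReturnType())
                && !throwsChecked(m);
    }

    static boolean isAsynchronous(Method m) {
        if (throwsChecked(m)) {
            return false;
        }
        return m.getReturnType() == void.class || isReifiable(m.getReturnType());
    }

    // Helper that looks a public method up by name without checked exceptions.
    static Method find(Class<?> type, String name) {
        for (Method m : type.getMethods()) {
            if (m.getName().equals(name)) {
                return m;
            }
        }
        throw new IllegalArgumentException("no such method: " + name);
    }
}
```

For instance, `CallClassifier.isAsynchronous(CallClassifier.find(Sample.class, "finalResult"))` is false because `String` is a final class, which matches the "Non Reifiable Object" rows.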
As we can see, the creation of a future depends not only on the caller type, but also on the return object type. Creating a future is only possible if the object is reifiable. Although a future has a similar structure to an active object, a future object is not active. It only has a Stub and a Proxy as shown in the figure below:
During its lifetime, an active object can create many future objects. They are all automatically kept in a FuturePool.
Each time a future is created, it is inserted in the
future pool of the corresponding active object. When the
result becomes available, the future object is removed
from the pool. There are several methods in
org.objectweb.proactive.api.ProFuture
which provide control over the way futures are returned.
Any call to a future object is reified in order to be blocked if the future is not yet available and later executed on the result object. However, two methods don't follow this scheme: equals and hashCode. They are often called by other methods from the Java library, like Hashtable.put(), and so are most of the time outside the user's control. This can very easily lead to deadlocks if they are called on a not yet available object. For this reason, both methods are executed on the proxy of the future without waiting for the result. There are some drawbacks with this technique, the main one being the impossibility to have a user override the default hashCode() and equals() methods.
hashCode()
Instead of returning the hashcode of the object, it returns the hashcode of its proxy. Since there is only one proxy per future object, each future corresponds to exactly one proxy and vice versa.
equals()
The default implementation of
equals()
in the Object class is to compare the references of
two objects. In ProActive it is redefined to compare
the hashcode of two proxies. As a consequence it is
only possible to compare two future objects, and not
a future object with a normal object.
toString()
The
toString()
method is most of the time called with
System.out.println()
to turn an object into a printable string. In the
current implementation, a call to this method will
block on a future object like any other call, thus,
one has to be careful when using it. As an example,
trying to print a future object for debugging
purposes will most of the time lead to a deadlock.
Instead of displaying the corresponding string of a
future object, you might consider displaying its
hashCode.
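The special treatment of `hashCode()` and `equals()` can be illustrated with a hand-written future stub. The classes below are hypothetical; in particular, this sketch compares proxy identity in `equals()`, which is an assumption made for brevity rather than the exact comparison ProActive performs.

```java
import java.util.HashSet;
import java.util.Set;

class Value {
    public Value() {}
}

// Stand-in for the proxy of a future; result stays null until the reply arrives.
class FutureProxySketch {
    volatile Value result;
}

class StubValue extends Value {
    final FutureProxySketch proxy = new FutureProxySketch();

    @Override
    public int hashCode() {
        // Answered from the proxy, so it never waits for the result.
        return System.identityHashCode(proxy);
    }

    @Override
    public boolean equals(Object other) {
        // Only two futures can be equal; a future never equals a plain object.
        return other instanceof StubValue && ((StubValue) other).proxy == proxy;
    }
}

class FutureInSetDemo {
    // Stores a future whose result is not available yet; this must not block.
    static boolean storeUnresolvedFuture() {
        Set<Value> seen = new HashSet<>();
        StubValue pending = new StubValue();
        seen.add(pending);
        return seen.contains(pending);
    }
}
```

Since neither method touches `result`, a collection can hold the future before the reply arrives, at the price that user-defined equality on `Value` is bypassed.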
First, let's introduce the example we'll use
throughout this section. Let us say that some piece
of code in an instance of class
A
calls method
foo
on an active instance of class
Worker
.
This call is asynchronous and returns a future
object of class
Value
.
Then, possibly after having executed some other
code, the same thread that issued the call calls
method
bar
on the future object returned by the call to
foo
.
The code for this example is:
// worker is a reference to the active instance of Worker
Value v = worker.foo();
v.bar();
In a sequential, single-threaded version of the same
application, the thread would have executed the code
of the caller object A
up to the call of foo
, then the code of
foo()
in class
Worker
,
then the code of the calling method in
object A
up to the call to
bar
, then the code of
bar
in class
Value
, and finally the code of the calling method
in class
A
until its end. The sequence diagram below summarizes
this execution. You can notice how the single thread
successively executes code of different methods in
different classes.
Now we will show how an active object handles the call for the code above.
Instead of blocking on the execution of foo()
it will create a future
and continue execution.
Let us first get an idea of what the graph of
objects at execution (the objects with their
references to each other) looks like at three
different moments of the execution:
Before calling foo we have exactly the same setup as after the creation of the active instance of Worker: an instance of class A and an active instance of class Worker. As with all active objects, the active instance of Worker is composed of a Body with a request queue and the actual instance of Worker.
After the asynchronous call to
foo
has returned,
A
now holds a reference onto a future object
representing the not yet available result of
the call. It is actually composed of a
Stub_Value
and a
FutureProxy
as shown on the figure below.
Right after having executed
foo
on the instance of
Worker
, the thread of the
Body
sets the result in the future, which results
in the
FutureProxy
having a reference onto a
Value
(see figure below).
The sequence of calls is more complex for the active object case. We now have two threads: the thread that belongs to object A's subsystem, and the thread that belongs to the Worker's subsystem. We will call these the first and second threads.
The first thread invokes
foo
on an instance of
Stub_Worker
, which builds a
MethodCall
object and passes it to the
BodyProxy
as a parameter of the call to be
reified
. The proxy then checks the return type of the call
(in this case
Value
) and generates a future object of type
Value
for representing the result of the method
invocation. The future object is actually composed
of a
Stub_Value
and a
FutureProxy
.
A reference onto this future object is set in the
MethodCall
object. This reference will be used to set the value
after the call has executed. When the
MethodCall
object is ready, it is passed as a parameter for a Request to the
Body
of the Worker
active object. The
Body
simply
appends this request to the queue of pending
requests and returns immediately. The call to
foo
issued by A returns a future object of type Stub_Value, which is a subclass of Value.
At some point, possibly after having served some
other requests, the
second thread
(the active object's thread) picks up the previous request issued by
the first thread. It then executes the embedded call by
calling
foo
on the instance of
Worker
with the actual parameters stored in the
MethodCall
object. As specified in its signature, this call
returns an object of type
Value
. The second thread
is responsible for setting this object in the
future object (which is the reason why
MethodCall
objects hold a reference on the future object
created by the FutureProxy
).
The execution of the call is now over, and the
second thread can select and serve another request from
the queue.
In the meantime, the
first thread
has continued executing the code of the calling
method in class
A
.
At some point, it calls
bar
on the object of type
Stub_Value
that was returned by the call to
foo
. This call is reified thanks to the
Stub_Value
and processed by the
FutureProxy
. If the object the
future represents is available
(the second thread has already set it in the future object),
the call is executed and returns a value to the calling code in
A
. The sequence diagram below presents the
process graphically.
If the value in the future is not yet available, the first thread is
suspended in FutureProxy
until the second thread sets the result in the
future object (see figure below). This is called
wait-by-necessity
.
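Wait-by-necessity can be sketched with a monitor-based placeholder in plain Java. All class names below are hypothetical, and the 50 ms sleep only simulates the time the second thread spends serving `foo`; the sketch shows the blocking behaviour, not ProActive's implementation.

```java
class Value {
    public Value() {}

    public String bar() {
        return "bar on real value";
    }
}

// Stand-in for the proxy of a future.
class FutureProxySketch {
    private Value result;
    private boolean available;

    // Called by the second thread once foo has been executed.
    synchronized void setResult(Value v) {
        result = v;
        available = true;
        notifyAll();
    }

    // Called by the first thread; blocks only if the result is not there yet.
    synchronized Value waitForResult() {
        boolean interrupted = false;
        while (!available) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return result;
    }
}

// Stand-in for Stub_Value: a subclass of Value that defers to the proxy.
class StubValue extends Value {
    private final FutureProxySketch proxy;

    StubValue(FutureProxySketch proxy) {
        this.proxy = proxy;
    }

    @Override
    public String bar() {
        return proxy.waitForResult().bar();
    }
}

class WorkerSketch {
    // Returns at once with a placeholder; a second thread fills it in later.
    static Value fooAsync() {
        FutureProxySketch proxy = new FutureProxySketch();
        Thread second = new Thread(() -> {
            try {
                Thread.sleep(50); // simulated execution time of foo
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            proxy.setResult(new Value());
        });
        second.start();
        return new StubValue(proxy);
    }
}
```

Calling `WorkerSketch.fooAsync()` returns immediately; the first call to `bar()` on the returned object is the point where the caller may be suspended.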
There are a few things to remember with active objects, futures, and asynchronous method calls, in order to avoid annoying debugging sessions:
A no-arg constructor is needed both for active object creation (if it is missing, an exception might be thrown) and for future creation on a method call (if it is missing, the method call is synchronous). Avoid placing initialization code in this constructor: it is also called when the stub is created, which may lead to unexpected behavior.
Make your classes implement the Serializable interface, since ProActive deals with objects that will be sent across networks.
Use wrappers instead of primitive types or final classes for method result types. Otherwise you will lose the asynchronism capabilities. For instance, if one of your objects has a method
int giveSolution(parameter)
calling this method with ProActive is synchronous. To have asynchronous calls use
IntWrapper giveSolution(parameter)
All wrappers are in the package:
org.objectweb.proactive.core.util.wrapper
ProActive provides several primitive type wrappers, with 2 versions of each, one mutable, and another immutable.
To make the call asynchronous only the methods return types have to use wrappers. The parameters can use the regular Java types.
Avoid returning null in Active Object methods: on the caller side the test if(result_from_method == null) makes no sense. result_from_method is a Stub - FutureProxy couple as explained above, so even if the method returns null, result_from_method cannot be null:
public class MyObject {
    public MyObject() {} // empty no-arg constructor
    public Object getObject() {
        if (.....) {
            return new Object();
        } else {
            return null; // to avoid in ProActive
        }
    }
}
On the caller side:
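// o must be an active object for getObject() to return a future
MyObject o = (MyObject) PAActiveObject.newActive(MyObject.class.getName(), null);
Object result_from_method = o.getObject();
if (result_from_method == null) {
    ......
}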
This test is never true because
result_from_method is
Stub-->Proxy-->null
if the future is not yet available or the method
returns null
or
Stub-->Proxy-->Object
if the future is available, but
result_from_method
is never null
.
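The reason the null test fails can be reduced to a few lines of plain Java. `Holder` and `FutureStub` are hypothetical stand-ins for the FutureProxy and the Stub; the point of the sketch is only that the reference handed to the caller is the placeholder, not the result.

```java
// Plays the role of the FutureProxy: it holds the result, which may be null.
class Holder {
    Object result;
    boolean available;
}

// Plays the role of the Stub: this is what the caller actually receives.
class FutureStub {
    final Holder proxy = new Holder();
}

class MyObjectSketch {
    static FutureStub getObject(boolean found) {
        FutureStub future = new FutureStub();
        future.proxy.result = found ? new Object() : null;
        future.proxy.available = true;
        return future; // the placeholder itself is never null
    }
}
```

Here `MyObjectSketch.getObject(false)` is a non-null reference even though the value it wraps is null, which is the Stub-->Proxy-->null situation described above.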
Waiting for the update of a future in a ProActive application could freeze the application if the node responsible for updating the future experiences a failure. To prevent this, ProActive has a comprehensive generic fault tolerance mechanism. However, it can be too expensive to checkpoint all the active objects, when fault detection is only needed for certain objects.
By default, ProActive continuously pings active objects expected to update awaited futures. When such a ping fails, the associated awaited future is updated with a runtime exception. Accessing the future will throw an exception.
public static void function() {
    F future = ao.asyncCall();
    String str;
    try {
        str = future.toString();
    } catch (FutureMonitoringPingFailureException fmpfe) {
        // printStackTrace() returns void, so it cannot be concatenated into the message
        System.out.println("The active object 'ao' had a failure");
        fmpfe.printStackTrace();
    }
}
The ping is started when the future is being awaited, but it is also possible to start it beforehand using the ProFuture.monitorFuture(java.lang.Object future) method.
To find out more about the fault tolerance mechanism, read Chapter 32, Fault-Tolerance.
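The idea of updating an awaited future with an exception when a ping fails can be sketched with `java.util.concurrent.CompletableFuture`. Everything here is an assumption made for illustration: `FutureMonitorSketch`, `PingFailureException`, the `ping` supplier and the polling period are hypothetical and do not reflect how ProActive schedules its pings.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

// Hypothetical counterpart of the runtime exception stored in the future.
class PingFailureException extends RuntimeException {
    PingFailureException(String message) {
        super(message);
    }
}

class FutureMonitorSketch {
    // Pings until the future is done; a failed ping completes it exceptionally.
    static void monitor(CompletableFuture<?> future, BooleanSupplier ping, long periodMillis) {
        Thread pinger = new Thread(() -> {
            while (!future.isDone()) {
                if (!ping.getAsBoolean()) {
                    future.completeExceptionally(
                            new PingFailureException("updater unreachable"));
                    return;
                }
                try {
                    Thread.sleep(periodMillis);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        pinger.setDaemon(true);
        pinger.start();
    }
}
```

A caller blocked in `future.join()` is released with a `CompletionException` whose cause is the `PingFailureException`, instead of waiting forever for a reply that will never come.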
Automatic continuation is the propagation of a future outside the activity that has sent the corresponding request.
Automatic continuations allow using non-updated futures in the normal program flow without blocking to wait for the result contained in the future. When the result is available on the object that originated the creation of the future, this object updates the result in all the objects to which it passed the future. If those objects passed the future further, they will in turn update the value in the corresponding objects.
An automatic continuation can occur when sending a
request (parameter of the request is a future or
contains a future) or when sending a reply (the
result is a future or contains a future).
Outgoing futures are registered in the
FuturePool
of the Active Object sending the future.
Registration for the couple Future - destination Body as an automatic continuation occurs when the future is serialized. Every request or reply is serialized before being sent, and the future is part of the request or the reply.
The thread Thread sending the message keeps a reference to the destination body in a static table (FuturePool.bodyDestination). Hence, when a future Future is serialized by the same thread, the future looks up in the static table the destination Destination registered for the thread Thread.
If it finds a destination for the future, the future notifies the FuturePool that it is going to leave, which in turn registers the couple Future - Destination Body as an automatic continuation. When the value is available for the future, it is propagated to all objects that received the future Future. This type of update is realized by a thread located in the FuturePool.
When a message containing a future is received by an active object, the active object registers the future in the FuturePool to be able to update it when the value becomes available.
After the future is deserialized, it is
registered in a static table
FuturePool.incomingFutures
.
To illustrate how automatic continuation takes place we will use three classes:
A
, B
, and Worker
. The
A
and B
classes are used to instantiate passive
objects and the Worker
class is used to instantiate an active object.
The Worker
class has a foo()
method and the
B
class has a bar(Value v)
method.
public class A { .... public static void main(String[] args){ ...... B b = new B(); Worker worker = (Worker) PAActiveObject.newActive(Worker.class.getName(), //class name null);//constructor arguments Value v1 = worker.foo(); //v1 is a future Value v2 = b.bar(v1); //v1 is passed as parameter ........ } //end of main } //end of class A
where
public class Worker {
    ...
    public Value foo() { ... } // returns Value, as used by class A
    ...
} //end of class Worker
and
public class B {
    ...
    public Value bar(Value v) { ... } // returns Value, as used by class A
    ...
} //end of class B
We will first call foo() on the worker. The call goes through the worker stub and the body proxy and is placed in the queue of the body. We are using a passive object as the caller in this example; however, the process is similar if the caller is an active object or if the objects are all on the same machine. In those cases the call will also go through the stub and proxy as described in the sequence diagrams above.
After sending the request the thread of object A
continues immediately
(asynchronous call)
and v1
points to a future that has not been updated yet.
Next, object A
calls bar(v1)
on B.
The call on B leads to the creation of another future to which v2 will point. Also, because v1 has not been updated yet, another future will be created on object B.
At some point the active object finishes executing foo
and the value of the future for v1
will be updated by the
Body
.
After the value
on A
is
updated, A
updates the value in the future on B
.
Once B has the value for the future, it will execute bar() with the received value and update the value v2 in A.
The final state is with all the future values updated.
As seen above, a future needed for a computation is passed on to the object doing the computation as long as its value is not needed immediately. If the value is needed right away, the calling thread blocks until the future is updated, just like in a single-threaded call.
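The flow of the A, B and Worker example can be mimicked with `java.util.concurrent.CompletableFuture`, which is used here as a stand-in for ProActive futures. The classes and the `finishFoo` method are hypothetical; unlike ProActive, the future is visible in the method signatures instead of being transparent.

```java
import java.util.concurrent.CompletableFuture;

class Value {
    final int n;

    Value(int n) {
        this.n = n;
    }
}

class WorkerSketch {
    private final CompletableFuture<Value> pending = new CompletableFuture<>();

    // Returns at once with a future that has not been updated yet.
    CompletableFuture<Value> foo() {
        return pending;
    }

    // Plays the role of the body setting the result once foo has been served.
    void finishFoo(int n) {
        pending.complete(new Value(n));
    }
}

class B {
    // Accepts a non-updated future and returns another future without blocking.
    CompletableFuture<Value> bar(CompletableFuture<Value> v) {
        return v.thenApply(value -> new Value(value.n + 1));
    }
}
```

With `v1 = worker.foo()` and `v2 = b.bar(v1)`, neither future is done until `worker.finishFoo(41)` runs; at that point the update propagates from `v1` to `v2`, whose value then holds 42.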
The Exchange operation offers a way to exchange a potentially very large amount of data between two active objects, with an implicit synchronization. The Exchange operator takes advantage of the serialization / de-serialization steps to exchange data in an efficient way.
This operator is *not* compatible with the ProActive Fault-Tolerance mechanism.
To use it, you have to specify the following parameters:
a tag
which is a unique identifier for the exchange
operation in case of multiple exchanges performed in parallel,
a reference to the active object
you want to exchange
data with,
a reference to the data source
you want to send to the
other Active Object,
a reference to the location
where the received data
will be put,
the amount of data
you want to exchange (must be
symmetric).
To illustrate the usage of the Exchange operator, here is an example with two active objects, AO1 and AO2, which each exchange half of a local array to obtain a full one.
public class AO1 { private int[] myArray; (...) public void foo() { Exchanger exchanger = PAActiveObject.getExchanger(); myArray = new int[] {1,2,0,0}; // Before : myArray = [1, 2, 0, 0] exchanger.exchange("myExch", ao2, myArray, 0, myArray, 2, 2); // After : myArray = [1, 2, 3, 4] } } public class AO2 { private int[] myArray; (...) public void bar() { Exchanger exchanger = PAActiveObject.getExchanger(); myArray = new int[] {0,0,3,4}; // Before : myArray = [0, 0, 3, 4] exchanger.exchange("myExch", ao1, myArray, 2, myArray, 0, 2); // After : myArray = [1, 2, 3, 4] } }
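The same half-array swap can be tried locally with two threads and the standard `java.util.concurrent.Exchanger` class, which is unrelated to the ProActive Exchanger but has a comparable rendezvous behaviour. `HalfArrayExchange` and its parameter names are hypothetical and chosen to mirror the offsets used in the example above.

```java
import java.util.Arrays;
import java.util.concurrent.Exchanger;

class HalfArrayExchange {
    // Sends len ints starting at srcPos and stores the partner's ints at dstPos.
    static void exchange(Exchanger<int[]> channel, int[] array,
                         int srcPos, int dstPos, int len) {
        int[] outgoing = Arrays.copyOfRange(array, srcPos, srcPos + len);
        try {
            int[] incoming = channel.exchange(outgoing); // blocks until the partner arrives
            System.arraycopy(incoming, 0, array, dstPos, len);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("exchange interrupted", e);
        }
    }

    // Runs both sides of the exchange and returns the two resulting arrays.
    static int[][] run() {
        Exchanger<int[]> channel = new Exchanger<>();
        int[] first = {1, 2, 0, 0};
        int[] second = {0, 0, 3, 4};
        Thread partner = new Thread(() -> exchange(channel, second, 2, 0, 2));
        partner.start();
        exchange(channel, first, 0, 2, 2);
        try {
            partner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[][] {first, second};
    }
}
```

Both sides must agree on the amount of data (2 elements here), which corresponds to the symmetry requirement listed among the parameters.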
Standard exchangeable data are arrays of integers, doubles and bytes. You can also exchange your own complex double structure by implementing the ExchangeableDouble interface. Here is an example of an implementation:
public class ComplexDoubleArray implements ExchangeableDouble { private double[] array; private int getPos, putPos; public ComplexDoubleArray(int size, boolean odd) { this.array = new double[size]; this.getPos = odd ? 1 : 0; this.putPos = odd ? 0 : 1; for (int i = getPos; i < array.length; i += 2) { array[i] = Math.random(); } } public double get() { double res = array[getPos]; getPos += 2; return res; } public boolean hasNextGet() { return getPos < array.length; } public boolean hasNextPut() { return putPos < array.length; } public void put(double value) { array[putPos] = value; putPos += 2; } }
© 1997-2008 INRIA Sophia Antipolis All Rights Reserved