Active Objects: Creation And Advanced Concepts

10.1. Overview

Active objects are created on a per-object basis: an application can contain both active and passive instances of a given class. This section presents the three methods for creating active instances of classes and shows how the arguments passed to these methods control creation. Although almost any object can be turned into an active object, some restrictions apply; they are detailed below. We also present an in-depth explanation of the structure and behaviour of an active object. The examples in this chapter contain only the code necessary to create active objects. To see how to start the nodes, JVMs, and virtual nodes on which objects can be instantiated, read Chapter 18, ProActive Basic Configuration.

In ProActive there are two ways to create active objects: instantiation based creation and object based creation, which starts from an existing object. Instantiation based creation is done through PAActiveObject.newActive(...) and PAActiveObject.newActiveInParallel(...), and object based creation is done through PAActiveObject.turnActive(...). The PAActiveObject class is part of the org.objectweb.proactive.api package. Each method takes several parameters that control how the active object is created, and most of the parameters are common to all creation methods. All the available creation methods and their parameters are presented in the PAActiveObject Javadoc.

Figure 10.1. PAActiveObject class


For the examples in this chapter we will be using the following class that can be instantiated as an active object:

import java.io.Serializable;

public class Worker implements Serializable {
	// empty no-arg constructor needed by ProActive
	public Worker() {}
	public Worker(Long age, String name) {}
	public void doNothingGracefully() {}
}

10.2. Restrictions on creating active objects

Not all classes can be used to instantiate active objects. There are some restrictions, most of them caused by the 100% Java compliance, which forbids modifying the Java Virtual Machine or the compiler.

Some of these restrictions apply at class level:

  • final classes cannot be used to instantiate active objects

  • non-public classes cannot be used to instantiate active objects

  • classes without a no-argument constructor cannot be reified

There are also restrictions at method level within a class. Final methods cannot be used: the stub is generated from the class of the object, and a final method cannot be overridden by the stub.
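
The restrictions above can be checked ahead of time with plain Java reflection. The following is only a sketch and not part of the ProActive API: the class ReifiableCheck, its method problems and the three sample classes are hypothetical names introduced for illustration, and the check mirrors only the rules listed in this section.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class ReifiableCheck {

    // Sample classes, used only by this sketch.
    public static class GoodWorker {
        public GoodWorker() {}
        public void work() {}
    }

    public static final class FinalWorker {
        public FinalWorker() {}
    }

    public static class NoDefaultConstructor {
        public NoDefaultConstructor(String name) {}
        public final void locked() {}
    }

    /** Returns one message per restriction from this section that the class violates. */
    public static List<String> problems(Class<?> c) {
        List<String> result = new ArrayList<>();
        int mods = c.getModifiers();
        if (Modifier.isFinal(mods)) {
            result.add("class is final");
        }
        if (!Modifier.isPublic(mods)) {
            result.add("class is not public");
        }
        try {
            // throws if the class declares no constructor without parameters
            c.getDeclaredConstructor();
        } catch (NoSuchMethodException e) {
            result.add("no no-argument constructor");
        }
        for (Method m : c.getDeclaredMethods()) {
            if (!m.isSynthetic() && Modifier.isFinal(m.getModifiers())) {
                result.add("final method: " + m.getName());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(problems(GoodWorker.class));
        System.out.println(problems(FinalWorker.class));
        System.out.println(problems(NoDefaultConstructor.class));
    }
}
```

An empty list means that none of the listed restrictions was detected; it does not prove that ProActive will accept the class.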

10.3. Instantiation Based Creation

New active objects are instantiated with PAActiveObject.newActive(...) or PAActiveObject.newActiveInParallel(...). The first method creates a single active object. The second creates several active objects in parallel, deployed on one or more nodes; the object creation is optimized by a thread pool.

With instantiation based creation, every argument passed to the constructor of the reified object through PAActiveObject.newActive(...) or PAActiveObject.newActiveInParallel(...) is serialized and passed by copy. This is because the model behind ProActive is uniform: creation works the same way whether the active object is instantiated locally or remotely. Consequently, the constructor arguments must be Serializable. The class used to create the active object, on the other hand, does not need to be Serializable, even for remotely created active objects. Bear in mind also that a reified class must declare an empty no-argument constructor in order to be instantiated properly.
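
The effect of passing arguments by serialized copy can be reproduced with the standard Java serialization API. The sketch below uses only the JDK and is not ProActive code: CopySemantics and deepCopy are illustrative names, and the round trip through a byte array is an assumption about how such a copy can be made, not a description of ProActive internals.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

public class CopySemantics {

    /** Copies a value by serializing it and reading it back. */
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T deepCopy(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("argument could not be copied", e);
        }
    }

    public static void main(String[] args) {
        ArrayList<String> original = new ArrayList<>();
        original.add("Charlie");
        // what the constructor would receive: a copy, not the caller's list
        ArrayList<String> received = deepCopy(original);
        // later changes made by the caller are not visible in the copy
        original.add("Delta");
        System.out.println("caller sees " + original.size() + ", copy holds " + received.size());
    }
}
```

A caller that modifies an argument after creation therefore cannot influence the state of the active object through that argument.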

10.3.1.  Using PAActiveObject.newActive(...)

To create a single active object from the class Worker in the local JVM we use the following code. If the invocation of the constructor of class Worker throws an exception, it is placed inside an exception of type ActiveObjectCreationException . When the call to newActive returns, the active object has been created and its active thread is started.

Worker charlie;
//set the constructor values to be used
Object[] params = new Object[] { Long.valueOf(26L), "Charlie" };
try {
	charlie = (Worker)PAActiveObject.newActive(Worker.class.getName(),
				params);
} 
catch (ActiveObjectCreationException aoExcep) {
	// creation of ActiveObject failed
	System.err.println(aoExcep.getMessage());
}
catch (NodeException nodeExcep){
	System.err.println(nodeExcep.getMessage());
}

10.3.2.  Using PAActiveObject.newActiveInParallel(...)

The following code deploys an active object on each node contained in the virtual node someVirtualNode. In this case the Worker constructor does not take any arguments. However, we still have to create an Object array whose length is equal to the number of nodes. The creation of active objects is optimized by a thread pool. When the call to newActiveInParallel returns, the active objects have been created and their threads have been started.

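Worker[] workers;
Node[] nodes;
try {
	nodes = someVirtualNode.getNodes();
	// one row of constructor arguments per node; the rows stay empty
	// because the no-arg Worker constructor is used
	workers = (Worker[]) PAActiveObject.newActiveInParallel(Worker.class.getName(),
						new Object[nodes.length][], nodes);
}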
catch (ActiveObjectCreationException aoExcep){
	System.err.println(aoExcep.getMessage());
}
catch (ClassNotFoundException classExcep) {
	System.err.println(classExcep.getMessage());
}

10.4. Object Based Creation using PAActiveObject.turnActive(...)

Object based creation is used for turning an existing passive object instance into an active one. It has been introduced in ProActive as an answer to the problem of creating active objects from already existing objects for which we do not have access to the source code.

Because the object already exists before it is turned active, no constructor arguments are serialized. When we invoke PAActiveObject.turnActive on the object, two cases are possible. If the active object is created locally (on a local node), the object is not serialized. If it is created remotely (on a remote node), the reified object is serialized. Therefore, if turnActive targets a remote node, the class of the object has to be Serializable. In addition, care must be taken that no other object keeps a reference to the originating object after the call to turnActive. A direct call to a method of the originating object that does not go through a ProActive stub breaks the ProActive model.

The simplest code for object based creation looks like this:

Worker charlie = new Worker();
try {
	charlie = (Worker) PAActiveObject.turnActive(charlie, null);
}
catch (ActiveObjectCreationException aoExcep) {
	// creation of ActiveObject failed
	System.err.println(aoExcep.getMessage());
}
catch (NodeException nodeExcep){
	System.err.println(nodeExcep.getMessage());
}

The second parameter of turnActive is the location where the active object will be created. No parameter or null means that the active object is created locally in the current node.

When using this method, the programmer has to make sure that no other references to the passive object exist after the call to PAActiveObject.turnActive(...). If such references are used to call methods directly on the passive object (without going through its stub, proxy, and body), the model is no longer consistent and the specialized synchronization is not guaranteed.
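
The risk of keeping a reference to the passive object can be illustrated without ProActive. In the sketch below, a java.lang.reflect.Proxy stands in for the stub; this is an assumption made for illustration only, since ProActive generates its own stubs. All names (BypassSketch, Counter, PlainCounter, recordingProxy) are hypothetical.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class BypassSketch {

    public interface Counter {
        void increment();
    }

    public static class PlainCounter implements Counter {
        public int value;
        public void increment() {
            value++;
        }
    }

    /** Wraps target so that every call is recorded before it is forwarded. */
    public static Counter recordingProxy(Counter target, List<String> log) {
        return (Counter) Proxy.newProxyInstance(
                Counter.class.getClassLoader(),
                new Class<?>[] { Counter.class },
                (proxy, method, args) -> {
                    log.add(method.getName());
                    return method.invoke(target, args);
                });
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        PlainCounter passive = new PlainCounter();
        Counter stub = recordingProxy(passive, log);

        stub.increment();     // goes through the stand-in stub and is recorded
        passive.increment();  // direct call: the stand-in stub never sees it

        System.out.println("state changes: " + passive.value + ", recorded calls: " + log.size());
    }
}
```

The second call changes the state of the object without ever reaching the intermediary, which is the situation the paragraph above warns against.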

10.5. Active Object Creation Arguments

10.5.1.  Using classname and target

PAActiveObject.newActive(...) and PAActiveObject.newActiveInParallel(...) always take as a first argument the class name from which the active object will be instantiated. The classname argument must be of type java.lang.String and its value is usually obtained by calling ClassToInstantiateFrom.class.getName() . PAActiveObject.turnActive(...) does not create an active object from a class but from an existing object, therefore it takes as a first argument the object to be turned active.

10.5.2. Using Constructor Arguments

To create the active object, PAActiveObject.newActive(...) and PAActiveObject.newActiveInParallel(...) take the list of arguments to be passed to the constructor of the class being instantiated. The arguments have to be passed as an array of Object. They must also be Serializable, since they are passed to the object as a serialized copy. The ProActive runtime determines which constructor of the class to call according to the types of the elements of this array. Nevertheless, there is still room for ambiguity in resolving the constructor, because the arguments are stored in an array of type Object[] or Object[][]. If one argument is null, the runtime cannot determine its type. In this case an exception is thrown, stating that ProActive cannot determine the constructor. In the example below, an ambiguity exists between the two constructors if the corresponding element of the Object array is null.

import java.util.Vector;

//class for which passing null as
//an argument for the constructor
//leads to the generation of an exception
class DontPassNullForTheConstructor{
	public DontPassNullForTheConstructor (String s){
	}
	public DontPassNullForTheConstructor (Vector v){
	}
}
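
Why a null element is ambiguous can be seen from a small reflection-based matcher. This is a sketch under stated assumptions, not the resolution code used by ProActive: ConstructorResolutionSketch, TwoConstructors and countMatches are hypothetical names, and the matcher ignores primitive parameters and boxing.

```java
import java.lang.reflect.Constructor;
import java.util.Vector;

public class ConstructorResolutionSketch {

    // Same shape as the class above: two one-argument constructors.
    public static class TwoConstructors {
        public TwoConstructors(String s) {}
        public TwoConstructors(Vector<String> v) {}
    }

    /** Counts the public constructors of c whose parameters accept the given arguments. */
    public static int countMatches(Class<?> c, Object[] args) {
        int matches = 0;
        for (Constructor<?> ctor : c.getConstructors()) {
            Class<?>[] types = ctor.getParameterTypes();
            if (types.length != args.length) {
                continue;
            }
            boolean ok = true;
            for (int i = 0; i < types.length; i++) {
                if (args[i] == null) {
                    // null carries no type, so it fits every reference type
                    if (types[i].isPrimitive()) {
                        ok = false;
                    }
                } else if (!types[i].isInstance(args[i])) {
                    ok = false;
                }
            }
            if (ok) {
                matches++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        System.out.println(countMatches(TwoConstructors.class, new Object[] { "text" }));
        System.out.println(countMatches(TwoConstructors.class, new Object[] { null }));
    }
}
```

A count of one identifies the constructor; a count above one is the ambiguous case in which no constructor can be chosen.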

If we use PAActiveObject.newActiveInParallel(..., Node[] nodes), we have to make sure that the length of the first dimension of the java.lang.Object[][] array is equal to the number of nodes, since one active object is deployed on each node. The second dimension of the array contains the constructor arguments for each deployed active object, so different active objects can have different constructor arguments. If we use PAActiveObject.newActiveInParallel(..., VirtualNode virtualNode), the virtual node is activated if it is not already active, and one active object is started on each node contained in the virtual node. In this case all the active objects take the same constructor arguments, because a one-dimensional java.lang.Object[] array is used.
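
The shape of the two-dimensional argument array can be sketched in plain Java. In the example below the node names are plain strings standing in for Node objects, and ParallelArgumentsSketch and perNodeArguments are hypothetical names; the argument types follow the Worker(Long, String) constructor used in this chapter.

```java
import java.util.Arrays;

public class ParallelArgumentsSketch {

    /** Builds one row of constructor arguments (Long age, String name) per node. */
    public static Object[][] perNodeArguments(String[] nodeNames) {
        // first dimension: one entry per node
        Object[][] params = new Object[nodeNames.length][];
        for (int i = 0; i < nodeNames.length; i++) {
            // second dimension: the constructor arguments for that node's object
            params[i] = new Object[] { Long.valueOf(20L + i), "worker-on-" + nodeNames[i] };
        }
        return params;
    }

    public static void main(String[] args) {
        String[] nodes = { "nodeA", "nodeB", "nodeC" };
        System.out.println(Arrays.deepToString(perNodeArguments(nodes)));
    }
}
```

Each row may differ, which is how different active objects receive different constructor arguments.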

10.5.3. Using A Node

It is possible to pass an additional argument to newActive in order to create the new active object on a specific JVM, possibly a remote one. The JVM is identified by a Node object, an array of Node, or a String containing a URL that points to the node. If the parameter is not given, the active object is created in the current JVM and attached to a default Node. The Node argument and the URL string are used with PAActiveObject.newActive(...) and PAActiveObject.turnActive(...). The array of Node and the virtual node argument are used with PAActiveObject.newActiveInParallel(...). The virtual node is activated if it is not already active, and one active object is started on each node contained in the virtual node.

Worker charlie;
Node node;
//set the constructor values to be used
Object[] params = new Object[] { Long.valueOf(26L), "Charlie" };
try {
	//get the node from a virtual node
	node = someVirtualNode.getNode();
	charlie = (Worker)PAActiveObject.newActive(Worker.class.getName(),
				params, node);
} 
catch (ActiveObjectCreationException aoExcep) {
	// creation of ActiveObject failed
	System.err.println(aoExcep.getMessage());
}
catch (NodeException nodeExcep){
	System.err.println(nodeExcep.getMessage());
}

To deploy using the node's URL, we just have to use the URL instead of the Node as the argument:

Worker charlie;
String nodeURL;
Node node;
//set the constructor values to be used
Object[] params = new Object[] { Long.valueOf(26L), "Charlie" };
try {
	//get the node from a virtual node
	node = someVirtualNode.getNode();
	//get the node URL
	nodeURL = node.getNodeInformation().getURL();
	charlie = (Worker)PAActiveObject.newActive(Worker.class.getName(),
				params, nodeURL);
} 
catch (ActiveObjectCreationException aoExcep) {
	// creation of ActiveObject failed
	System.err.println(aoExcep.getMessage());
}
catch (NodeException nodeExcep){
	System.err.println(nodeExcep.getMessage());
}

The usage is similar for PAActiveObject.turnActive(...) as in the examples above.

In order to deploy several active objects with PAActiveObject.newActiveInParallel(...), we can pass either an array of Node (see the example in Section 10.3.2, “Using PAActiveObject.newActiveInParallel(...)”) or a VirtualNode.

Worker[] workers;
//the virtual node needs to be initialized before it is used
try {
	//the same (empty) constructor arguments are used for every active object
	workers = (Worker[]) PAActiveObject.newActiveInParallel(Worker.class.getName(),
						new Object[] {}, someVirtualNode);
}
catch (ActiveObjectCreationException aoExcep){
	System.err.println(aoExcep.getMessage());
}
catch (ClassNotFoundException classExcep) {
	System.err.println(classExcep.getMessage());
}

10.5.4. Using A Custom Activity

Customizing the activity of an active object is at the core of ProActive because it allows the behavior of the active object to be specified fully. By default, an object turned into an active object serves its incoming requests in FIFO order. To specify another policy for serving requests, or any other behavior, one can implement interfaces whose methods are called automatically by ProActive.
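
The difference between serving policies can be illustrated with a plain queue. The sketch below is not ProActive code: ServingOrderSketch and serve are hypothetical names, requests are represented by strings, and the pending list is assumed to be ordered from oldest to youngest.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class ServingOrderSketch {

    /** Serves all pending requests, oldest first (FIFO) or youngest first (LIFO). */
    public static List<String> serve(List<String> pending, boolean youngestFirst) {
        // head of the deque = oldest request, tail = youngest request
        Deque<String> queue = new ArrayDeque<>(pending);
        List<String> served = new ArrayList<>();
        while (!queue.isEmpty()) {
            served.add(youngestFirst ? queue.pollLast() : queue.pollFirst());
        }
        return served;
    }

    public static void main(String[] args) {
        List<String> pending = Arrays.asList("first", "second", "third");
        System.out.println("FIFO: " + serve(pending, false));
        System.out.println("LIFO: " + serve(pending, true));
    }
}
```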

It is possible to specify what to do before the activity starts, what the activity is and what to do after it ends. The three steps are:

  • the initialization of the activity (done only once)

  • the activity itself

  • the end of the activity (done only once)

Three interfaces are used to define and implement each step:

  • InitActive

  • RunActive

  • EndActive

In case of a migration, an active object stops and restarts its activity automatically without invoking the initialization or ending phases. Only the activity itself is restarted.

There are two ways to define each of the three phases of an active object:

  • implementing one or more of the three interfaces directly in the class used to create the active object

  • passing an object that implements one or more of the three interfaces as a parameter to newActive or turnActive (the parameter named active in those methods)

Note that the methods defined by these three interfaces are guaranteed to be called by the active thread of the active object.
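
The ordering of the three phases on a single thread can be sketched with the JDK alone. This is a simplified illustration, not the ProActive body implementation: LifecycleSketch and runPhases are hypothetical names, and the serving step is reduced to writing a log entry.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class LifecycleSketch {

    /** Runs the three phases in order on one dedicated thread and returns the log. */
    public static List<String> runPhases(int requests) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Thread active = new Thread(() -> {
            String thread = Thread.currentThread().getName();
            log.add("init@" + thread);           // initialization, done once
            for (int i = 0; i < requests; i++) {
                log.add("serve@" + thread);      // the activity itself
            }
            log.add("end@" + thread);            // end of the activity, done once
        }, "active-thread");
        active.start();
        try {
            active.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        for (String entry : runPhases(2)) {
            System.out.println(entry);
        }
    }
}
```

Every entry carries the same thread name, which corresponds to the guarantee that all three phases are executed by the active thread.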

10.5.4.1.  Algorithms deciding which activity to invoke

The algorithms for each running phase are the following ( activity is the object passed as a parameter to newActive or turnActive ):

Figure 10.2. Activity algorithm
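
The figure is not reproduced in this text. One plausible reading of the selection rule for the running phase is sketched below, under the assumption that an explicitly passed activity object takes precedence over the reified object and that the default FIFO service is used when neither implements the interface. The interface Run is a hypothetical stand-in for RunActive, and all other names are illustrative.

```java
public class ActivitySelectionSketch {

    // Hypothetical stand-in for the RunActive interface.
    public interface Run {
        String run();
    }

    public static class PlainTarget {}

    public static class RunningTarget implements Run {
        public String run() {
            return "runActivity of the reified object";
        }
    }

    public static class ExternalActivity implements Run {
        public String run() {
            return "runActivity of the activity object";
        }
    }

    /** Picks the running phase: activity object first, then the reified object, then the default. */
    public static String selectRun(Object activity, Object reified) {
        if (activity instanceof Run) {      // false when activity is null
            return ((Run) activity).run();
        }
        if (reified instanceof Run) {
            return ((Run) reified).run();
        }
        return "default FIFO activity";
    }

    public static void main(String[] args) {
        System.out.println(selectRun(new ExternalActivity(), new RunningTarget()));
        System.out.println(selectRun(null, new RunningTarget()));
        System.out.println(selectRun(null, new PlainTarget()));
    }
}
```

Under the same assumption, the initialization and ending phases would follow the same precedence, except that nothing is done when neither object implements the corresponding interface.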


10.5.4.2.  Implementing the interfaces directly in the class

Implementing the interfaces directly in the class used to create the active object is the easiest solution when you control the class that you make active. Depending on which phase in the life of the active object you want to customize, you implement the corresponding interface (one or more): InitActive , RunActive and EndActive . Here is an example that has a custom initialization and activity.

import org.objectweb.proactive.Service;
import org.objectweb.proactive.RunActive;
import org.objectweb.proactive.InitActive;
import org.objectweb.proactive.Body;
import org.objectweb.proactive.api.PAActiveObject;

public class CustomActivity implements InitActive, RunActive {
	private String myName;
	public String getName() {
		return myName;
	}
	// -- implements InitActive
	public void initActivity(Body body) {
		myName = body.getName();
	}
	// -- implements RunActive for serving request in a LIFO fashion
	public void runActivity(Body body) {
		Service service = new Service(body);
		while (body.isActive()) {
			//LIFO order
			service.blockingServeYoungest();
		}
	}
	public static void main(String[] args) throws Exception {
		CustomActivity a = (CustomActivity) 
							PAActiveObject.newActive(CustomActivity.class.getName(),
							null);
		String call1 = a.getName();
	}
}

Below is the skeleton code for a class that can run, suspend, restart and stop a simulation. It uses an implementation of RunActive to provide the necessary control.

import org.objectweb.proactive.ProActive;
import org.objectweb.proactive.Service;
import org.objectweb.proactive.RunActive;
import org.objectweb.proactive.Body;
import org.objectweb.proactive.api.PAActiveObject;


public class Simulation implements RunActive {
	private boolean stoppedSimulation=false;
	private boolean startedSimulation=false;
	private boolean suspendedSimulation=false;
	private boolean notStarted = true;
	
	public void startSimulation(){
		// Simulation starts
		System.out.println("Simulation started...");
		notStarted = false;
		startedSimulation=true;
	}
	public void restartSimulation(){
		// Simulation is restarted
		System.out.println("Simulation restarted...");
		startedSimulation=true;
		suspendedSimulation=false;
	}
	public void suspendSimulation(){
		// Simulation is suspended
		System.out.println("Simulation suspended...");
		suspendedSimulation=true;
		startedSimulation = false;
	}
	public void stopSimulation(){
		// Simulation is stopped
		System.out.println("Simulation stopped...");
		stoppedSimulation=true;
	}
	public void runActivity(Body body) {
		Service service = new Service(body);
		while (body.isActive()) {
			// If the simulation is not yet started wait until startSimulation
			// method
			if(notStarted) service.blockingServeOldest("startSimulation");
			// If the simulation is started serve request with FIFO
			if(startedSimulation) service.blockingServeOldest();
			// If simulation is suspended wait until restartSimulation method
			if(suspendedSimulation) service.blockingServeOldest("restartSimulation");
			// If simulation is stopped, exit
			if(stoppedSimulation) 
				{
					body.terminate();
					ProActive.exitSuccess();
				}
		}
	}
}

Example 10.1.  Start, stop, suspend, restart a simulation algorithm in runActivity method


10.5.4.3.  Passing an object implementing the interfaces at creation time

Passing an object that implements the interfaces is the solution to use when we do not control the class that we make active, or when we want to write a generic activity policy and reuse it with several active objects. Depending on which phase in the life of the active object we want to customize, we implement one or more of the corresponding interfaces: InitActive, RunActive and EndActive. The following example has a custom activity.

First we need to implement the activity that will be passed to the active object. We do so by implementing one or more of the interfaces.

import org.objectweb.proactive.Body;
import org.objectweb.proactive.RunActive;
import org.objectweb.proactive.Service;

public class LIFOActivity implements RunActive {

	// -- implements RunActive for serving request in a LIFO fashion
	public void runActivity(Body body) {
		Service service = new Service(body);
		while (body.isActive()) {
			System.out.println("Serving...");
			service.blockingServeYoungest();
		}
	}
}

An instance of the implementing class can then be passed to PAActiveObject.newActive(...) or PAActiveObject.turnActive(...).

import org.objectweb.proactive.ActiveObjectCreationException;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.core.node.NodeException;

public class Main  {
	public static void main(String[] args)  {
		Worker w = new Worker();
		try {
			w = (Worker) PAActiveObject.turnActive(w, 
						null,   new LIFOActivity(), null);
			w.doNothingGracefully();
			w.doNothingGracefully();
			w.doNothingGracefully();
			w.doNothingGracefully();

		} 
		catch (ActiveObjectCreationException aoExcep) {
			System.err.println(aoExcep.getMessage());
		}
		catch (NodeException nodeExcep) {
			System.err.println(nodeExcep.getMessage());
		}
	
	}
}

10.5.5. Using the factory pattern with Active Objects

Creating an active object with ProActive can be somewhat cumbersome and requires more lines of code than creating a regular object. A nice solution to this problem is the factory pattern, which mainly applies to class based creation. It consists of adding a static method to a class, here WorkerFactory, that takes care of instantiating the active object and returns it. The code is:

import org.objectweb.proactive.ActiveObjectCreationException;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.core.node.Node;
import org.objectweb.proactive.core.node.NodeException;

public class WorkerFactory extends Worker {
	public static Worker createActiveWorker (int age, String name, Node node) {
		// the element types must match the Worker(Long, String) constructor
		Object[] params = new Object[] {Long.valueOf(age), name};
		try {
			return (Worker) PAActiveObject.newActive(Worker.class.getName(),
					params, node);
		}
		catch (ActiveObjectCreationException e) {
			e.printStackTrace();
			return null;
		}
		catch (NodeException e) {
			e.printStackTrace();
			return null;
		}
	}
}

The static method in the factory class is then used to create active objects:

import org.objectweb.proactive.core.node.Node;

public class Main  {
	public static void main(String[] args)  {
		Worker w = new Worker();
		Node node = null; //deploy locally
		w = WorkerFactory.createActiveWorker(26, "Charlie", node);
		w.doNothingGracefully();
	}
}

It is up to the programmer to decide whether this method has to throw exceptions or not. We recommend that this method only throw exceptions that appear in the signature of the reified constructor (none here, as the constructor of Worker that we call does not throw any exception). However, the non-functional exceptions induced by the creation of the active object have to be dealt with somewhere in the code.

10.5.6.  Using MetaObjectFactory to customize the meta objects

There are many cases where you may want to customize the body used when creating an active object. For instance, one may want to add some debug messages or some timing behavior when sending or receiving requests. The body is a non changeable object that delegates most of its tasks to helper objects called MetaObjects. Standard MetaObjects are already used by default in ProActive but one can easily replace any of those MetaObjects by a custom one.

We have defined a MetaObjectFactory interface that is able to create factories for each of those MetaObjects. This interface is implemented by ProActiveMetaObjectFactory which provides all the default factories used in ProActive.

When creating an active object, it is possible to specify which MetaObjectFactory to use for that particular instance of active object being created.

First you have to write a new MetaObject factory that inherits from ProActiveMetaObjectFactory or directly implements the MetaObjectFactory interface. Inheriting from ProActiveMetaObjectFactory is a great time saver, as you only redefine what you really need, as opposed to defining every method when implementing MetaObjectFactory directly. Here is an example:

import java.io.Serializable;
import org.objectweb.proactive.core.body.MetaObjectFactory;
import org.objectweb.proactive.core.body.ProActiveMetaObjectFactory;
import org.objectweb.proactive.core.body.UniversalBody;
import org.objectweb.proactive.core.body.request.Request;
import org.objectweb.proactive.core.body.request.RequestFactory;
import org.objectweb.proactive.core.body.request.RequestImpl;
import org.objectweb.proactive.core.mop.MethodCall;

public class CustomMetaObjectFactory extends ProActiveMetaObjectFactory {
	private static final MetaObjectFactory instance = new CustomMetaObjectFactory();
	//return the singleton factory instance
	public static MetaObjectFactory newInstance() {
		return instance;
	}

	private CustomMetaObjectFactory() {
		super();
	}

	protected RequestFactory newRequestFactorySingleton() {
		System.out.println("Creating the  custom metaobject factory...");
		return new CustomRequestFactory();
	}

	protected class CustomRequestFactory extends RequestFactoryImpl implements
			Serializable {
		public Request newRequest(MethodCall methodCall,
				UniversalBody sourceBody, boolean isOneWay, long sequenceID) {
			System.out.println("Received a new request...");
			return new CustomRequest(methodCall, sourceBody, isOneWay,
					sequenceID);
		}

		protected class CustomRequest extends RequestImpl {
			public CustomRequest(MethodCall methodCall,
					UniversalBody sourceBody, boolean isOneWay, long sequenceID) {
				super(methodCall, sourceBody, isOneWay, sequenceID);
				System.out.println("I am a custom request handler");
			}
		}//end inner class CustomRequest
	}//end inner class CustomRequestFactory
}

The factory above simply redefines the RequestFactory in order to make the body use a new type of request. The method protected RequestFactory newRequestFactorySingleton() is a convenience method that ProActiveMetaObjectFactory provides to simplify the creation of factories as singletons.
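
The override mechanism behind newRequestFactorySingleton() can be reduced to a few lines of plain Java. The sketch below is a simplified model and not the ProActive classes: FactoryOverrideSketch, DefaultFactory, CustomFactory and the one-method RequestFactory interface are hypothetical stand-ins introduced for illustration.

```java
public class FactoryOverrideSketch {

    // Hypothetical stand-in for a request factory: turns a method name into a request label.
    public interface RequestFactory {
        String newRequest(String methodName);
    }

    public static class DefaultFactory {
        private final RequestFactory requestFactory;

        protected DefaultFactory() {
            // called exactly once, at construction time; subclasses can override it,
            // so the override must not depend on fields of the subclass
            this.requestFactory = newRequestFactorySingleton();
        }

        protected RequestFactory newRequestFactorySingleton() {
            return methodName -> "default:" + methodName;
        }

        // always returns the instance built in the constructor
        public RequestFactory newRequestFactory() {
            return requestFactory;
        }
    }

    public static class CustomFactory extends DefaultFactory {
        @Override
        protected RequestFactory newRequestFactorySingleton() {
            return methodName -> "custom:" + methodName;
        }
    }

    public static void main(String[] args) {
        DefaultFactory factory = new CustomFactory();
        System.out.println(factory.newRequestFactory().newRequest("doNothingGracefully"));
    }
}
```

Only the protected method is redefined; everything else is inherited, which is why extending the default factory takes less code than implementing the whole interface.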

package org.objectweb.proactive.core.body;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import org.apache.log4j.Logger;
import org.objectweb.proactive.Body;
import org.objectweb.proactive.core.UniqueID;
import org.objectweb.proactive.core.body.ft.protocols.FTManager;
import org.objectweb.proactive.core.body.ft.protocols.FTManagerFactory;
import org.objectweb.proactive.core.body.ft.protocols.cic.managers.FTManagerCIC;
import org.objectweb.proactive.core.body.ft.protocols.cic.managers.HalfFTManagerCIC;
import org.objectweb.proactive.core.body.ft.protocols.pmlrb.managers.FTManagerPMLRB;
import org.objectweb.proactive.core.body.ft.protocols.pmlrb.managers.HalfFTManagerPMLRB;
import org.objectweb.proactive.core.body.migration.MigrationManager;
import org.objectweb.proactive.core.body.migration.MigrationManagerFactory;
import org.objectweb.proactive.core.body.reply.ReplyReceiver;
import org.objectweb.proactive.core.body.reply.ReplyReceiverFactory;
import org.objectweb.proactive.core.body.request.BlockingRequestQueue;
import org.objectweb.proactive.core.body.request.Request;
import org.objectweb.proactive.core.body.request.RequestFactory;
import org.objectweb.proactive.core.body.request.RequestQueueFactory;
import org.objectweb.proactive.core.body.request.RequestReceiver;
import org.objectweb.proactive.core.body.request.RequestReceiverFactory;
import org.objectweb.proactive.core.component.ComponentParameters;
import org.objectweb.proactive.core.component.identity.ProActiveComponent;
import org.objectweb.proactive.core.component.identity.ProActiveComponentFactory;
import org.objectweb.proactive.core.component.identity.ProActiveComponentImpl;
import org.objectweb.proactive.core.component.request.SynchronousComponentRequestReceiver;
import org.objectweb.proactive.core.group.spmd.ProActiveSPMDGroupManager;
import org.objectweb.proactive.core.group.spmd.ProActiveSPMDGroupManagerFactory;
import org.objectweb.proactive.core.mop.MethodCall;
import org.objectweb.proactive.core.security.ProActiveSecurityManager;
import org.objectweb.proactive.core.util.ThreadStore;
import org.objectweb.proactive.core.util.ThreadStoreFactory;
import org.objectweb.proactive.core.util.converter.MakeDeepCopy;
import org.objectweb.proactive.core.util.log.Loggers;
import org.objectweb.proactive.core.util.log.ProActiveLogger;


// TODO JAVADOC SHOULD BE REWRITTEN
/**
 * <p>
 * This class provides singleton instances of all default factories
 * creating MetaObjects used in the Body.
 * </p>
 *
 * <b>Since version 1.8, it is also possible to parameterize the factories on a per-object basis.</b>
 * In that case, public ProActiveMetaObjectFactory(Hashtable parameters) is the constructor to use.
 * <p>
 * One can inherit from this class in order to provide custom implementation
 * of one or several factories. This class provides a default implementation that
 * makes the factories singletons. One instance of each meta object factory is
 * created when this object is built and the same instance is returned each time
 * somebody asks for an instance.
 * </p>
 * <p>
 * In order to change one meta object factory following that singleton pattern,
 * only the protected method <code>newXXXSingleton</code> has to be overridden.
 * The method <code>newXXXSingleton</code> is guaranteed to be called only once at
 * construction time of this object.
 * </p>
 * <p>
 * In order to change one meta object factory that does not follow the singleton
 * pattern, the public method <code>newXXX</code> has to be overridden in order
 * to return a new instance of the factory each time. The default implementation
 * of each <code>newXXX</code> method is to return the singleton instance of the
 * factory created from <code>newXXXSingleton</code> method call.
 * </p>
 * <p>
 * Each sub class of this class should be implemented as a singleton and provide
 * a static method <code>newInstance</code> for this purpose.
 * </p>
 *
 * @author The ProActive Team
 * @version 1.0,  2002/05
 * @since   ProActive 0.9.2
 */
public class ProActiveMetaObjectFactory implements MetaObjectFactory, java.io.Serializable, Cloneable {
    public static final String COMPONENT_PARAMETERS_KEY = "component-parameters";
    public static final String SYNCHRONOUS_COMPOSITE_COMPONENT_KEY = "synchronous-composite";
    protected static Logger logger = ProActiveLogger.getLogger(Loggers.MOP);

    //
    // -- PRIVATE MEMBERS -----------------------------------------------
    //
    // private static final MetaObjectFactory instance = new ProActiveMetaObjectFactory();
    private static MetaObjectFactory instance = new ProActiveMetaObjectFactory();
    public Map<String, Object> parameters = new HashMap<String, Object>();

    //
    // -- PROTECTED MEMBERS -----------------------------------------------
    //
    protected RequestFactory requestFactoryInstance;
    protected ReplyReceiverFactory replyReceiverFactoryInstance;
    protected RequestReceiverFactory requestReceiverFactoryInstance;
    protected RequestQueueFactory requestQueueFactoryInstance;
    protected MigrationManagerFactory migrationManagerFactoryInstance;

    //    protected RemoteBodyFactory remoteBodyFactoryInstance;
    protected ThreadStoreFactory threadStoreFactoryInstance;
    protected ProActiveSPMDGroupManagerFactory proActiveSPMDGroupManagerFactoryInstance;
    protected ProActiveComponentFactory componentFactoryInstance;
    protected ProActiveSecurityManager proActiveSecurityManager;
    protected FTManagerFactory ftmanagerFactoryInstance;
    protected Object timItReductor;

    //
    // -- CONSTRUCTORS -----------------------------------------------
    //
    protected ProActiveMetaObjectFactory() {
        this.requestFactoryInstance = newRequestFactorySingleton();
        this.replyReceiverFactoryInstance = newReplyReceiverFactorySingleton();
        this.requestReceiverFactoryInstance = newRequestReceiverFactorySingleton();
        this.requestQueueFactoryInstance = newRequestQueueFactorySingleton();
        this.migrationManagerFactoryInstance = newMigrationManagerFactorySingleton();
        //        this.remoteBodyFactoryInstance = newRemoteBodyFactorySingleton();
        this.threadStoreFactoryInstance = newThreadStoreFactorySingleton();
        this.proActiveSPMDGroupManagerFactoryInstance = newProActiveSPMDGroupManagerFactorySingleton();
        this.ftmanagerFactoryInstance = newFTManagerFactorySingleton();
    }

    /**
     * Constructor with parameters
     * It is used for per-active-object configurations of ProActive factories
     * @param parameters the parameters of the factories; these parameters can be of any type
     */
    public ProActiveMetaObjectFactory(Map<String, Object> parameters) {
        this.parameters = parameters;
        if (parameters.containsKey(COMPONENT_PARAMETERS_KEY)) {
            ComponentParameters initialComponentParameters = (ComponentParameters) parameters
                    .get(COMPONENT_PARAMETERS_KEY);
            this.componentFactoryInstance = newComponentFactorySingleton(initialComponentParameters);
            this.requestFactoryInstance = newRequestFactorySingleton();
            this.replyReceiverFactoryInstance = newReplyReceiverFactorySingleton();
            this.requestReceiverFactoryInstance = newRequestReceiverFactorySingleton();
            this.requestQueueFactoryInstance = newRequestQueueFactorySingleton();
            this.migrationManagerFactoryInstance = newMigrationManagerFactorySingleton();
            //            this.remoteBodyFactoryInstance = newRemoteBodyFactorySingleton();
            this.threadStoreFactoryInstance = newThreadStoreFactorySingleton();
            this.proActiveSPMDGroupManagerFactoryInstance = newProActiveSPMDGroupManagerFactorySingleton();
            this.ftmanagerFactoryInstance = newFTManagerFactorySingleton();
        }
    }

    //
    // -- PUBLICS METHODS -----------------------------------------------
    //
    public static MetaObjectFactory newInstance() {
        return instance;
    }

    public static void setNewInstance(MetaObjectFactory mo) {
        instance = mo;
    }

    /**
     * getter for the parameters of the factory (per-active-object config)
     * @return the parameters of the factory
     */
    public Map<String, Object> getParameters() {
        return this.parameters;
    }

    //
    // -- implements MetaObjectFactory -----------------------------------------------
    //
    public RequestFactory newRequestFactory() {
        return this.requestFactoryInstance;
    }

    public ReplyReceiverFactory newReplyReceiverFactory() {
        return this.replyReceiverFactoryInstance;
    }

    public RequestReceiverFactory newRequestReceiverFactory() {
        return this.requestReceiverFactoryInstance;
    }

    public RequestQueueFactory newRequestQueueFactory() {
        return this.requestQueueFactoryInstance;
    }

    public MigrationManagerFactory newMigrationManagerFactory() {
        return this.migrationManagerFactoryInstance;
    }

    //    public RemoteBodyFactory newRemoteBodyFactory() {
    //        return this.remoteBodyFactoryInstance;
    //    }
    public ThreadStoreFactory newThreadStoreFactory() {
        return this.threadStoreFactoryInstance;
    }

    public ProActiveSPMDGroupManagerFactory newProActiveSPMDGroupManagerFactory() {
        return this.proActiveSPMDGroupManagerFactoryInstance;
    }

    public ProActiveComponentFactory newComponentFactory() {
        return this.componentFactoryInstance;
    }

    public FTManagerFactory newFTManagerFactory() {
        return this.ftmanagerFactoryInstance;
    }

    //
    // -- PROTECTED METHODS -----------------------------------------------
    //
    protected RequestFactory newRequestFactorySingleton() {
        return new RequestFactoryImpl();
    }

    protected ReplyReceiverFactory newReplyReceiverFactorySingleton() {
        return new ReplyReceiverFactoryImpl();
    }

    protected RequestReceiverFactory newRequestReceiverFactorySingleton() {
        return new RequestReceiverFactoryImpl();
    }

    protected RequestQueueFactory newRequestQueueFactorySingleton() {
        return new RequestQueueFactoryImpl();
    }

    protected MigrationManagerFactory newMigrationManagerFactorySingleton() {
        return new MigrationManagerFactoryImpl();
    }

    //    protected RemoteBodyFactory newRemoteBodyFactorySingleton() {
    //        return new RemoteBodyFactoryImpl();
    //    }
    protected ThreadStoreFactory newThreadStoreFactorySingleton() {
        return new ThreadStoreFactoryImpl();
    }

    protected ProActiveSPMDGroupManagerFactory newProActiveSPMDGroupManagerFactorySingleton() {
        return new ProActiveSPMDGroupManagerFactoryImpl();
    }

    protected ProActiveComponentFactory newComponentFactorySingleton(
            ComponentParameters initialComponentParameters) {
        return new ProActiveComponentFactoryImpl(initialComponentParameters);
    }

    protected FTManagerFactory newFTManagerFactorySingleton() {
        return new FTManagerFactoryImpl();
    }

    //  //
    //  // -- INNER CLASSES -----------------------------------------------
    //  //
    protected static class RequestFactoryImpl implements RequestFactory, java.io.Serializable {
        public Request newRequest(MethodCall methodCall, UniversalBody sourceBody, boolean isOneWay,
                long sequenceID) {
            //########### example code for the new factories
            //			if(System.getProperty("migration.stategy").equals("locationserver")){
            //				  return new RequestWithLocationServer(methodCall, sourceBody,
            //                isOneWay, sequenceID, LocationServerFactory.getLocationServer());
            //			}else{
            return new org.objectweb.proactive.core.body.request.RequestImpl(methodCall, sourceBody,
                isOneWay, sequenceID);
            //}
        }
    }

    // end inner class RequestFactoryImpl
    protected static class ReplyReceiverFactoryImpl implements ReplyReceiverFactory, java.io.Serializable {
        public ReplyReceiver newReplyReceiver() {
            return new org.objectweb.proactive.core.body.reply.ReplyReceiverImpl();
        }
    }

    // end inner class ReplyReceiverFactoryImpl
    protected class RequestReceiverFactoryImpl implements RequestReceiverFactory, java.io.Serializable {
        public RequestReceiver newRequestReceiver() {
            if (ProActiveMetaObjectFactory.this.parameters.containsKey(SYNCHRONOUS_COMPOSITE_COMPONENT_KEY) &&
                ((Boolean) ProActiveMetaObjectFactory.this.parameters
                        .get(ProActiveMetaObjectFactory.SYNCHRONOUS_COMPOSITE_COMPONENT_KEY)).booleanValue()) {
                return new SynchronousComponentRequestReceiver();
            }
            return new org.objectweb.proactive.core.body.request.RequestReceiverImpl();
        }
    }

    // end inner class RequestReceiverFactoryImpl
    protected class RequestQueueFactoryImpl implements RequestQueueFactory, java.io.Serializable {
        public BlockingRequestQueue newRequestQueue(UniqueID ownerID) {
            if ("true".equals(ProActiveMetaObjectFactory.this.parameters
                    .get(SYNCHRONOUS_COMPOSITE_COMPONENT_KEY))) {
                return null;
            }

            //if (componentFactoryInstance != null) {
            // COMPONENTS
            // we need a request queue for components
            //return new ComponentRequestQueueImpl(ownerID);
            //} else {
            return new org.objectweb.proactive.core.body.request.BlockingRequestQueueImpl(ownerID);
            //}
        }
    }

    // end inner class RequestQueueFactoryImpl
    protected static class MigrationManagerFactoryImpl implements MigrationManagerFactory,
            java.io.Serializable {
        public MigrationManager newMigrationManager() {
            //########### example code for the new factories
            //			if(System.getProperty("migration.stategy").equals("locationserver")){
            //				return new MigrationManagerWithLocationServer(LocationServerFactory.getLocationServer());
            //			}else{
            return new org.objectweb.proactive.core.body.migration.MigrationManagerImpl();
            //}
        }
    }

    // end inner class MigrationManagerFactoryImpl
    //    protected static class RemoteBodyFactoryImpl implements RemoteBodyFactory,
    //        java.io.Serializable {
    //        public UniversalBody newRemoteBody(UniversalBody body) {
    //            try {
    //                if (Constants.IBIS_PROTOCOL_IDENTIFIER.equals(
    //                            ProActiveConfiguration.getInstance()
    //                                .getProperty(Constants.PROPERTY_PA_COMMUNICATION_PROTOCOL))) {
    //                    if (logger.isDebugEnabled()) {
    //                        logger.debug(
    //                            "Using ibis factory for creating remote body");
    //                    }
    //                    return new org.objectweb.proactive.core.body.ibis.IbisBodyAdapter(body);
    //                } else if (Constants.XMLHTTP_PROTOCOL_IDENTIFIER.equals(
    //                            ProActiveConfiguration.getInstance()
    //                                .getProperty(Constants.PROPERTY_PA_COMMUNICATION_PROTOCOL))) {
    //                    if (logger.isDebugEnabled()) {
    //                        logger.debug(
    //                            "Using http factory for creating remote body");
    //                    }
    //
    //                    return new org.objectweb.proactive.core.body.http.HttpBodyAdapter(body);
    //                } else if (Constants.RMISSH_PROTOCOL_IDENTIFIER.equals(
    //                            ProActiveConfiguration.getInstance()
    //                                .getProperty(Constants.PROPERTY_PA_COMMUNICATION_PROTOCOL))) {
    //                    if (logger.isDebugEnabled()) {
    //                        logger.debug(
    //                            "Using rmissh factory for creating remote body");
    //                    }
    //                    return new org.objectweb.proactive.core.body.rmi.SshRmiBodyAdapter(body);
    //                } else {
    //                    if (logger.isDebugEnabled()) {
    //                        logger.debug(
    //                            "Using rmi factory for creating remote body");
    //                    }
    //                    return new org.objectweb.proactive.core.body.rmi.RmiBodyAdapter(body);
    //                }
    //            } catch (ProActiveException e) {
    //                throw new ProActiveRuntimeException("Cannot create Remote body adapter ",
    //                    e);
    //            }
    //        }
    //    }

    // end inner class RemoteBodyFactoryImpl
    protected static class ThreadStoreFactoryImpl implements ThreadStoreFactory, java.io.Serializable {
        public ThreadStore newThreadStore() {
            return new org.objectweb.proactive.core.util.ThreadStoreImpl();
        }
    }

    // end inner class ThreadStoreFactoryImpl
    protected static class ProActiveSPMDGroupManagerFactoryImpl implements ProActiveSPMDGroupManagerFactory,
            java.io.Serializable {
        public ProActiveSPMDGroupManager newProActiveSPMDGroupManager() {
            return new ProActiveSPMDGroupManager();
        }
    }

    // end inner class ProActiveGroupManagerFactoryImpl
    protected class ProActiveComponentFactoryImpl implements ProActiveComponentFactory, java.io.Serializable {
        // COMPONENTS
        private ComponentParameters componentParameters;

        public ProActiveComponentFactoryImpl(ComponentParameters initialComponentParameters) {
            this.componentParameters = initialComponentParameters;
        }

        public ProActiveComponent newProActiveComponent(Body myBody) {
            return new ProActiveComponentImpl(this.componentParameters, myBody);
        }
    }

    // FAULT-TOLERANCE
    protected class FTManagerFactoryImpl implements FTManagerFactory, Serializable {
        public FTManager newFTManager(int protocolSelector) {
            switch (protocolSelector) {
                case FTManagerFactory.PROTO_CIC_ID:
                    return new FTManagerCIC();
                case FTManagerFactory.PROTO_PML_ID:
                    return new FTManagerPMLRB();
                default:
                    logger.error("Error while creating fault-tolerance manager : " +
                        "no protocol is associated to selector value " + protocolSelector);
                    return null;
            }
        }

        public FTManager newHalfFTManager(int protocolSelector) {
            switch (protocolSelector) {
                case FTManagerFactory.PROTO_CIC_ID:
                    return new HalfFTManagerCIC();
                case FTManagerFactory.PROTO_PML_ID:
                    return new HalfFTManagerPMLRB();
                default:
                    logger.error("Error while creating fault-tolerance manager : " +
                        "no protocol is associated to selector value " + protocolSelector);
                    return null;
            }
        }
    }

    // SECURITY
    public void setProActiveSecurityManager(ProActiveSecurityManager psm) {
        this.proActiveSecurityManager = psm;
    }

    public ProActiveSecurityManager getProActiveSecurityManager() {
        return this.proActiveSecurityManager;
    }

    @Override
    public Object clone() throws CloneNotSupportedException {
        ProActiveMetaObjectFactory clone = null;

        try {
            return MakeDeepCopy.WithObjectStream.makeDeepCopy(this);
        } catch (IOException e) {
            //TODO replace by CloneNotSupportedException(Throwable e) java 1.6
            throw (CloneNotSupportedException) new CloneNotSupportedException(e.getMessage()).initCause(e);
        } catch (ClassNotFoundException e) {
            throw (CloneNotSupportedException) new CloneNotSupportedException(e.getMessage()).initCause(e);
        }
    }

    public void setTimItReductor(Object timItReductor) {
        this.timItReductor = timItReductor;
    }

    public Object getTimItReductor() {
        return this.timItReductor;
    }
}

More explanations can be found in the Javadoc of that class. Using the factory is fairly simple: pass an instance of the factory when creating a new active object. Taking the same example as before, we have:

// constructor arguments, matching Worker(Long age, String Name)
Object[] params = new Object[] { Long.valueOf(26L), "Charlie" };
Worker w;
try {
	// params must be passed as the constructor parameters of the new active object
	w = (Worker) PAActiveObject.newActive(Worker.class.getName(),
			null, params, null, null, CustomMetaObjectFactory.newInstance());
} 
catch (ActiveObjectCreationException e) {
	e.printStackTrace();
} 
catch (NodeException e) {
	e.printStackTrace();
}

In the case of a turnActive we would have:

Worker w = new Worker(26L, "Charlie");
Worker charlie;

try {
	// turnActive takes the existing object itself, not its class name
	charlie = (Worker) PAActiveObject.turnActive(w, null, null,
			CustomMetaObjectFactory.newInstance());
}
catch (ActiveObjectCreationException aoExcep) {
	// creation of ActiveObject failed
	System.err.println(aoExcep.getMessage());
}
catch (NodeException nodeExcep){
	System.err.println(nodeExcep.getMessage());
}

10.6.  Elements of an active object and futures

In this section, we take a close look at what happens when an active object is created. The aim is to provide a better understanding of how the library works and where the restrictions of ProActive come from.

Consider that some code in an instance of class A creates an active object of class Worker using a piece of code like this:

Worker charlie;
//set the constructor values to be used
Object[] params = new Object[] { Long.valueOf(26L), "Charlie" };
try {
	charlie = (Worker)PAActiveObject.newActive(Worker.class.getName(),
				params);
} 
catch (ActiveObjectCreationException aoExcep) {
	// creation of ActiveObject failed
	System.err.println(aoExcep.getMessage());
}
catch (NodeException nodeExcep){
	System.err.println(nodeExcep.getMessage());
}

If the active instance of Worker is created successfully, the graph of objects is as shown in the figure below (arrows denote references).


Figure 10.3. The components of an active object and of a referencing object


The active instance of Worker is composed of 2 objects:

  • a body ( Body )

  • an instance of Worker

In addition to the active object itself (a body plus an instance of Worker), every active or passive object that references the active object does so through a stub and a proxy.

10.6.1. Role of the stub

The role of the class Stub_Worker is to reify all method calls that can be performed through a reference of type Worker. Reifying a call simply means constructing an object that represents the call (in our case, all reified calls are instances of class org.objectweb.proactive.core.mop.MethodCall), so that it can be manipulated like any other object. The reified call is then processed by the other components of the active object in order to achieve the behavior we expect from an active object.

Using a standard object to represent elements of the language that are not normally objects (such as method calls, constructor calls, references, types, ...) is at the core of metaobject programming. The metaobject protocol (MOP) on which ProActive is built is described in Chapter 48, MOP: Metaobject Protocol, but it is not a prerequisite for understanding and using ProActive.

As one of our objectives is to provide transparent active objects, references to active objects of class Worker need to be of the same type as references to passive instances of Worker (this feature is called polymorphism between passive and active instances of the same class). This is why Stub_Worker is a subclass of class Worker by construction, therefore allowing instances of class Stub_Worker to be assigned to variables of type Worker.

Class Stub_Worker redefines each of the methods inherited from its superclasses. The code of each method of class Stub_Worker builds an instance of class org.objectweb.proactive.core.mop.MethodCall in order to represent the call to this method. This object is then passed to the BodyProxy, which returns an object that is returned as the result of the method call. From the caller's point of view, everything looks as if the call had been performed on an instance of Worker.

Now that we know how stubs work, we can understand some of the limitations of ProActive:

  • Obviously, Stub_Worker cannot redefine final methods inherited from class Worker. If the methods are final, calls to these methods are not reified but are executed on the stub, which may lead to unpredictable behavior.

    As there are 6 final methods in the base class Object, there is the question of how ProActive deals with these methods. In fact, 5 out of these 6 methods deal with thread synchronization (notify(), notifyAll() and the 3 versions of wait()). Those methods should not be used since an active object provides thread synchronization. Using the standard thread synchronization mechanism and the ProActive thread synchronization mechanism at the same time might conflict and result in behaviour that is very hard to debug.

    The last final method in the class Object is getClass(). When invoked on an active object, getClass() is not reified and is therefore performed on the stub object, which returns an object of class Class that represents the class of the stub (Stub_Worker in our example) and not the class of the active object itself (Worker in our example). However, this method is seldom used in standard applications, and it doesn't prevent the operator instanceof from working, thanks to its polymorphic behavior. Therefore the expression (foo instanceof Worker) has the same value whether foo refers to an active or a passive instance of Worker.

  • Getting or setting instance variables directly (not through a get or a set method) must be avoided for active objects because it results in getting or setting the value on the stub object and not on the instance of the class Worker. This problem is usually worked around by using get/set methods for setting or reading attributes. This rule of strict encapsulation can also be found in JavaBeans or in most distributed object systems like RMI or CORBA.
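The stub mechanism and the limitations listed above can be illustrated with a plain-Java sketch. This is not the code ProActive generates: StubWorker, ReifiedCall and the direct forwarding to the target are hypothetical stand-ins, written by hand, for Stub_Worker, MethodCall and the BodyProxy.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of stub-based reification; not ProActive's generated code. */
class StubSketch {

    /** Hypothetical stand-in for a reified call: method name plus arguments. */
    static final class ReifiedCall {
        final String methodName;
        final Object[] args;

        ReifiedCall(String methodName, Object... args) {
            this.methodName = methodName;
            this.args = args;
        }
    }

    /** Ordinary class with a public field and a final method. */
    static class Worker {
        public int age = 0;

        public String greet(String name) { return "Hello " + name; }

        public final String id() { return "worker"; }   // a stub cannot redefine this
    }

    /** Hand-written stub: a subclass that turns each overridable call into an object. */
    static class StubWorker extends Worker {
        final Worker target;                             // the real instance
        final List<ReifiedCall> calls = new ArrayList<>();

        StubWorker(Worker target) { this.target = target; }

        @Override
        public String greet(String name) {
            calls.add(new ReifiedCall("greet", name));   // reify the call
            return target.greet(name);                   // simplification: forward directly
        }
    }

    public static void main(String[] args) {
        Worker real = new Worker();
        Worker ref = new StubWorker(real);               // the stub is assignable to Worker

        System.out.println(ref.greet("Charlie"));            // prints Hello Charlie
        System.out.println(ref instanceof Worker);           // prints true
        System.out.println(ref.getClass() == Worker.class);  // prints false
        ref.age = 26;                                    // sets the field on the stub only
        System.out.println(real.age);                    // prints 0
    }
}
```

The last two statements show why direct field access must be avoided: the assignment changes the stub, and the real instance never sees it.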

10.6.2. Role of the proxy

The role of the proxy is to handle asynchronism in calls to active objects. More specifically, it creates future objects if possible and needed, forwards calls to bodies, and returns future objects to the stubs. As this class operates on MethodCall objects, it is completely generic and does not depend at all on the type of the stub that feeds calls in through its reify method.

10.6.3. Role of the body

The body is responsible for storing calls (actually, Request objects) in a queue of pending requests and processing these requests according to a given synchronization policy. The default synchronization policy is FIFO. The Body has its own thread, which chooses requests from the queue of requests and executes the associated calls.
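A minimal sketch of this behaviour in plain Java, assuming the default FIFO policy. BodySketch, receive, terminate and demo are hypothetical names, and requests are modelled as Runnable objects rather than ProActive Request objects.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Plain-Java sketch of a body: one queue of pending requests, one service thread. */
class BodySketch {
    // Sentinel request, used only by this sketch to stop the service thread.
    private static final Runnable STOP = new Runnable() {
        @Override public void run() { }
    };

    private final BlockingQueue<Runnable> pending = new LinkedBlockingQueue<>();
    private final Thread serviceThread = new Thread(this::serveRequests);

    BodySketch() {
        serviceThread.start();
    }

    /** FIFO policy: take the oldest pending request and execute it, one at a time. */
    private void serveRequests() {
        try {
            for (Runnable request = pending.take(); request != STOP; request = pending.take()) {
                request.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Called by other threads: the request is only stored, the caller does not wait. */
    void receive(Runnable request) {
        pending.add(request);
    }

    /** Stops the service thread once all earlier requests have been served. */
    void terminate() {
        pending.add(STOP);
        try {
            serviceThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Sends five requests and returns the order in which they were served. */
    static List<Integer> demo() {
        List<Integer> served = Collections.synchronizedList(new ArrayList<>());
        BodySketch body = new BodySketch();
        for (int i = 1; i <= 5; i++) {
            final int id = i;
            body.receive(() -> served.add(id));
        }
        body.terminate();
        return served;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [1, 2, 3, 4, 5]
    }
}
```

Because a single thread serves the queue, two requests never run concurrently on the same object.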

10.6.4. Role of the instance of class Worker

There is also a standard instance of class Worker. It may contain some synchronization information in its live method, if it has one. As the body executes calls one by one, there cannot be any concurrent execution of two portions of code for this object by two different threads. This enables the use of pre- and post-conditions and class invariants. As a consequence, the use of the keyword synchronized in class Worker should not be necessary. Any synchronization scheme that can be expressed through monitors and synchronized statements can be expressed using ProActive's high-level synchronization mechanism in a much more natural and user-friendly way.

10.7. Asynchronous calls and futures

10.7.1. Creation of a Future Object

Whenever possible a method call on an active object is reified as an asynchronous request. If not possible the call is synchronous and blocks until the reply is received. In case the request is asynchronous, it immediately returns a future object.

This object acts as a placeholder for the result of the not-yet-performed method invocation. As a consequence, the calling thread can go on executing its code as long as it doesn't need to invoke methods on the returned object. If it does, the calling thread is automatically blocked until the result of the method invocation is available. The table below shows the different cases that can lead to an asynchronous call. Note that this table does not apply when using the tryWithCatch annotators to deal with asynchronous exceptions. See Chapter 13, Exception Handling.

Future creation and asynchronous calls depending on return type

Method Signature                                      Creation of a future   Asynchronous
Return type            Can throw checked exception
---------------------  ---------------------------   --------------------   ------------
Void                   No                             No                     Yes
Void                   Yes                            No                     No
Non Reifiable Object   No                             No                     No
Non Reifiable Object   Yes                            No                     No
Reifiable Object       No                             Yes                    Yes
Reifiable Object       Yes                            No                     No
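The rule expressed by the table can be written as a small decision function. The sketch below uses only Java reflection and is not ProActive's implementation. It assumes that a type is reifiable when it is neither primitive nor final and has a no-arg constructor, which is the reading suggested by the programming practices later in this chapter. CallKindSketch, Service and Value are hypothetical names.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Sketch of the table's decision rule; not ProActive's actual implementation. */
class CallKindSketch {

    /** Assumption of this sketch: reifiable = not primitive, not final, has a no-arg constructor. */
    static boolean isReifiable(Class<?> type) {
        if (type.isPrimitive() || Modifier.isFinal(type.getModifiers())) {
            return false;
        }
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** True if the method declares at least one checked exception. */
    static boolean throwsChecked(Method m) {
        for (Class<?> ex : m.getExceptionTypes()) {
            boolean unchecked = RuntimeException.class.isAssignableFrom(ex)
                    || Error.class.isAssignableFrom(ex);
            if (!unchecked) {
                return true;
            }
        }
        return false;
    }

    /** "Creation of a future" column. */
    static boolean createsFuture(Method m) {
        return !throwsChecked(m) && m.getReturnType() != void.class
                && isReifiable(m.getReturnType());
    }

    /** "Asynchronous" column. */
    static boolean isAsynchronous(Method m) {
        return !throwsChecked(m)
                && (m.getReturnType() == void.class || isReifiable(m.getReturnType()));
    }

    /** Hypothetical result type: non-final, with a no-arg constructor. */
    static class Value {
        public Value() { }
    }

    /** Hypothetical service with one method per row of the table. */
    static class Service {
        public void log() { }
        public void save() throws java.io.IOException { }
        public int count() { return 0; }
        public String name() throws java.io.IOException { return ""; }
        public Value compute() { return new Value(); }
        public Value load() throws java.io.IOException { return new Value(); }
    }

    static Method method(String name) {
        try {
            return Service.class.getDeclaredMethod(name);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        for (String name : new String[] {"log", "save", "count", "name", "compute", "load"}) {
            Method m = method(name);
            System.out.println(name + ": future=" + createsFuture(m)
                    + ", asynchronous=" + isAsynchronous(m));
        }
    }
}
```

Only compute() yields a future, and only log() and compute() are asynchronous, as in the table.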

As we can see, the creation of a future depends not only on the caller type, but also on the return object type. Creating a future is only possible if the object is reifiable. Although a future has a similar structure to an active object, a future object is not active. It only has a Stub and a Proxy as shown in the figure below:


Figure 10.4. A future object


During its lifetime, an active object can create many future objects. They are all automatically kept in a FuturePool.

Each time a future is created, it is inserted in the future pool of the corresponding active object. When the result becomes available, the future object is removed from the pool. There are several methods in org.objectweb.proactive.api.ProFuture which provide control over the way futures are returned.

10.7.2. Methods affected by futures

Any call to a future object is reified so that it blocks while the future is not yet available and is later executed on the result object. However, two methods do not follow this scheme: equals() and hashCode(). They are often called by other methods of the Java library, such as Hashtable.put(), and so are mostly outside the user's control. Calling them on a not-yet-available object could very easily lead to deadlocks, so they are handled on the proxy instead.

This technique has some drawbacks, the main one being that a user cannot override the default hashCode() and equals() methods. The behaviour of these two methods, and of toString(), which does block, is as follows:

  • hashCode() Instead of returning the hashcode of the object, it returns the hashcode of its proxy. Since there is only one proxy per future object, there is a unique equivalence between them.

  • equals() The default implementation of equals() in the Object class compares the references of two objects. In ProActive it is redefined to compare the hashcodes of two proxies. As a consequence, it is only possible to compare two future objects, and not a future object with a normal object.

  • toString() The toString() method is most often called through System.out.println() to turn an object into a printable string. In the current implementation, a call to this method blocks on a future object like any other call, so one has to be careful when using it. As an example, trying to print a future object for debugging purposes will most of the time lead to a deadlock. Instead of displaying the string representation of a future object, you might consider displaying its hashCode.
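The difference between the three methods can be sketched in plain Java. FutureProxy below is a hypothetical, hand-written placeholder, not ProActive's class, and it uses object identity for equals() and hashCode() as a simplification of the proxy-based comparison described above. The point is that the future can be stored in a hash-based collection before its result exists, whereas toString() has to wait.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

/** Sketch: equals/hashCode answer immediately, toString waits for the result. */
class FutureIdentitySketch {

    /** Hypothetical placeholder for a result that is not available yet. */
    static final class FutureProxy<T> {
        private final CountDownLatch available = new CountDownLatch(1);
        private volatile T result;

        void setResult(T value) {
            result = value;
            available.countDown();
        }

        boolean isAvailable() {
            return available.getCount() == 0;
        }

        // Never touch the result, so these can be called at any time.
        @Override public int hashCode() { return System.identityHashCode(this); }
        @Override public boolean equals(Object other) { return this == other; }

        // Needs the result, so it blocks until setResult has been called.
        @Override public String toString() {
            try {
                available.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return String.valueOf(result);
        }
    }

    /** Stores a future in a HashSet while its result is still missing. */
    static boolean storeBeforeResult() {
        FutureProxy<String> future = new FutureProxy<>();
        Set<Object> pending = new HashSet<>();
        pending.add(future);                       // calls hashCode(), does not block
        return pending.contains(future) && !future.isAvailable();
    }

    public static void main(String[] args) {
        System.out.println(storeBeforeResult());   // prints true
        FutureProxy<String> f = new FutureProxy<>();
        f.setResult("done");
        System.out.println(f);                     // prints done
    }
}
```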

10.7.3. Asynchronous calls in details

10.7.3.1. The setup

First, let's introduce the example we'll use throughout this section. Let us say that some piece of code in an instance of class A calls method foo on an active instance of class Worker. This call is asynchronous and returns a future object of class Value. Then, possibly after having executed some other code, the same thread that issued the call calls method bar on the future object returned by the call to foo.

The code for this example is:

Value v;
v = worker.foo();	// worker references an active instance of Worker
v.bar();

10.7.3.2.  What would have happened in a sequential world

In a sequential, single-threaded version of the same application, the thread would have executed the code of the caller object A up to the call to foo, then the code of foo in class Worker, then the code of the calling method in object A up to the call to bar, then the code of bar in class Value, and finally the code of the calling method in class A until its end. The sequence diagram below summarizes this execution. Notice how the single thread successively executes code of different methods in different classes.


Figure 10.5.  Single-threaded version of the program


10.7.3.3. Visualizing the graph of active and passive objects

Now we will show how an active object handles the call for the code above. Instead of blocking on the execution of foo() it will create a future and continue execution. Let us first get an idea of what the graph of objects at execution (the objects with their references to each other) looks like at three different moments of the execution:

  • Before calling foo we have exactly the same setup as after the creation of the active instance of Worker: an instance of class A and an active instance of class Worker. As with all active objects, the instance of the class Worker is composed of a Body with a request queue and the actual instance of Worker.


    Figure 10.6. The components of an active object and of a referencing object


  • After the asynchronous call to foo has returned, A now holds a reference onto a future object representing the not yet available result of the call. It is actually composed of a Stub_Value and a FutureProxy as shown on the figure below.


    Figure 10.7.  The components of a future object before the result is set


  • Right after having executed foo on the instance of Worker , the thread of the Body sets the result in the future, which results in the FutureProxy having a reference onto a Value (see figure below).


    Figure 10.8.  All components of a future object


10.7.3.4. Sequence Diagram

The sequence of calls is more complex in the active object case. We now have two threads: the thread that belongs to the subsystem of object A, and the thread that belongs to the subsystem of the Worker. We will call these the first and second threads.

The first thread invokes foo on an instance of Stub_Worker, which builds a MethodCall object and passes it to the BodyProxy as a parameter of the call to be reified. The proxy then checks the return type of the call (in this case Value) and generates a future object of type Value to represent the result of the method invocation. The future object is actually composed of a Stub_Value and a FutureProxy. A reference to this future object is set in the MethodCall object. This reference will be used to set the value after the call has executed. When the MethodCall object is ready, it is passed as a parameter of a Request to the Body of the Worker active object. The Body simply appends this request to the queue of pending requests and returns immediately. The call to foo issued by A returns a future object of type Stub_Value, which is a subclass of Value.

At some point, possibly after having served some other requests, the second thread (the active object's thread) picks up the previous request issued by the first thread. It then executes the embedded call by calling foo on the instance of Worker with the actual parameters stored in the MethodCall object. As specified in its signature, this call returns an object of type Value. The second thread is responsible for setting this object in the future object (which is the reason why MethodCall objects hold a reference on the future object created by the FutureProxy). The execution of the call is now over, and the second thread can select and serve another request from the queue.

In the meantime, the first thread has continued executing the code of the calling method in class A. At some point, it calls bar on the object of type Stub_Value that was returned by the call to foo. This call is reified thanks to the Stub_Value and processed by the FutureProxy. If the object the future represents is available (the second thread has already set it in the future object), the call is executed and returns a value to the calling code in A. The sequence diagram below presents the process graphically.


Figure 10.9. Call flow when a future is set before being used


If the value in the future is not yet available, the first thread is suspended in the FutureProxy until the second thread sets the result in the future object (see figure below). This is called wait-by-necessity.


Figure 10.10. Call flow when the future is not set before being used (wait-by-necessity)
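Wait-by-necessity can be reproduced in a few lines of plain Java. The sketch below is not ProActive code: FutureValue is a hypothetical, hand-written stand-in for the Stub_Value and FutureProxy pair, and foo starts an ordinary thread that plays the role of the second thread.

```java
import java.util.concurrent.CountDownLatch;

/** Sketch of wait-by-necessity with two plain Java threads. */
class WaitByNecessitySketch {

    static class Value {
        private final int n;

        Value(int n) { this.n = n; }

        int bar() { return n * 2; }
    }

    /** Hypothetical stand-in for the Stub_Value + FutureProxy pair. */
    static class FutureValue {
        private final CountDownLatch resultSet = new CountDownLatch(1);
        private volatile Value result;

        /** Called by the second thread once foo has been executed. */
        void setResult(Value value) {
            result = value;
            resultSet.countDown();
        }

        /** Called by the first thread: blocks only if the result is not there yet. */
        int bar() {
            try {
                resultSet.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return result.bar();
        }
    }

    /** Asynchronous call: returns the future at once, the work happens in another thread. */
    static FutureValue foo(int input, long workMillis) {
        FutureValue future = new FutureValue();
        Thread second = new Thread(() -> {
            try {
                Thread.sleep(workMillis);          // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            future.setResult(new Value(input));
        });
        second.start();
        return future;
    }

    static int run() {
        FutureValue v = foo(21, 100);   // returns immediately
        // ... the first thread could do other work here ...
        return v.bar();                 // waits here only if the result is still missing
    }

    public static void main(String[] args) {
        System.out.println(run());      // prints 42
    }
}
```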


10.7.4. Good ProActive programming practices

There are a few things to remember with active objects, futures, and asynchronous method calls, in order to avoid annoying debugging sessions:

  • A constructor with no arguments is needed both for active object creation (if it is not present, an exception might be thrown) and for future creation for a method call (if it is not present, the method call is synchronous). Avoid placing initialization code in this constructor, as it may lead to unexpected behavior because this constructor is called for the stub creation.

  • Make your classes implement the Serializable interface, since ProActive deals with objects that will be sent across networks.

  • Use wrappers instead of primitive types or final classes for method result types. Otherwise you will lose the asynchronism capabilities. For instance, if one of your objects has a method

    int giveSolution(parameter)
    

    calling this method with ProActive is synchronous. To have asynchronous calls use

    IntWrapper giveSolution(parameter)
    

    All wrappers are in the package: org.objectweb.proactive.core.util.wrapper

    ProActive provides several primitive type wrappers, with 2 versions of each, one mutable, and another immutable.

    To make the call asynchronous only the methods return types have to use wrappers. The parameters can use the regular Java types.

  • Avoid returning null from active object methods: on the caller side the test

    if(result_from_method == null)
    

    makes no sense. result_from_method is a Stub-FutureProxy pair as explained above, so even if the method returns null, result_from_method cannot be null:

    public class MyObject {
    	public MyObject() {} 	//empty constructor with no-args
    	public Object getObject() {
    		if(.....) {
    	 		return new Object(); 
    		}
    		else {
    			return null; //to avoid in ProActive 
    		}
    	}
    }
    

    On the caller side:

    MyObject o = (MyObject) PAActiveObject.newActive(MyObject.class.getName(), null); // o is an active object
    Object result_from_method = o.getObject();
    if(result_from_method == null){
    	...... 
    }
    

    This test is never true. If the future is not yet available or the method returns null, result_from_method is Stub-->Proxy-->null; if the future is available, it is Stub-->Proxy-->Object. In both cases result_from_method itself is never null.
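
The last point can be illustrated without ProActive. In the sketch below, Placeholder is a hypothetical stand-in for the Stub - FutureProxy couple, built on the JDK class CompletableFuture; it is an analogy under that assumption, not the ProActive implementation. It shows why comparing the returned reference with null tells nothing about the real result.

```java
import java.util.concurrent.CompletableFuture;

// Analogy only: Placeholder is a hypothetical stand-in for the Stub/FutureProxy couple.
class NullResultDemo {

    static final class Placeholder<T> {
        private final CompletableFuture<T> value;

        Placeholder(CompletableFuture<T> value) {
            this.value = value;
        }

        // Blocks until the value has arrived, then returns it (possibly null).
        T get() {
            return value.join();
        }
    }

    // Mimics an asynchronous method whose real result is null.
    static Placeholder<Object> getObject() {
        CompletableFuture<Object> pending = CompletableFuture.supplyAsync(() -> null);
        return new Placeholder<>(pending);
    }

    public static void main(String[] args) {
        Placeholder<Object> result = getObject();
        System.out.println(result == null);       // false: the placeholder always exists
        System.out.println(result.get() == null); // true: only the wrapped value is null
    }
}
```

Because the caller always receives a placeholder object, a method that needs to signal "no result" is better served by returning an explicit empty or sentinel object than by returning null.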

10.7.5. Lightweight failure detection

Waiting for the update of a future in a ProActive application could freeze the application if the node responsible for updating the future experiences a failure. To prevent this, ProActive has a comprehensive generic fault tolerance mechanism. However, checkpointing all the active objects can be too expensive when fault detection is only needed for certain objects.

By default, ProActive continuously pings active objects expected to update awaited futures. When such a ping fails, the associated awaited future is updated with a runtime exception. Accessing the future will throw an exception.

public static void function() {
	F future = ao.asyncCall();
	String str;
	try {
		str = future.toString();
	} 
	catch (FutureMonitoringPingFailureException fmpfe) {
		System.out.println("The active object 'ao' had a failure");
		fmpfe.printStackTrace(); // printStackTrace() returns void, so it cannot be concatenated
	}
}

The ping is started when the future is being awaited, but it is also possible to start it beforehand by calling ProFuture.monitorFuture(java.lang.Object future). To find out more about the fault tolerance mechanism, read Chapter 33, Fault-Tolerance.
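
The idea of completing an awaited future with an exception when a ping fails can be sketched with plain JDK classes. Everything below is hypothetical (PingMonitorDemo, PingFailureException, monitor and the ping supplier are illustrative names, not ProActive classes); it only shows the principle under the assumption that a ping is a simple boolean check.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Analogy only: all names are hypothetical, none belong to ProActive.
class PingMonitorDemo {

    static final class PingFailureException extends RuntimeException {
        PingFailureException(String message) {
            super(message);
        }
    }

    // Pings the updater periodically; on the first failed ping the awaited
    // future is completed with a runtime exception instead of a value.
    static void monitor(CompletableFuture<?> awaited, BooleanSupplier ping,
                        ScheduledExecutorService scheduler, long periodMillis) {
        Runnable check = () -> {
            if (!awaited.isDone() && !ping.getAsBoolean()) {
                awaited.completeExceptionally(
                        new PingFailureException("updater did not answer the ping"));
            }
        };
        ScheduledFuture<?> task =
                scheduler.scheduleAtFixedRate(check, 0, periodMillis, TimeUnit.MILLISECONDS);
        // Stop pinging as soon as the future is completed, normally or not.
        awaited.whenComplete((value, error) -> task.cancel(false));
    }

    // Returns the failure message seen by a caller waiting on a dead updater.
    static String awaitWithDeadUpdater() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String> future = new CompletableFuture<>(); // never updated
        monitor(future, () -> false, scheduler, 50);
        try {
            return future.join();
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitWithDeadUpdater());
    }
}
```

Without the monitor, the call to join() would block forever; with it, the waiting caller gets an exception it can handle.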

10.8. Automatic Continuation in ProActive

Automatic continuation is the propagation of a future outside the activity that has sent the corresponding request.

Automatic continuation allows non-updated futures to be used in the normal program flow without blocking to wait for the result contained in the future. When the result becomes available on the object that originated the creation of the future, this object updates the result in all the objects to which it passed the future. If those objects passed the future further, they will in turn update the value in the corresponding objects.

10.8.1. Sending Futures

An automatic continuation can occur when sending a request (a parameter of the request is a future or contains a future) or when sending a reply (the result is a future or contains a future). Outgoing futures are registered in the FuturePool of the Active Object sending the future. The couple Future - destination Body is registered as an automatic continuation when the future is serialized. Every request or reply is serialized before being sent, and the future is part of the request or the reply. The thread sending the message keeps a reference to the destination body in a static table (FuturePool.bodyDestination). Hence, when a future is serialized by the same thread, it looks up in the static table the destination registered for that thread. If it finds a destination, the future notifies the FuturePool that it is going to leave, and the FuturePool in turn registers the couple Future - Destination Body as an automatic continuation.

When the value is available for the future, it is propagated to all objects that received the future. This type of update is performed by a thread located in the FuturePool.
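
The bookkeeping described above can be modelled in a few lines. The sketch below is a simplified model, not the ProActive FuturePool: Pool, Destination, register and propagate are hypothetical names, futures are identified by a plain long, and the per-thread destination table and the dedicated update thread are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model: every name is hypothetical and only mirrors the description above.
class ContinuationRegistryDemo {

    // Stands in for a destination body that holds a copy of a future.
    static final class Destination {
        final String name;
        final Map<Long, Object> receivedValues = new HashMap<>();

        Destination(String name) {
            this.name = name;
        }

        void update(long futureId, Object value) {
            receivedValues.put(futureId, value);
        }
    }

    // Stands in for the pool that remembers where each future was sent.
    static final class Pool {
        private final Map<Long, List<Destination>> continuations = new HashMap<>();

        // Called when a future leaves: record the (future, destination) couple.
        synchronized void register(long futureId, Destination destination) {
            continuations.computeIfAbsent(futureId, id -> new ArrayList<>()).add(destination);
        }

        // Called when the value arrives: forward it to every recorded destination
        // and return how many destinations were updated.
        synchronized int propagate(long futureId, Object value) {
            List<Destination> targets = continuations.remove(futureId);
            if (targets == null) {
                return 0;
            }
            for (Destination target : targets) {
                target.update(futureId, value);
            }
            return targets.size();
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool();
        Destination b = new Destination("B");
        Destination c = new Destination("C");
        pool.register(1L, b);
        pool.register(1L, c);
        System.out.println(pool.propagate(1L, "value")); // 2
        System.out.println(b.receivedValues.get(1L));    // value
    }
}
```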

10.8.2. Receiving Futures

When a message containing a future is received by an active object, the active object registers the future in the FuturePool so that it can be updated when the value becomes available. After the future is deserialized, it is registered in a static table, FuturePool.incomingFutures.

10.8.3. Illustration of an Automatic Continuation

To illustrate how automatic continuation takes place we will use three classes: A, B, and Worker. The A and B classes are used to instantiate passive objects and the Worker class is used to instantiate an active object. The Worker class has a foo() method and the B class has a bar(Value v) method.

public class A {
 	.... 
	public static void main(String[] args){
		......
		B b = new B(); 
		Worker worker = (Worker) PAActiveObject.newActive(Worker.class.getName(), //class name
						null);//constructor arguments 
		Value v1 = worker.foo(); //v1 is a future
		Value v2 = b.bar(v1); //v1 is passed as parameter
		........
	}	//end of main
} //end of class A

where

public class Worker {
	... 
	public Value foo(){ 
		...
	}
	...
} //end of class Worker

and

public class B {
	...
	public Value bar (Value v) {
		...
	}
	...
} //end of class B
			

We first call foo() on the worker. The call goes through the worker stub and the body proxy and is placed in the queue of the body. We are using a passive object as the caller in this example; however, the process is similar if the caller is an active object or if the objects are all on the same machine. In those cases the call also goes through the stub and proxy as described in the sequence diagrams above.

After sending the request the thread of object A continues immediately (asynchronous call) and v1 points to a future that has not been updated yet.

Next, object A calls bar(v1) on B.

The call on B leads to the creation of another future, to which v2 will point. Also, because v1 has not been updated yet, another future is created on object B.

At some point the active object finishes executing foo and the value of the future for v1 will be updated by the Body.

After the value on A is updated, A updates the value in the future on B.

Once B has the value for the future, it executes bar() with the received value and updates the value of v2 in A.

The final state is with all the future values updated.

As seen above, a future needed for a computation is passed on to the object doing the computation as long as its value is not needed immediately. If the value is needed right away, the caller thread blocks until the future is updated, just like in a single-threaded call.
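
A rough JDK analogy of this flow is a chain of CompletableFuture stages. The sketch below is not ProActive code: foo and bar are hypothetical stand-ins for Worker.foo() and B.bar(v) that work on plain integers, and thenApply plays the role of passing the non-updated future v1 on to bar.

```java
import java.util.concurrent.CompletableFuture;

// Analogy only: plain JDK futures, with hypothetical foo and bar bodies.
class ContinuationChainDemo {

    // Stands in for Worker.foo().
    static int foo() {
        return 20;
    }

    // Stands in for B.bar(v).
    static int bar(int v) {
        return v + 22;
    }

    // Builds v2 from v1 without waiting for v1 to be available.
    static CompletableFuture<Integer> run() {
        CompletableFuture<Integer> v1 = CompletableFuture.supplyAsync(ContinuationChainDemo::foo);
        CompletableFuture<Integer> v2 = v1.thenApply(ContinuationChainDemo::bar);
        return v2;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> v2 = run(); // returns without blocking
        System.out.println(v2.join());         // blocks only here; prints 42
    }
}
```

As in the walkthrough, the caller blocks only at the point where the final value is really needed.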

10.9. Data Exchange between Active Objects

The Exchange operation offers a way to exchange a potentially very large amount of data between two active objects, with an implicit synchronization. The Exchange operator takes advantage of the serialization / de-serialization steps to exchange data efficiently.

This operator is *not* compatible with the ProActive Fault-Tolerance mechanism.

To use it, you have to specify the following:

  • a tag, which is a unique identifier for the exchange operation in case multiple exchanges are performed in parallel,

  • a reference to the active object you want to exchange data with,

  • a reference to the data source you want to send to the other Active Object,

  • a reference to the location where the received data will be put,

  • the amount of data you want to exchange (must be symmetric).

To illustrate the usage of the Exchange operator, here is an example with two active objects, AO1 and AO2, which each exchange half of a local array to obtain a full one.

public class AO1 {
  private int[] myArray;
  (...)
  public void foo() {
    Exchanger exchanger = PAActiveObject.getExchanger();
    myArray = new int[] {1,2,0,0};
    // Before : myArray = [1, 2, 0, 0]
    exchanger.exchange("myExch", ao2, myArray, 0, myArray, 2, 2);
    // After : myArray = [1, 2, 3, 4]
  }
}
 
public class AO2 {
  private int[] myArray;
  (...)
  public void bar() {
    Exchanger exchanger = PAActiveObject.getExchanger();
    myArray = new int[] {0,0,3,4};
    // Before : myArray = [0, 0, 3, 4]
    exchanger.exchange("myExch", ao1, myArray, 2, myArray, 0, 2);
    // After : myArray = [1, 2, 3, 4]
  }
}
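
The offset and length arguments in the example above can be tried out with a thread-based analogy. The sketch below uses the JDK class java.util.concurrent.Exchanger, which is a different class from the ProActive exchanger despite the similar name, and two threads instead of two active objects. The helper names are hypothetical; the argument order (source, source offset, destination, destination offset, length) is copied from the example above.

```java
import java.util.Arrays;
import java.util.concurrent.Exchanger;

// Analogy only: this is the JDK's java.util.concurrent.Exchanger used between
// two threads, not the ProActive exchanger. The helper names are hypothetical.
class HalfArrayExchangeDemo {

    // Sends len elements of src starting at srcOffset, then stores the
    // partner's elements into dst starting at dstOffset.
    static void exchange(Exchanger<int[]> channel, int[] src, int srcOffset,
                         int[] dst, int dstOffset, int len) throws InterruptedException {
        int[] outgoing = Arrays.copyOfRange(src, srcOffset, srcOffset + len);
        int[] incoming = channel.exchange(outgoing); // blocks until the partner arrives
        System.arraycopy(incoming, 0, dst, dstOffset, len);
    }

    // Runs both sides and returns the two arrays after the exchange.
    static int[][] run() {
        Exchanger<int[]> channel = new Exchanger<>();
        int[] first = {1, 2, 0, 0};
        int[] second = {0, 0, 3, 4};
        Thread partner = new Thread(() -> {
            try {
                exchange(channel, second, 2, second, 0, 2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        partner.start();
        try {
            exchange(channel, first, 0, first, 2, 2);
            partner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("exchange interrupted", e);
        }
        return new int[][] {first, second};
    }

    public static void main(String[] args) {
        for (int[] array : run()) {
            System.out.println(Arrays.toString(array)); // [1, 2, 3, 4] for both
        }
    }
}
```

Each side blocks until its partner reaches the exchange point, which corresponds to the implicit synchronization mentioned above.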

			

Standard exchangeable data are arrays of integers, doubles and bytes. You can also exchange your own complex double structure by implementing the ExchangeableDouble interface. Here is an example of an implementation:

public class ComplexDoubleArray implements ExchangeableDouble {
    private double[] array;
    private int getPos, putPos;

    public ComplexDoubleArray(int size, boolean odd) {
        this.array = new double[size];
        this.getPos = odd ? 1 : 0;
        this.putPos = odd ? 0 : 1;
        for (int i = getPos; i < array.length; i += 2) {
            array[i] = Math.random();
        }
    }

    public double get() {
        double res = array[getPos];
        getPos += 2;
        return res;
    }

    public boolean hasNextGet() {
        return getPos < array.length;
    }

    public boolean hasNextPut() {
        return putPos < array.length;
    }

    public void put(double value) {
        array[putPos] = value;
        putPos += 2;
    }
}
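
To see how the four methods cooperate, the sketch below drains one structure into another with a simple loop. It is a guess at how such an object could be consumed, not the ProActive exchange mechanism: Slots is a hypothetical local copy of the four method signatures shown above, Strided is a reduced variant of the class above that uses a fixed fill value instead of Math.random(), and transfer is an illustrative helper.

```java
import java.util.Arrays;

// Sketch only: Slots is a hypothetical local copy of the four methods shown
// above, and transfer() is a guess at how such an object could be drained.
class StridedExchangeDemo {

    interface Slots {
        double get();
        boolean hasNextGet();
        void put(double value);
        boolean hasNextPut();
    }

    // Reads every second cell starting at getPos and writes every second
    // cell starting at putPos, like the class above.
    static final class Strided implements Slots {
        final double[] array;
        private int getPos, putPos;

        Strided(int size, boolean odd, double fill) {
            array = new double[size];
            getPos = odd ? 1 : 0;
            putPos = odd ? 0 : 1;
            for (int i = getPos; i < size; i += 2) {
                array[i] = fill;
            }
        }

        public double get() { double r = array[getPos]; getPos += 2; return r; }
        public boolean hasNextGet() { return getPos < array.length; }
        public void put(double value) { array[putPos] = value; putPos += 2; }
        public boolean hasNextPut() { return putPos < array.length; }
    }

    // Moves values from source to target until either side is exhausted.
    static void transfer(Slots source, Slots target) {
        while (source.hasNextGet() && target.hasNextPut()) {
            target.put(source.get());
        }
    }

    static double[][] run() {
        Strided even = new Strided(4, false, 1.0); // owns cells 0 and 2
        Strided odd = new Strided(4, true, 2.0);   // owns cells 1 and 3
        transfer(even, odd);
        transfer(odd, even);
        return new double[][] {even.array, odd.array};
    }

    public static void main(String[] args) {
        for (double[] a : run()) {
            System.out.println(Arrays.toString(a)); // [1.0, 2.0, 1.0, 2.0] for both
        }
    }
}
```

After both transfers each structure holds its own cells plus the cells received from its partner.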