In this chapter we present a step by step introduction to programming with ProActive.
The program that we will develop is a remote computation and monitoring agent. As we progress through the example we will increase the complexity of the agent by using different features of ProActive.
In part one, we will code a 'client-server' application, the server being an active object that acts as a monitoring agent.
In part two, we will see how we can control the activity of an active object.
In part four the exercise will explain message synchronization and synchronous and asynchronous method calls using a monitoring agent that makes chained calls.
In part five we will add mobility to this active object and have the agent migrate to another computer and report on the status of the JVM on the machine.
In part six we will show how to use ProActive groups to monitor several machines at the same time.
In part seven will show how to expose an active object as a web service.
The next parts are dedicate to showing how we can transform a sequential application that finds a number of primes into a distributed application. In this part we will also use the ProActive Master-Worker API.
All the exercises in this chapter are available as an Eclipse workspace. The Eclipse environment is set up to run each exercises as soon as the needed code is inserted. The tasks needed for the examples to run properly are annotated with //TODO tags and show up under the Eclipse tasks list.
To further learn what is needed for the running ProActive applications read Chapter 2, ProActive Installation.
Welcome to your first ProActive program! This is one of the simplest applications that can be written using ProActive. We will create an active object locally and get the state of the JVM through it. Our application is composed of three classes with a client-server structure.
The example illustrates the creation of an active object from the
CMAgent
class that will be
used by the Main
class to retrieve the JVM state
for a machine and print it to the standard output.
The Main
class corresponds to the client, and is
only a container for the main()
method, while
the CMAgent
class corresponds to the server and
its instance is an active object which provides a getCurrentState()
method as a remote service.
To safely use the CMAgent
class as an active object
we have to meet three requirements.
no direct access to field variables - If
public
variables are used then the stub class generated from
the original class may become decoupled from the original class. If a change is
affected on the public field variable in the stub instance, the change will not be
propagated to the the class instance from which the stub was generated. The safe
way to change variables is to set them as private
and access
them through public get/set
methods.
provide a no-argument and preferably an empty constructor - A no-argument constructor is necessary to create the stub class needed for communication. A stub cannot be created if there are only constructors with arguments since the stub is only meant to abstract the communication from the active objects. If there is no constructor defined, the Java compiler will automatically create a no-argument constructor that initializes all instance variables to the default value. However, if there is an already defined constructor with arguments then no default no-argument constructor will be created by the compiler. In this case the definition of a no-argument constructor is mandatory for stub creation. The safest way is to always define a no-argument constructor. Also, the constructor should be empty so that on stub creation no initialization is done on the stub.
provide remote functionalities as public methods with return types that can be subclassed and are serializable - Since
the stub is created through inheritance, the only methods it can use for communication are
the inherited public methods from the superclass. The return types of the methods have to be
subclassable and therefore not final. ProActive provides several wrappers for Java types
that are final.The example uses the StringWrapper
class in order to provide a wrapper for the
final String
class. Since ProActive uses a proxy mechanism and
the String
class is final, it’s not
possible to subclass a String and to perform asynchronous calls.
ProActive provides several wrappers for final classes: StringWrapper, BooleanWrapper,
IntegerWrapper, DoubleWrapper
and FloatWrapper
.
These have to be used in
replacement of String, Boolean, Integer, Double
and Float
in classes which will be active objects. If you don’t use wrappers, method calls will be synchronous.
In our case the return type is State
which is not final.
ProActive
org.objectweb.proactive.api.PAActiveObject
- used to create an instance of an active object
org.objectweb.proactive.core.node.NodeException
- used to catch the exceptions that the creation of the Node might throw
org.objectweb.proactive.ActiveObjectCreationException
- used to catch the exceptions that the creation of the active object might throw
Other
java.io.Serializable
- used to make the State
object serializable so it can
be sent across the network
java.lang.management.ManagementFactory
- used to get various information about the machine the active object is running on
java.net.InetAddress
- used to get the address of the active object is running on
java.net.UnknownHostException
- used to catch the exceptions that might be thrown when requesting host information
java.util.Date
- used to get the a time for the requested state
Figure 6.1. Client-Server architecture - the Main
class acts as
a client and uses the CMAgent
class as a server
For our Monitoring agent we use three classes. The first two classes are regular Java objects.
The first class is State
which we use to get some information on the JVM the
object is located on. This object will be use as return value for the getCurrentState()
method in the CMAgent
class.
package org.objectweb.proactive.examples.userguide.cmagent.simple; import java.io.Serializable; import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Date; //TODO 4. remove Serializable and run the agent public class State implements Serializable { private long commitedMemory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getCommitted(); private long initMemory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getInit(); private long maxMemory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); private long usedMemory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); private String osArch = ManagementFactory.getOperatingSystemMXBean().getArch(); private String osName = ManagementFactory.getOperatingSystemMXBean().getName(); private String osVersion = ManagementFactory.getOperatingSystemMXBean().getVersion(); private int osProcs = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors(); private int liveThreads = ManagementFactory.getThreadMXBean().getThreadCount(); private long startedThreads = ManagementFactory.getThreadMXBean().getTotalStartedThreadCount(); private int peakThreads = ManagementFactory.getThreadMXBean().getPeakThreadCount(); private int deamonThreads = ManagementFactory.getThreadMXBean().getDaemonThreadCount(); private Date timePoint = new Date(); private String hostname; { try { hostname = InetAddress.getLocalHost().toString(); } catch (UnknownHostException e) { e.printStackTrace(); } } public State() { } public String toString() { return new String("\n======= [" + "State at " + timePoint + " on " + hostname + "] =======" + "\nCommited memory: " + commitedMemory + " bytes\nInitial memory requested: " + initMemory + " bytes\nMaximum memory available: " + maxMemory + " bytes\nUsed memory: " + usedMemory + " bytes\nOperating System: " + osName + " " + osVersion + " " + osArch + "\nProcessors: " + osProcs + "\nCurrent live threads: " + liveThreads + "\nTotal started threads: " + startedThreads + "\nPeak number of live threads: " + peakThreads + "\nCurrent daemon threads: " + deamonThreads + "\n===============================================================================\n"); } }
The Monitoring agent class is a regular Java class
package org.objectweb.proactive.examples.userguide.cmagent.simple; public class CMAgent { // empty constructor is required by Proactive public CMAgent() { } public State getCurrentState() { return new State(); } }
with only one method.
For this simple exercise we only need to add ProActive code in the Main class where we instantiate the active object.
package org.objectweb.proactive.examples.userguide.cmagent.simple; import org.objectweb.proactive.api.PAActiveObject; import org.objectweb.proactive.core.node.NodeException; import org.objectweb.proactive.ActiveObjectCreationException; public class Main { public static void main(String args[]) { //TODO 1. Create the active object //TODO 2. Get the current state //TODO 3. Print the state //TODO 4. Stop the active object } }
You can find the skeleton code for the exercise in the Eclipse exercises workspace under the name
1. SimpleCMAgent
.
Create an active object using the org.objectweb.proactive.api.PAActiveObject.newActive(...)
static method.
Call the getCurrentState()
method and display the results.
Since the active object has a never ending thread we need to make an explicit call in order
to stop it. Use PAActiveObject.terminateActiveObject(...)
to stop the
active object.
Remove the Serializable
from the class State
and explain the results when running the example.
We will now show how to create the server object. For now, we want the
CMAgent
active object
to be created on the current Node (we will see later how to distribute the program). To create an instance of
a remotely accessible object we must use the PAActiveObject.newActive(...)
static method. We pass
as an argument the name of the class to be instantiated and arguments for the constructor of the class.
In our case CMAgent
does not need any arguments for the constructor and therefore we use null
.
//TODO 1. Create the active object CMAgent ao = (CMAgent) PAActiveObject.newActive(CMAgent.class.getName(), null);
Invoking a method on a remote active object is transparent and is similar to invoking a method on a local object of the same type. The user does not have to deal with catching exceptions related to the remote communication. The only modification brought to the code by ProActive is during the active objects creation. All the rest of the code can remain unmodified, fostering software reuse.
To invoke the getCurrentState()
method we execute:
//TODO 2. Get the current state
currentState = ao.getCurrentState().toString();
To stop the active object we call the PAActiveObject.terminateActiveObject(ao, false)
method. The
false
argument is used in order to call the terminate method as a regular request. If the
argument is true
the method call is served as an immediate service (executed as soon as the
current executing request is completed regardless of how many other requests might be waiting)
and synchronously (the caller thread blocks until it receives the results).
//TODO 4. Stop the active object
PAActiveObject.terminateActiveObject(ao, true);
Passive objects in ProActive are always passed by deep copy when returned as a method results.
If the object State does not implement Serializable
ProActive will not be able to
to make a deep copy of the object and will throw an exception.
The full code of the Main
class is the following.
package org.objectweb.proactive.examples.userguide.cmagent.simple; import org.objectweb.proactive.api.PAActiveObject; import org.objectweb.proactive.core.node.NodeException; import org.objectweb.proactive.ActiveObjectCreationException; public class Main { public static void main(String args[]) { try { String currentState = new String(); //TODO 1. Create the active object CMAgent ao = (CMAgent) PAActiveObject.newActive(CMAgent.class.getName(), null); //TODO 2. Get the current state currentState = ao.getCurrentState().toString(); //TODO 3. Print the state System.out.println(currentState); //TODO 4. Stop the active object PAActiveObject.terminateActiveObject(ao, true); } catch (NodeException nodeExcep) { System.err.println(nodeExcep.getMessage()); } catch (ActiveObjectCreationException aoExcep) { System.err.println(aoExcep.getMessage()); } } }
Active objects, as their name indicates, have an activity of their
own (an internal thread). By default the active object steps through
the constructor, the initActivity
, the runActivity
,
and when the terminate
method is called on
the Body
of the active object through the endActivity
method.
It is possible to control the initialization, running, and ending phase of this
thread by implementing three interfaces: InitActive
,
RunActive
,and EndActive
.These interfaces define the
initActivity
, runActivity
and endActivity
methods. One of the reasons for using initActivity
method is the presence of the empty constructor in an active object. The initActivity
method is automatically called on the creation of an active object in order to set up the
object without using the constructor. The runActivity
method allows the user
to control the active object request queue. By implementing the EndActive
interface is also possible to
do clean up before the active object thread is stopped.
The following example will help you to understand how and when you
can initialize and clean the activity. The example will implement the InitActive
, RunActive
,
and
EndActive
interfaces. However RunActive
has a more complex structure than
it is presented here. To understand how to use EndActive
RunActive
interface read Chapter 9, Active Objects: Creation And Advanced Concepts
New classes used
org.objectweb.proactive.Body
- used to access the body of the active object
org.objectweb.proactive.Service
- used to access the queue of the active object
org.objectweb.proactive.InitActive
- used for defining the initActivity(Body body)
method, which is run at active object initialization
org.objectweb.proactive.EndActive
- used for defining the endActivity(Body body)
method, which is run at active object destruction
org.objectweb.proactive.RunActive
- used for defining the runActivity(Body body)
method, which manages the queue of requests
org.objectweb.proactive.core.util.wrapper.LongWrapper
- used to wrap the Long
return type
Previously used classes
org.objectweb.proactive.api.PAActiveObject
- used to create an instance of an active object
org.objectweb.proactive.core.node.NodeException
- used to catch the exceptions that the creation of the Node might throw
org.objectweb.proactive.ActiveObjectCreationException
- used to catch the exceptions that the creation of the active object might throw
java.io.Serializable
- used to make the State
object serializable so it can
be sent across the network
java.lang.management.ManagementFactory
- used to get various information about the machine the active object is running on
java.net.InetAddress
- used to get the address of the active object is running on
java.net.UnknownHostException
- used to catch the exceptions that might be thrown when requesting host information
java.util.Date
- used to get the a time for the requested state
The CMAgentInitialized
class extends the
CMAgent
class from the previous example, and implements
the interfaces InitActive
and EndActive
.
It acts as a server for the Main
class.
To implement the application we will create a class that inherits from the
CMAgent
class and implements the InitActive
, RunActive
, and EndActive
interfaces.
package org.objectweb.proactive.examples.userguide.cmagent.initialized; import org.objectweb.proactive.Body; import org.objectweb.proactive.EndActive; import org.objectweb.proactive.InitActive; import org.objectweb.proactive.RunActive; import org.objectweb.proactive.Service; import org.objectweb.proactive.core.util.wrapper.LongWrapper; import org.objectweb.proactive.examples.userguide.cmagent.simple.CMAgent; public class CMAgentInitialized extends CMAgent implements InitActive, RunActive, EndActive { public void initActivity(Body body) { //TODO 1. Print start information //TODO 2. Record start time } public void runActivity(Body body) { Service service = new Service(body); while(body.isAlive()){ //process requests while the body of the active object is alive //TODO 3. Wait for a request //TODO 4. Record time //TODO 5. Serve request //TODO 6. Calculate request duration //TODO 7. Increment the number of requests served } } public void endActivity(Body body) { //TODO 8. Calculate the running time of the active object using the start time recorded in initActivity() //TODO 9. Print various stop information } public LongWrapper getLastRequestServeTime() { //TODO 10. Use wrappers for primitive types so the calls are asynchronous return null; } }
By default an active object has a never ending thread that should be stopped when the object is
not needed anymore. The method terminate()
serves the purpose of destroying the
object. However, if an explicit call to terminate the object is not made, ProActive has its own
distributed garbage collection system that is able to decide when an active object can be destroyed.
The Main
is similar with the one in the previous example.
We will change the object created from CMAAgent
to
CMAAgentInitialized
and also we will call the terminate()
method to
destroy the object.
You can find the skeleton code for the exercise in the Eclipse exercises workspace under the name
2. InitializedCMAgent
.
Use initActivity(Body body)
to print information
about the start location of the active object and record the start time.
Use endActivity(Body body)
to print information about
the stop location of the active object, calculate the running time,
and print the number of requests served.
Calculate the last request duration
and count the requests using org.objectweb.proactive.Service.waitForRequest()
and org.objectweb.proactive.Service.serveOldest()
Use org.objectweb.proactive.core.util.wrapper.LongWrapper
to return a wrapped Long
value.
We only need to extend the CMAgent
class and implement the
interfaces. In initActivity(Body body)
we use body.getName()
to get the name of the active object and body.getNodeUrl()
to get the location.
In endActivity(Body body)
we also use body.getNodeUrl()
to get the location.
public void initActivity(Body body) { //TODO 1. Print start information System.out.println("### Started Active object " + body.getMBean().getName() + " on " + body.getMBean().getNodeUrl()); //TODO 2. Record start time startTime = System.currentTimeMillis(); } public void runActivity(Body body) { Service service = new Service(body); long currentRequestDuration = 0; while (body.isActive()) { //TODO 3. wait for a request service.waitForRequest(); // block until a request is received //TODO 4. Record time currentRequestDuration = System.currentTimeMillis(); //TODO 5. Serve request service.serveOldest(); //server the requests in a FIFO manner //TODO 6. Calculate request duration currentRequestDuration = System.currentTimeMillis() - currentRequestDuration; // an intermediary variable is used so // when calling getLastRequestServeTime() // we get the first value before the last request // i.e when calling getLastRequestServeTime // the lastRequestDuration is update with the // value of the getLastRequestServeTime call // AFTER the previous calculated value has been returned lastRequestDuration = currentRequestDuration; //TODO 7. Increment the number of requests served requestsServed++; } } public void endActivity(Body body) { //TODO 8. Calculate the running time of the active object using the start time recorded in initActivity() long runningTime = System.currentTimeMillis() - startTime; //TODO 9. Print various stop information System.out.println("### You have killed the active object. The final" + " resting place is on " + body.getNodeURL() + "\n### It has faithfully served " + requestsServed + " requests " + "and has been an upstanding active object for " + runningTime + " ms "); } public LongWrapper getLastRequestServeTime() { //TODO 10. Use wrappers for primitive types so the calls are asynchronous return new LongWrapper(lastRequestDuration); }
In the previous example the applications were deployed inside the same JVM. This section will focus on showing how to deploy the application on different nodes using deployment descriptors.
A first principle is to fully eliminate from the source code the following elements:
machine names
creation protocols
registry lookup protocols
The goal is to deploy any application anywhere without changing the source code. For instance, we must be able to use various protocols, rsh, ssh, Globus, LSF, etc., for the creation of the JVMs needed by the application. In the same manner, the discovery of existing resources or the registration of the ones created by the application can be done with various protocols such as RMIregistry, Globus etc. Therefore, we see that the creation, registration and discovery of resources have to be done externally to the application.
A second key principle is the capability to abstractly describe an application, or part of it, in terms of its conceptual activities. The description should indicate the various parallel or distributed entities in the program. For instance, an application that is designed to use three interactive visualization nodes, a node to capture input from a physics experiment, and a simulation engine designed to run on a cluster of machines should somewhere clearly advertise this information.
Now, one should note that the abstract description of an application and the way to deploy it are not independent piece of information. If for example, we have a simulation engine, it might register in a specific registry protocol, and if so, the other entities of the computation might have to use that lookup protocol to bind to the engine. Moreover, one part of the program can just lookup for the engine (assuming it is started independently), or explicitly create the engine itself. To summarize, in order to abstract away the underlying execution platform, and to allow a source-independent deployment, a framework has to provide the following elements:
an abstract description of the distributed entities of a parallel program or component,
an external mapping of those entities to real machines, using actual creation, registry, and lookup protocols.
To reach that goal, the programming model relies on the specific notion of Virtual Nodes (VNs):
a VN is identified as a name (a simple string)
a VN is used in a program source
a VN is defined and configured in a deployment descriptor (XML)
a VN, after activation, is mapped to one or to a set of actual ProActive Nodes
Of course, distributed entities (Active Objects), are created on Nodes, not on Virtual Nodes. There is a strong need for both Nodes and Virtual Nodes. Virtual Nodes are a much richer abstraction, as they provide mechanisms such as set or cyclic mapping. Another key aspect is the capability to describe and trigger the mapping of a single VN that generates the allocation of several JVMs. This is critical if we want to get at once machines from a cluster of PCs managed through Globus or LSF. It is even more critical in a Grid application, when trying to achieve the co-allocation of machines from several clusters across several continents.
Moreover, a Virtual Node is a concept of a distributed program or component, while a Node is actually a deployment concept: it is an object that lives in a JVM, hosting Active Objects. There is of course a correspondence between Virtual Nodes and Nodes: the function created by the deployment, the mapping. This mapping is specified in the Application Descriptor. The grid facilities are described in two deployment descriptor separated by the different concerns of the application developer and grid infrastructure administrator. In the grid deployment descriptor we describe
the resources provided by the infrastructure
how to acquire the resources provided by the infrastructure
In the application deployment descriptor we describe
how to launch the application
the resources needed by the application
the resource providers
The deployment descriptor is an XML
file containing
information on the properties listed above. We will use a
simple XML
file to deploy the monitoring agent on a
remote machine. The deployment XML
file is composed of several
parts, each with different options. For our example we
will use a simple version. To find out more about
deployment and deployment descriptors read
Chapter 18, ProActive Grid Component Model Deployment.
To read about the deprecated deployment structure read
Chapter 19, XML Deployment Descriptors.
.
The document uses the XML Schema present at the Oasis website.
To avoid mistakes when building XML
descriptors, ProActive
provides two XML
Schemas, one for each descriptor type. To
validate your file against the appropriate schema, the following line
must be put at the top of the XML
document.
<ProActiveDescriptor xmlns="urn:proactive:deployment:3.3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:gcm:deployment:1.0 http://proactive.inria.fr/schemas/gcm/1.0/ExtensionSchemas.xsd">
We start with the Deployment Descriptor.
Both XML
files have a section for defining variables needed
later in the document. In our case, we define the
location of the ProActive and Java installations on the
local and remote machine. To start the JVM and use
ProActive on the remote machine, we need to define the
location of the Java and ProActive files. This can be done
by specifying the paths in the infrastructure section of
the deployment descriptor or by setting the CLASSPATH
and
JAVA_HOME
variables as described in
Chapter 2, ProActive Installation
.
<environment> <!-- LOCAL PATHS --> <descriptorVariable name="PROACTIVE_HOME" value="/user/vjuresch/home/workspace/ProActive" /> <descriptorVariable name="JAVA_HOME" value="/user/vjuresch/home/work/jdk1.6.0_02" /> <!-- REMOTE PATHS --> <descriptorVariable name="REMOTE_PROACTIVE_HOME" value="/user/vjuresch/home/workspace/ProActive" /> <descriptorVariable name="REMOTE_JAVA_HOME" value="/user/vjuresch/home/work/jdk1.6.0_02" /> </environment>
We first need to specify how the grid resources are organized together. This is done in the resources section, in which we describe a tree-like structure corresponding to the grid setup. There are three types of elements :
host : a single machine
bridge : a gateway to a set of machines which cannot be reached individually
group : a group of machine sharing an identical configuration (which is described by a host)
<resources> <bridge refid="rsh_cheypa"> <host refid="chepya" /> </bridge> </resources>
Next is the infrastructure part that defines the elements (hosts, bridges, groups) which are referenced in the resources part. Hosts define the needed file paths to access the JVM, and the machine's workload capacity, among other things. Bridges and groups define what kind of configuration they are. In this case we have a single machine accessible through rsh. The machine's configuration is described by the 'cheypa' host. The way to access it (through rsh) is defined by the 'rsh_cheypa' bridge.
<infrastructure> <hosts> <host id="cheypa" os="unix" hostCapacity="2" vmCapacity="1" > <homeDirectory base="root" relpath="/user/${USERNAME}/home" /> <tool id="java" path="/usr/java/j2sdk/bin/java" /> </host> <hosts> <bridges> <rshBridge id="rsh_chepypa" hostname="cheypa.inria.fr" /> </bridges> </infrastructure>
Next, we create the Application Descriptor, where we define the application and its requirements.
Like a Deployment Descriptor, an Application Descriptor starts with an environment section, which follows the same syntax. Next is the application section in which the application itself is described : its type (ProActive or stand-alone executable), its configuration (dependencies, invocation options) and its resource requirements (the virtual nodes). In this example we have a single virtual node named 'Hello'. The virtual nodes refer to node providers, which are defined in the next section, as sources of physical nodes.
<application> <proactive base="root" relpath="${proactive.home}"> <configuration> <applicationClasspath> <pathElement base="proactive" relpath="dist/lib/ProActive_examples.jar"/> </applicationClasspath> </configuration> <virtualNode id="Hello" capacity="1"> <nodeProvider refid="provider" /> </virtualNode> </proactive> </application>
Finally we link the Application Descriptor to one or several Deployment Descriptors through its resources part. In here you can define the node providers that are referenced by the virtual nodes in the previous section.
<?xml version="1.0" encoding="UTF-8"?> <GCMApplication xmlns="urn:gcm:application:1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:gcm:application:1.0 http://proactive.inria.fr/schemas/gcm/1.0/ApplicationDescriptorSchema.xsd"> <environment> <javaPropertyVariable name="proactive.home" /> <javaPropertyVariable name="user.dir" /> </environment> <application> <proactive base="root" relpath="${proactive.home}"> <configuration> <applicationClasspath> <pathElement base="proactive" relpath="dist/lib/ProActive_examples.jar"/> </applicationClasspath> </configuration> <virtualNode id="Hello" capacity="1"> <nodeProvider refid="provider" /> </virtualNode> </proactive> </application> <resources> <nodeProvider id="provider"> <file path="/path/to/deployment/descriptor" /> </nodeProvider> </resources> </GCMApplication>
The following figure ilustrates a simplified view of the deployment process. The ProActive application loads the deployment descriptor and deploys on the remote machine according to the settings in the descriptor. Although the process behind the deployment is fairly complicated it is made seamless by ProActive. In the application we only need to specify the deployment descriptor and tell ProActive to start the virtual nodes, nodes and active objects. The communication details are handled by ProActive according to the descriptor.
In this part of the tutorial we will see how to start a monitoring agent on a remote machine using the deployment methods explained previously. To be able to deploy on remote machines we just have to use the deployment file, add a method that tells ProActive to activate the nodes used and tell the active object to start on the remote node.
New classes
org.objectweb.proactive.extensions.gcmdeployment.PAGCMDeployment
- used to create a GCMApplication from an Application Descriptor
org.objectweb.proactive.gcmdeployment.GCMApplication
- represents the application which is being deployed
org.objectweb.proactive.core.ProActiveException
- used to catch exception
org.objectweb.proactive.gcmdeployment.GCMVirtualNode
- used to control and instantiate virtual node objects
org.objectweb.proactive.core.node.Node
- used to control and instantiate node objects
Previously used classes
org.objectweb.proactive.Body
- used to access the body of the active object
org.objectweb.proactive.PALifeCycle
- controls the lifecycle of the ProActive application
org.objectweb.proactive.Service
- used to access the queue of the active object
org.objectweb.proactive.InitActive
- used for defining the initActivity(Body body)
method, which is run at active object initialization
org.objectweb.proactive.EndActive
- used for defining the endActivity(Body body)
method, which is run at active object destruction
org.objectweb.proactive.RunActive
- used for defining the runActivity(Body body)
method, which manages the queue of requests
org.objectweb.proactive.core.util.wrapper.LongWrapper
- used to wrap the Long
return type
org.objectweb.proactive.api.PAActiveObject
- used to create an instance of an active object
org.objectweb.proactive.core.node.NodeException
- used to catch the exceptions that the creation of the Node might throw
org.objectweb.proactive.ActiveObjectCreationException
- used to catch the exceptions that the creation of the active object might throw
java.io.Serializable
- used to make the State
object serializable so it can
be sent across the network
java.lang.management.ManagementFactory
- used to get various information about the machine the active object is running on
java.net.InetAddress
- used to get the address of the active object is running on
java.net.UnknownHostException
- used to catch the exceptions that might be thrown when requesting host information
java.util.Date
- used to get the a time for the requested state
We will change the
Main
class to declare and load the deployment descriptors to be
used. For this we will use a
deploy()
method that returns the a Virtual Node which has several nodes (as specified in the deployment file)
that we can deploy on.
First, the method creates an object representation
of the deployment file, then activates all the nodes, and
then returns the first available node. We also have to change the deployment descriptor files to fit our local
settings.
package org.objectweb.proactive.examples.userguide.cmagent.deployed; import java.io.File; import org.objectweb.proactive.ActiveObjectCreationException; import org.objectweb.proactive.api.PAActiveObject; import org.objectweb.proactive.api.PALifeCycle; import org.objectweb.proactive.core.ProActiveException; import org.objectweb.proactive.core.node.NodeException; import org.objectweb.proactive.examples.userguideskeletons.cmagent.initialized.CMAgentInitialized; import org.objectweb.proactive.extensions.gcmdeployment.PAGCMDeployment; import org.objectweb.proactive.gcmdeployment.GCMApplication; import org.objectweb.proactive.gcmdeployment.GCMVirtualNode; public class Main { //deployment method private static GCMVirtualNode deploy(String descriptor) throws NodeException, ProActiveException { //TODO 1. Create object representation of the deployment file //TODO 2. Activate all Virtual Nodes //TODO 3. Wait for all the virtual nodes to become ready //TODO 4. Get the first Virtual Node specified in the descriptor file //TODO 5. Return the virtual node return null; } public static void main(String args[]) { //TODO 6. Get the virtual node through the deploy method //TODO 7. Create the active object using a node on the virtual node //TODO 8. Get the current state from the active object //TODO 9. Print the state //TODO 10. Stop the active object //TODO 11. Stop the virtual node } }
Create GCMApplication from the application descriptor file using PAGCMDeployment.loadDescriptor(String descriptor)
Start the deployment of all the virtual nodes using GCMApplication.startDeployment()
Get a virtual node using GCMApplication.getVirtualNode(String virtualNodeName)
Create a virtual object node using the deployment method
Create the active object using an node from the virtual node
Get and print the state from the active object
Stop the application using GCMApplication.kill()
Change the State
class so the initialization of the variables takes place in the toString()
method.
Run the deployed application again and explain the different results.
In the deployment method we return the first virtual node found in the deployment descriptor:
//deployment method private static GCMVirtualNode deploy(String descriptor) throws NodeException, ProActiveException { //TODO 1. Create object representation of the deployment file pad = PAGCMDeployment.loadApplicationDescriptor(new File(descriptor)); //TODO 2. Activate all Virtual Nodes pad.startDeployment(); //TODO 3. Wait for all the virtual nodes to become ready pad.waitReady(); //TODO 4. Get the first Virtual Node specified in the descriptor file GCMVirtualNode vn = pad.getVirtualNodes().values().iterator().next(); //TODO 5. Return the virtual node return vn; }
The active object is created by using the first node on the virtual node:
//TODO 7. Create the active object using a node on the virtual node CMAgentInitialized ao = (CMAgentInitialized) PAActiveObject.newActive(CMAgentInitialized.class .getName(), new Object[] {}, vn.getANode());
The full
Main
class:
package org.objectweb.proactive.examples.userguide.cmagent.deployed; import java.io.File; import org.objectweb.proactive.ActiveObjectCreationException; import org.objectweb.proactive.api.PAActiveObject; import org.objectweb.proactive.api.PALifeCycle; import org.objectweb.proactive.core.ProActiveException; import org.objectweb.proactive.core.node.NodeException; import org.objectweb.proactive.examples.userguide.cmagent.initialized.CMAgentInitialized; import org.objectweb.proactive.extensions.gcmdeployment.PAGCMDeployment; import org.objectweb.proactive.gcmdeployment.GCMApplication; import org.objectweb.proactive.gcmdeployment.GCMVirtualNode; public class Main { private static GCMApplication pad; //deployment method private static GCMVirtualNode deploy(String descriptor) throws NodeException, ProActiveException { //TODO 1. Create object representation of the deployment file pad = PAGCMDeployment.loadApplicationDescriptor(new File(descriptor)); //TODO 2. Activate all Virtual Nodes pad.startDeployment(); //TODO 3. Wait for all the virtual nodes to become ready pad.waitReady(); //TODO 4. Get the first Virtual Node specified in the descriptor file GCMVirtualNode vn = pad.getVirtualNodes().values().iterator().next(); //TODO 5. Return the virtual node return vn; } public static void main(String args[]) { try { //TODO 6. Get the virtual node through the deploy method GCMVirtualNode vn = deploy(args[0]); //TODO 7. Create the active object using a node on the virtual node CMAgentInitialized ao = (CMAgentInitialized) PAActiveObject.newActive(CMAgentInitialized.class .getName(), new Object[] {}, vn.getANode()); //TODO 8. Get the current state from the active object String currentState = ao.getCurrentState().toString(); //TODO 9. Print the state System.out.println(currentState); //TODO 10. Stop the active object PAActiveObject.terminateActiveObject(ao, false); } catch (NodeException nodeExcep) { System.err.println(nodeExcep.getMessage()); } catch (ActiveObjectCreationException aoExcep) { System.err.println(aoExcep.getMessage()); } catch (ProActiveException poExcep) { System.err.println(poExcep.getMessage()); } finally { //TODO 11. Stop the virtual node if (pad != null) pad.kill(); PALifeCycle.exitSuccess(); } } }
To deploy on a remote machine you will need to change the
hostname
tag to fit your machine name. In our case the remote
machine needs to have the
rsh
service running since we are using rsh
deployment.
next full deployment descriptor file is:
<?xml version="1.0" encoding="UTF-8"?> <GCMDeployment xmlns="urn:gcm:deployment:1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:gcm:deployment:1.0 http://proactive.inria.fr/schemas/gcm/1.0/ExtensionSchemas.xsd"> <environment> <javaPropertyVariable name="user.dir" /> <javaPropertyDescriptorDefault name="os" value="windows" /> <!-- LOCAL PATHS --> <descriptorVariable name="PROACTIVE_HOME" value="/user/vjuresch/home/workspace/ProActive" /> <descriptorVariable name="JAVA_HOME" value="/user/vjuresch/home/work/jdk1.6.0_02" /> <!-- REMOTE PATHS --> <descriptorVariable name="REMOTE_PROACTIVE_HOME" value= "/user/vjuresch/home/workspace/ProActive" /> <descriptorVariable name="REMOTE_JAVA_HOME" value="/user/vjuresch/home/work/jdk1.6.0_02" /> </environment> <resources> <bridge refid="rsh_cheypa"> <host refid="cheypa" /> </bridge> </resources> <infrastructure> <hosts> <host id="cheypa" os="unix" hostCapacity="2" vmCapacity="1"> <homeDirectory base="root" relpath="/user/${USERNAME}/home" /> <tool id="java" path="/usr/java/j2sdk/bin/java" /> </host> </hosts> <bridges> <rshBridge id="rsh_chepypa" hostname="cheypa.inria.fr" /> </bridges> </infrastructure> </GCMDeployment>
In this example we will use a chained call between agents deployed on several node in order to retrieve
information about the nodes. We have several agents that know their previous and next neighbour.
When an agent receives a request for a state returns its state and asks the its respective neighbour for the state.
If it doesn't have a neighbour then it just returns its state. We will use this example to show how
PAActiveObject.getStubOnThis()
method call is employed. Since an active object is actually a composite of
two objects, the method call PAActiveObject.getStubOnThis()
is the equivalent of this
in a regular java object. If we were to use this
we would get
a reference to the passive object and not to the stub of the active object.
There are no new classes used in this exercise. Instead we will show how to use
thePAActiveObject.getStubOnThis()
method.
Previously used classes
org.objectweb.proactive.extensions.gcmdeployment.PAGCMDeployment
-
org.objectweb.proactive.core.ProActiveException
org.objectweb.proactive.gcmdeployment.GCMApplication
org.objectweb.proactive.gcmdeployment.GCMVirtualNode
org.objectweb.proactive.core.node.Node
org.objectweb.proactive.Body
- used to access the body of the active object
org.objectweb.proactive.PALifeCycle
- controls the lifecycle of the ProActive application
org.objectweb.proactive.Service
- used to access the queue of the active object
org.objectweb.proactive.InitActive
- used for defining the initActivity(Body body)
method, which is run at active object initialization
org.objectweb.proactive.EndActive
- used for defining the endActivity(Body body)
method, which is run at active object destruction
org.objectweb.proactive.RunActive
- used for defining the runActivity(Body body)
method, which manages the queue of requests
org.objectweb.proactive.core.util.wrapper.LongWrapper
- used to wrap the Long
return type
org.objectweb.proactive.api.PAActiveObject
- used to create an instance of an active object
org.objectweb.proactive.core.node.NodeException
- used to catch the exceptions that the creation of the Node might throw
org.objectweb.proactive.ActiveObjectCreationException
- used to catch the exceptions that the creation of the active object might throw
java.io.Serializable
- used to make the State
object serializable so it can
be sent across the network
java.lang.management.ManagementFactory
- used to get various information about the machine the active object is running on
java.net.InetAddress
- used to get the address of the active object is running on
java.net.UnknownHostException
- used to catch the exceptions that might be thrown when requesting host information
java.util.Date
- used to get the a time for the requested state
There are no new concepts present in the Main
class
therefore we present it in full. In the Main
class
we just create the active objects and make the method calls on one of the active objects.
public static void main(String args[]) { try { GCMVirtualNode vn = deploy(args[0]); Vector<CMAgentChained> agents = new Vector<CMAgentChained>(); //create the active objects //create a collection of active objects for (Node node : vn.getCurrentNodes()) { CMAgentChained ao = (CMAgentChained) PAActiveObject.newActive(CMAgentChained.class .getName(), null, node); agents.add(ao); //connect to the neighbour int size = agents.size(); if (size > 1) { CMAgentChained lastAgent = agents.get(size - 1); CMAgentChained previousAgent = agents.get(size - 2); lastAgent.setPreviousNeighbour(previousAgent); } } //start chained call Vector<State> states = agents.get(agents.size() / 2).getAllPreviousStates(); for (State s : states) { System.out.println(s.toString()); } states = agents.get(agents.size() / 2).getAllNextStates(); for (State s : states) { System.out.println(s.toString()); } } catch (ActiveObjectCreationException e) { System.err.println(e.getMessage()); } catch (NodeException nodeExcep) { System.err.println(nodeExcep.getMessage()); } catch (ProActiveException e) { System.err.println(e.getMessage()); } finally { //stopping all the objects and JVMS if (pad != null) pad.kill(); PALifeCycle.exitSuccess(); } }
The agent class inherits from CMAgentInitialized
package org.objectweb.proactive.examples.userguide.cmagent.synch; import java.io.Serializable; import java.util.Vector; import org.objectweb.proactive.api.PAActiveObject; import org.objectweb.proactive.examples.userguide.cmagent.initialized.CMAgentInitialized; import org.objectweb.proactive.examples.userguide.cmagent.simple.State; public class CMAgentChained extends CMAgentInitialized implements Serializable { private CMAgentChained previousNeighbour; private CMAgentChained nextNeighbour; public void setPreviousNeighbour(CMAgentChained neighbour) { this.previousNeighbour = neighbour; //TODO 1. Pass a remote reference of this object to the neighbour // Hint: This object is "nextNeighbour" for previous neighbour if not null } public void setNextNeighbour(CMAgentChained neighbour) { this.nextNeighbour = neighbour; //TODO 2. Pass a remote reference of this object to the neighbour // Hint: This object is "previousNeighbour" for next neighbour if not null } public CMAgentChained getPreviousNeigbour() { return previousNeighbour; } public CMAgentChained getNextNeigbour() { return nextNeighbour; } public Vector<State> getAllPreviousStates() { System.out.println(PAActiveObject.getStubOnThis()); if (this.previousNeighbour != null) { System.out.println("Passing the call to the previous neighbour..."); // wait-by-necessity Vector<State> states = this.previousNeighbour.getAllPreviousStates(); // states is a future // TODO 3. Is this explicit synchronization mandatory ? (NO the wait was removed) return states; } else { System.out.println("No more previous neighbours.."); Vector<State> states = new Vector<State>(); states.add(this.getCurrentState()); return states; } } public Vector<State> getAllNextStates() { System.out.println(PAActiveObject.getStubOnThis()); if (this.nextNeighbour != null) { // wait-by-necessity System.out.println("Passing the call to the next neighbour.."); Vector<State> states = this.nextNeighbour.getAllNextStates(); // states is a future // TODO 4. Is this explicit synchronization mandatory ? (NO the wait was removed) return states; } else { System.out.println("No more next neighbours"); Vector<State> states = new Vector<State>(); states.add(this.getCurrentState()); return states; } } }
Write the code for connecting to the previous neighbour
Write the code for connecting to the next neighbour
Add and remove explicit synchronization between the agents when retrieving the states
To pass a remote reference to the active object we are currently in, we use PAActiveObject.getStubOnThis()
.
A simple use of the this
keyword will return a reference to the passive object components of the
active object and not a reference to the stub.
public void setPreviousNeighbour(CMAgentChained neighbour) { this.previousNeighbour = neighbour; //TODO 1. Pass a remote reference of this object to the neighbour // Hint: This object is "nextNeighbour" for previous neighbour if not null if (neighbour.getNextNeigbour() == null) neighbour.setNextNeighbour((CMAgentChained) PAActiveObject.getStubOnThis()); } public void setNextNeighbour(CMAgentChained neighbour) { this.nextNeighbour = neighbour; //TODO 2. Pass a remote reference of this object to the neighbour // Hint: This object is "previousNeighbour" for next neighbour if not null if (neighbour.getPreviousNeigbour() == null) neighbour.setPreviousNeighbour((CMAgentChained) PAActiveObject.getStubOnThis()); }
The full code for the CMAAgentChained
class is:
package org.objectweb.proactive.examples.userguide.cmagent.synch; import java.io.Serializable; import java.util.Vector; import org.objectweb.proactive.api.PAActiveObject; import org.objectweb.proactive.examples.userguide.cmagent.initialized.CMAgentInitialized; import org.objectweb.proactive.examples.userguide.cmagent.simple.State; public class CMAgentChained extends CMAgentInitialized implements Serializable { private CMAgentChained previousNeighbour; private CMAgentChained nextNeighbour; public void setPreviousNeighbour(CMAgentChained neighbour) { this.previousNeighbour = neighbour; //TODO 1. Pass a remote reference of this object to the neighbour // Hint: This object is "nextNeighbour" for previous neighbour if not null if (neighbour.getNextNeigbour() == null) neighbour.setNextNeighbour((CMAgentChained) PAActiveObject.getStubOnThis()); } public void setNextNeighbour(CMAgentChained neighbour) { this.nextNeighbour = neighbour; //TODO 2. Pass a remote reference of this object to the neighbour // Hint: This object is "previousNeighbour" for next neighbour if not null if (neighbour.getPreviousNeigbour() == null) neighbour.setPreviousNeighbour((CMAgentChained) PAActiveObject.getStubOnThis()); } public CMAgentChained getPreviousNeigbour() { return previousNeighbour; } public CMAgentChained getNextNeigbour() { return nextNeighbour; } public Vector<State> getAllPreviousStates() { System.out.println(PAActiveObject.getStubOnThis()); if (this.previousNeighbour != null) { System.out.println("Passing the call to the previous neighbour..."); // wait-by-necessity Vector<State> states = this.previousNeighbour.getAllPreviousStates(); // states is a future // TODO 3. Is this explicit synchronization mandatory ? (NO the wait was removed) states.add(this.getCurrentState()); return states; } else { System.out.println("No more previous neighbours.."); Vector<State> states = new Vector<State>(); states.add(this.getCurrentState()); return states; } } public Vector<State> getAllNextStates() { System.out.println(PAActiveObject.getStubOnThis()); if (this.nextNeighbour != null) { // wait-by-necessity System.out.println("Passing the call to the next neighbour.."); Vector<State> states = this.nextNeighbour.getAllNextStates(); // states is a future // TODO 4. Is this explicit synchronization mandatory ? (NO the wait was removed) states.add(this.getCurrentState()); return states; } else { System.out.println("No more next neighbours"); Vector<State> states = new Vector<State>(); states.add(this.getCurrentState()); return states; } } }
In the previous example we have some parts of the code that may trigger a wait-by-necessity
. A wait-by-necessity
happens when we try to use
a future that has not been updated yet. When a future in this state is used the thread trying to use the future blocks
until the value of the future is updated.
Our next example deals with migrating the Monitoring Agent between remote nodes. We will start the monitoring agent on one machine and then move it to other machines and report on the state of each JMV on the machines. To do that we will need to change the descriptor file to specify the nodes and the machines the nodes are mapped to. We also have to add a a method that enables us to tell the active object to migrate.
An active object must implement the Serializable interface (as it will be transferred through the network) in order to be able to migrate. For more information on the topic of object migration, check Chapter 11, Mobile Agents And Migration .
New classes
org.objectweb.proactive.api.PAMobileAgent
- used to tell the active object to migrate
org.objectweb.proactive.core.body.migration.MigrationException
- used to catch migration related exceptions
Previously used classes
org.objectweb.proactive.extensions.gcmdeployment.PAGCMDeployment
-
org.objectweb.proactive.core.ProActiveException
org.objectweb.proactive.gcmdeployment.GCMApplication
org.objectweb.proactive.gcmdeployment.GCMVirtualNode
org.objectweb.proactive.core.node.Node
org.objectweb.proactive.Body
- used to access the body of the active object
org.objectweb.proactive.PALifeCycle
- controls the lifecycle of the ProActive application
org.objectweb.proactive.Service
- used to access the queue of the active object
org.objectweb.proactive.InitActive
- used for defining the initActivity(Body body)
method, which is run at active object initialization
org.objectweb.proactive.EndActive
- used for defining the endActivity(Body body)
method, which is run at active object destruction
org.objectweb.proactive.RunActive
- used for defining the runActivity(Body body)
method, which manages the queue of requests
org.objectweb.proactive.core.util.wrapper.LongWrapper
- used to wrap the Long
return type
org.objectweb.proactive.api.PAActiveObject
- used to create an instance of an active object
org.objectweb.proactive.core.node.NodeException
- used to catch the exceptions that the creation of the Node might throw
org.objectweb.proactive.ActiveObjectCreationException
- used to catch the exceptions that the creation of the active object might throw
java.io.Serializable
- used to make the State
object serializable so it can
be sent across the network
java.lang.management.ManagementFactory
- used to get various information about the machine the active object is running on
java.net.InetAddress
- used to get the address of the active object is running on
java.net.UnknownHostException
- used to catch the exceptions that might be thrown when requesting host information
java.util.Date
- used to get the a time for the requested state
We create a CMAgentMigrator
class, that inherits from
CMAInitialized
. This class will implement all the non-functional
behavior concerning the migration, for which this example is created.
The migration has to be initiated by the active object itself. We will have to write the migrate method in the code of CMAgentMigrator - i.e. a method that contains an explicit call to the migration primitive.
In the main class we only have to change a few lines of code to migrate the agent. As you may notice there is more code for creating the textual menu than for controlling the agents.
package org.objectweb.proactive.examples.userguide.cmagent.migration; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import org.objectweb.proactive.api.PAActiveObject; import org.objectweb.proactive.extensions.gcmdeployment.PAGCMDeployment; import org.objectweb.proactive.api.PALifeCycle; import org.objectweb.proactive.core.ProActiveException; import org.objectweb.proactive.gcmdeployment.GCMApplication; import org.objectweb.proactive.gcmdeployment.GCMVirtualNode; import org.objectweb.proactive.core.node.Node; import org.objectweb.proactive.core.node.NodeException; import org.objectweb.proactive.ActiveObjectCreationException; public class Main { private static GCMApplication pad; private static GCMVirtualNode deploy(String descriptor) throws NodeException, ProActiveException { //Create object representation of the deployment file pad = PAGCMDeployment.loadApplicationDescriptor(new File(descriptor)); //Activate all Virtual Nodes pad.startDeployment(); //Wait for all the virtual nodes to become ready pad.waitReady(); //Get the first Virtual Node specified in the descriptor file GCMVirtualNode vn = pad.getVirtualNodes().values().iterator().next(); return vn; } public static void main(String args[]) { try { BufferedReader inputBuffer = new BufferedReader(new InputStreamReader(System.in)); GCMVirtualNode vn = deploy(args[0]); //create the active object CMAgentMigrator ao = (CMAgentMigrator) PAActiveObject.newActive(CMAgentMigrator.class .getName(), new Object[] {}, vn.getANode()); int k = 1; int choice; while (k != 0) { //display the menu with the available nodes k = 1; for (Node node : vn.getCurrentNodes()) { //TODO 2. Add the node URL to the menu System.out.println(k + ". Statistics for node :" ); k++; } System.out.println("0. Exit"); //select a node do { System.out.print("Choose a node :> "); try { // Read am option from keyboard choice = Integer.parseInt(inputBuffer.readLine().trim()); } catch (NumberFormatException noExcep) { choice = -1; } } while (!(choice >= 1 && choice < k || choice == 0)); if (choice == 0) break; //TODO 3. Migrate the active object to the selected node: choice-1 //TODO 4. Get the state and the last request time and print them out //TODO 5. Display information for the selected node } } catch (NodeException nodeExcep) { System.err.println(nodeExcep.getMessage()); } catch (ActiveObjectCreationException aoExcep) { System.err.println(aoExcep.getMessage()); } catch (IOException e) { System.err.println(e.getMessage()); } catch (ProActiveException e) { System.err.println(e.getMessage()); } finally{ //TODO 6. Stop all the objects and JVMS } } }
CMAgentMigrator skeleton class:
package org.objectweb.proactive.examples.userguide.cmagent.migration; import java.io.Serializable; import org.objectweb.proactive.core.node.Node; import org.objectweb.proactive.examples.userguide.cmagent.initialized.CMAgentInitialized; public class CMAgentMigrator extends CMAgentInitialized implements Serializable { public void migrateTo(Node whereTo) { //TODO 1. Migrate the active object to the Node received as parameter } }
Write the code for the migration method using org.objectweb.proactive.api.PAMobileAgent.migrateTo(Node URL)
Create the menu using the node URLs - org.objectweb.proactive.api.Node.getNodeInformation().getURL()
Migrate the node
Get and print the state of the node
Start the application and exit immediately (press 0). While is there a 1 request processed ?
Start the application migrate the agent once and then exit. Why are there 3 requests processed ?
We will use the CMAgentInitialized
class and add a method in the Main
class that
makes the active object able to migrate. The new class also
has to implement the Serializable
interface in order to be sent over
the network.
Note that the call to the ProActive primitive
migrateTo
is the last one of the method migrateTo()
.
See Chapter 11, Mobile Agents And Migration for more information.
During the execution the JVMs are started according to the descriptor and one active object is started on the first JVM. The active object is then migrated to the first node on each virtual node created, each time returning Hello and the computer on which is located.
The full deployment descriptor files are:
<?xml version="1.0" encoding="UTF-8"?> <GCMApplication xmlns="urn:gcm:application:1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:gcm:application:1.0 http://proactive.inria.fr/schemas/gcm/1.0/ApplicationDescriptorSchema.xsd"> <environment> <javaPropertyVariable name="proactive.home" /> <javaPropertyVariable name="user.dir" /> <descriptorVariable name="hostCapacity" value="3"/> <descriptorVariable name="vmCapacity" value="1"/> </environment> <application> <proactive base="root" relpath="${proactive.home}"> <configuration> <applicationClasspath> <pathElement base="proactive" relpath="dist/lib/ProActive_examples.jar"/> </applicationClasspath> </configuration> <virtualNode id="Agent" capacity="3"/> </proactive> </application> <resources> <nodeProvider id="LOCAL"> <file path="GCMD_Local.xml" /> </nodeProvider> </resources> </GCMApplication>
and
<?xml version="1.0" encoding="UTF-8"?> <GCMDeployment xmlns="urn:gcm:deployment:1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:gcm:deployment:1.0 http://proactive.inria.fr/schemas/gcm/1.0/ExtensionSchemas.xsd "> <environment> <javaPropertyVariable name="user.dir" /> <javaPropertyDescriptorDefault name="os" value="unix" /> </environment> <resources> <host refid="hLocalhost" /> </resources> <infrastructure> <hosts> <host id="hLocalhost" os="${os}" hostCapacity="${hostCapacity}" vmCapacity="${vmCapacity}" > <homeDirectory base="root" relpath="${user.dir}" /> </host> </hosts> </infrastructure> </GCMDeployment>
In this part of the tutorial we will show how to use groups of active objects.
We will create several active objects that we add and remove from a group.
The group will be used to retrieve the a State
object from all the active objects in the
group.
In order to ease the use of the group communication,
ProActive provides a set of static methods in the PAGroup
class and a set of methods in the Group
interface.
ProActive also provides typed group communication,
meaning that only methods defined on classes or interfaces
implemented by members of the group can be called.
There are several ways to create groups of active objects.
Similar to active objects, we have instantiation based
creation and object based creation. Instantiation based creation
is done through newGroup(..) and newGroupInParallel while object
based creation is done through turnActiveAsGroup(...).
New Classes
org.objectweb.proactive.api.PAGroup
- used to create a group of active objects
org.objectweb.proactive.core.group.Group
- used to control the group of objects
Previously used classes
org.objectweb.proactive.extensions.gcmdeployment.PAGCMDeployment
- used to create a object version of the deployment descriptor
org.objectweb.proactive.core.ProActiveException
- used to handle ProActive exceptions
org.objectweb.proactive.gcmdeployment.GCMApplication
org.objectweb.proactive.gcmdeployment.GCMVirtualNode
org.objectweb.proactive.core.node.Node
- used to instantiate and control Node objects
org.objectweb.proactive.Body
- used to access the body of the active object
org.objectweb.proactive.PALifeCycle
- controls the lifecycle of the ProActive application
org.objectweb.proactive.Service
- used to access the queue of the active object
org.objectweb.proactive.InitActive
- used for defining the initActivity(Body body)
method, which is run at active object initialization
org.objectweb.proactive.EndActive
- used for defining the endActivity(Body body)
method, which is run at active object destruction
org.objectweb.proactive.RunActive
- used for defining the runActivity(Body body)
method, which manages the queue of requests
org.objectweb.proactive.core.util.wrapper.LongWrapper
- used to wrap the Long
return type
org.objectweb.proactive.api.PAActiveObject
- used to create an instance of an active object
org.objectweb.proactive.core.node.NodeException
- used to catch the exceptions that the creation of the Node might throw
org.objectweb.proactive.ActiveObjectCreationException
- used to catch the exceptions that the creation of the active object might throw
java.io.Serializable
- used to make the State
object serializable so it can
be sent across the network
java.lang.management.ManagementFactory
- used to get various information about the machine the active object is running on
java.net.InetAddress
- used to get the address of the active object is running on
java.net.UnknownHostException
- used to catch the exceptions that might be thrown when requesting host information
java.util.Date
- used to get the a time for the requested state
In this example we only need to modify the Main
class
to create the group of objects. To instantiate the active object we will use the CMAgentInitialized
class that we have defined previously.
Main skeleton class:
package org.objectweb.proactive.examples.userguide.cmagent.groups; import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.util.Vector; import org.objectweb.proactive.ActiveObjectCreationException; import org.objectweb.proactive.api.PAActiveObject; import org.objectweb.proactive.api.PAGroup; import org.objectweb.proactive.api.PALifeCycle; import org.objectweb.proactive.core.ProActiveException; import org.objectweb.proactive.core.group.Group; import org.objectweb.proactive.core.mop.ClassNotReifiableException; import org.objectweb.proactive.core.node.Node; import org.objectweb.proactive.core.node.NodeException; import org.objectweb.proactive.examples.userguide.cmagent.migration.CMAgentMigrator; import org.objectweb.proactive.examples.userguide.cmagent.simple.State; import org.objectweb.proactive.extensions.gcmdeployment.PAGCMDeployment; import org.objectweb.proactive.gcmdeployment.GCMApplication; import org.objectweb.proactive.gcmdeployment.GCMVirtualNode; public class Main { private static GCMApplication pad; private static GCMVirtualNode deploy(String descriptor) throws NodeException, ProActiveException { //Create object representation of the deployment file pad = PAGCMDeployment.loadApplicationDescriptor(new File(descriptor)); //Activate all Virtual Nodes pad.startDeployment(); //Wait for all the virtual nodes to become ready pad.waitReady(); //Get the first Virtual Node specified in the descriptor file GCMVirtualNode vn = pad.getVirtualNodes().values().iterator().next(); return vn; } public static void main(String args[]) { try { Vector<CMAgentMigrator> agents = new Vector<CMAgentMigrator>(); BufferedReader inputBuffer = new BufferedReader(new InputStreamReader(System.in)); GCMVirtualNode vn = deploy(args[0]); //TODO 1. Create a new empty group CMAgentMigrator monitorsGroup = null; //TODO 2. Create a collection of active objects with on object on each node for (Node node : vn.getCurrentNodes()) { CMAgentMigrator ao = null; agents.add(ao); } //TODO 3. Get a management representation of the monitors group Group<CMAgentMigrator> gA = null; //ask for adding or removing nodes //get statistics int k = 1; int choice; while (k != 0) { //display the menu k = 1; System.out.println("Toggle monitored nodes (*) or display statistics: "); for (CMAgentMigrator agent : agents) { if (gA.contains(agent)) //TODO 5. Print the node URL System.out.println(" " + k + ".* " + " TODO: write node url here"); else System.out.println(" " + k + ". " + "TODO: write node url here"); k++; } System.out.println("-1. Display statistics for monitored nodes"); System.out.println(" 0. Exit"); //select a node do { System.out.print("Choose a node to add or remove :> "); try { // Read am option from keyboard choice = Integer.parseInt(inputBuffer.readLine().trim()); } catch (NumberFormatException noExcep) { choice = -1; } } while (!(choice >= 1 && choice < k || choice == 0 || choice == -1)); if (choice == 0) break; if (choice == -1) { State resultsGroup = monitorsGroup.getCurrentState(); while (PAGroup.size(resultsGroup) > 0) { //TODO 6. Use PAGroup.waitAndGetOneThenRemoveIt() to control the list of State futures State statistic = null; System.out.println(statistic.toString()); } } else { if (gA.contains(agents.elementAt(choice - 1))) gA.remove(agents.elementAt(choice - 1)); else gA.add(agents.elementAt(choice - 1)); } } } catch (NodeException nodeExcep) { System.err.println(nodeExcep.getMessage()); } catch (ActiveObjectCreationException aoExcep) { System.err.println(aoExcep.getMessage()); } catch (IOException e) { System.err.println(e.getMessage()); } catch (ClassNotReifiableException e) { System.err.println(e.getMessage()); } catch (ClassNotFoundException e) { System.err.println(e.getMessage()); } catch (ProActiveException e) { // TODO Auto-generated catch block System.err.println(e.getMessage()); } finally{ //stopping all the objects and JVMS if (pad!=null) pad.kill(); PALifeCycle.exitSuccess(); } } }
Create a new empty group using PAGroup.newGroup(..)
Create a collection of active objects with on object on each node
Get a management representation of the monitors group using the Group
interface
Print the Node URL using PAActiveObject.getActiveObjectNodeUrl(...)
Use PAGroup.waitAndGetOneThenRemoveIt()
to control the list of State
futures
To create the group we need to specify the type of the group. In our case we use the
previously defined CMAgentMigrator
. We first create the group and then
get a management representation of the group through the Group
interface.
The Group
interface has the necessary methods for adding and removing members
of the group.
//TODO 1. Create a new empty group CMAgentMigrator monitorsGroup = (CMAgentMigrator) PAGroup.newGroup(CMAgentMigrator.class .getName()); //TODO 2. Create a collection of active objects with on object on each node for (Node node : vn.getCurrentNodes()) { CMAgentMigrator ao = (CMAgentMigrator) PAActiveObject.newActive(CMAgentMigrator. class .getName(), new Object[] {}, node); agents.add(ao); } //TODO 3. Get a management representation of the monitors group Group<CMAgentMigrator> gA = PAGroup.getGroup(monitorsGroup);
We use synchronization to wait for all the agents to send the states. As an agent
returns a State
we remove it from the list of futures.
//TODO 5. Use PAGroup.waitAndGetOneThenRemoveIt() to control the list of State futures
State statistic = (State) PAGroup.waitAndGetOneThenRemoveIt(resultsGroup);
The full Main
method:
public static void main(String args[]) { try { Vector<CMAgentMigrator> agents = new Vector<CMAgentMigrator>(); BufferedReader inputBuffer = new BufferedReader(new InputStreamReader(System.in)); GCMVirtualNode vn = deploy(args[0]); //TODO 1. Create a new empty group CMAgentMigrator monitorsGroup = (CMAgentMigrator) PAGroup.newGroup(CMAgentMigrator.class .getName()); //TODO 2. Create a collection of active objects with on object on each node for (Node node : vn.getCurrentNodes()) { CMAgentMigrator ao = (CMAgentMigrator) PAActiveObject.newActive(CMAgentMigrator. class .getName(), new Object[] {}, node); agents.add(ao); } //TODO 3. Get a management representation of the monitors group Group<CMAgentMigrator> gA = PAGroup.getGroup(monitorsGroup); //ask for adding or removing nodes //get statistics int k = 1; int choice; while (k != 0) { //display the menu k = 1; System.out.println("Toggle monitored nodes (*) or display statistics: "); for (CMAgentMigrator agent : agents) { if (gA.contains(agent)) //TODO 5. Print the node URL System.out.println(" " + k + ".* " + PAActiveObject.getActiveObjectNodeUrl(agent)); else System.out.println(" " + k + ". " + PAActiveObject.getActiveObjectNodeUrl(agent)); k++; } System.out.println("-1. Display statistics for monitored nodes"); System.out.println(" 0. Exit"); //select a node do { System.out.print("Choose a node to add or remove :> "); try { // Read am option from keyboard choice = Integer.parseInt(inputBuffer.readLine().trim()); } catch (NumberFormatException noExcep) { choice = -1; } } while (!(choice >= 1 && choice < k || choice == 0 || choice == -1)); if (choice == 0) break; if (choice == -1) { State resultsGroup = monitorsGroup.getCurrentState(); while (PAGroup.size(resultsGroup) > 0) { //TODO 5. Use PAGroup.waitAndGetOneThenRemoveIt() to control the list of State futures State statistic = (State) PAGroup.waitAndGetOneThenRemoveIt(resultsGroup); System.out.println(statistic.toString()); } } else { if (gA.contains(agents.elementAt(choice - 1))) gA.remove(agents.elementAt(choice - 1)); else gA.add(agents.elementAt(choice - 1)); } } } catch (NodeException nodeExcep) { System.err.println(nodeExcep.getMessage()); } catch (ActiveObjectCreationException aoExcep) { System.err.println(aoExcep.getMessage()); } catch (IOException e) { System.err.println(e.getMessage()); } catch (ClassNotReifiableException e) { System.err.println(e.getMessage()); } catch (ClassNotFoundException e) { System.err.println(e.getMessage()); } catch (ProActiveException e) { // TODO Auto-generated catch block System.err.println(e.getMessage()); } finally { //stopping all the objects and JVMS if (pad != null) pad.kill(); PALifeCycle.exitSuccess(); } }
In this example we will show how to expose an active object as a web service.
We will use the same monitoring agent as in the previous examples and
expose it as a web service through the Tomcat application server. To do this
we have to use the generate proActive.war
file.
ProActive automatically generates a description of the web service
when the active object is exposed and also create the webservice WDSL file.
New Classes
org.objectweb.proactive.extensions.webservices.WebServices
- used to transform the active object into a web service
Previously used classes
org.objectweb.proactive.api.PAActiveObject
- used to create an instance of an active object
org.objectweb.proactive.core.node.NodeException
- used to catch the exceptions that the creation of the Node might throw
org.objectweb.proactive.ActiveObjectCreationException
- used to catch the exceptions that the creation of the active object might throw
In this example we extend CMAgentInitialized
and create a CMAgentWebService
which we expose through the main method.
Since the usage of the webservice is independent of the underlying implementation we give here only a Java example of how to access the webservice.
public class CMAgentWebServiceClient { public static void main(String[] args) { String address; if (args.length == 0) { address = "http://localhost:8080"; } else { address = args[0]; } if (!address.startsWith("http://")) { address = "http://" + address; } address += WSConstants.SERV_RPC_ROUTER; String namespaceURI = "cmAgentService"; String serviceName = "cmAgentService"; String portName = "getLastRequestServeTime"; ServiceFactory factory; try { factory = ServiceFactory.newInstance(); Service service = factory.createService(new QName(serviceName)); Call call = service.createCall(new QName(portName)); call.setTargetEndpointAddress(address); call.setOperationName(new QName(namespaceURI, portName)); Object[] inParams = new Object[0]; String name = ((String) call.invoke(inParams)); System.out.println(name); } catch (ServiceException e) { e.printStackTrace(); } catch (RemoteException e) { e.printStackTrace(); } } }
In the CMAgentWebService
we only need to write one line of code to use the
active object as an webservice.
package org.objectweb.proactive.examples.userguide.cmagent.webservice; import org.objectweb.proactive.ActiveObjectCreationException; import org.objectweb.proactive.api.PAActiveObject; import org.objectweb.proactive.core.node.NodeException; import org.objectweb.proactive.examples.userguide.cmagent.initialized.CMAgentInitialized; import org.objectweb.proactive.extensions.webservices.WebServices; public class CMAgentService extends CMAgentInitialized { public static void main(String[] args) { String url = "http://localhost:8080"; System.out.println("Started a monitoring agent on : " + url); try { CMAgentService hw = (CMAgentService) PAActiveObject.newActive( "org.objectweb.proactive.examples.userguide.cmagent.webservice.CMAgentService", new Object[] {}); //TODO 1. /*******************************************************/ /* Expose as web service (on URL 'url') the methods /* "getLastRequestServeTime" and "getCurrentState" /* of 'hw' CMAgentService. Name your service "cmAgentService"*/ /*******************************************************/ } catch (ActiveObjectCreationException e) { e.printStackTrace(); } catch (NodeException e) { e.printStackTrace(); } } }
There are several parameters that are needed for creating the webservice.
The first parameter passed is the active object, the second one is the server URL,
the third one the name we want for the webservice, and the fourth is an
array of String
with the list of methods we want exposed.
WebServices.exposeAsWebService(hw, url, "cmAgentService", new String[] { "getLastRequestServeTime", "getCurrentState" });
This chapter presents a step by step tutorial whose the purpose is to develop a distributed application to test if a number is prime using a self-made master/worker. The program to be created is a distributed version of a sequential primality test. The parallelism shown here is called data-level parallelism because the same operation can be applied simultaneously to multiples piece of data.
The first step is to study the sequential version of the application and to determinate what could be parallelized.
The second part introduces the distribution of the computation using the Computing and Monitoring agents from the previous tutorial. This part highlights the use of asynchronous communications and future monitoring as well as a straightforward round-robin load-balancer.
Finally, the last part of the tutorial presents the Master-Worker API featured by ProActive. This API removes all the complexity introduced by the distribution simply by handling it internally.
In the next section we will look at a simple sequential version of the application to find if a candidate number is prime.
The provided application uses a straightforward algorithm : Given a candidate number m, the algorithm loops over all the integers from 2 to square root of m (rather than to m − 1). At each step, it divides the candidate number m by the current integer ( the running divisor ). The loop ends when either :
the running divisor evenly divides the candidate number. The candidate number is not a prime number.
the running divisor is greater than the square root of the candidate number. The candidate number is a prime.
The
main
method selects a range between 2 and the square root of the candidate and
calls
isPrime
method that tries to divide the candidate by a divider in
a given range.
For this example a prime number is given as candidate by
default.
package org.objectweb.proactive.examples.userguide.primes.sequential; /** * This class illustrates a sequential algorithm for primality test. * <p> * Some primes : 4398042316799l, 63018038201, 2147483647 * * @author The ProActive Team * */ public class Main { public static void main(String[] args) { // The default value for the candidate to test (is prime) long candidate = 3093215881333057l; // Parse the number from args if there is some if (args.length > 0) { try { candidate = Long.parseLong(args[0]); } catch (NumberFormatException numberException) { System.err.println(numberException.getMessage()); System.err.println("Usage: Main <candidate>"); } } // We don't need to check numbers greater than the square-root of the // candidate in this algorithm long squareRootOfCandidate = (long) Math.ceil(Math.sqrt(candidate)); // Begin from 2 the first known prime number long begin = 2; // Until the end of the range long end = squareRootOfCandidate; // Check the primality boolean isPrime = Main.isPrime(candidate, begin, end); // Display the result System.out.println("\n" + candidate + (isPrime ? " is prime." : " is not prime.") + "\n"); } /** * Tests a primality of a specified number in a specified range. * * @param candidate * the candidate number to check * @param begin * starts check from this value * @param end * checks until this value * @return <code>true</code> if is prime; <code>false</code> otherwise */ public static Boolean isPrime(long candidate, long begin, long end) { for (long divider = begin; divider < end; divider++) { if ((candidate % divider) == 0) { return false; } } return true; } }
In the next section we will distribute the computation using agents from the precedent tutorial.
ProActive
org.objectweb.proactive.api.PAActiveObject
- used to create an instance of an active object
org.objectweb.proactive.extensions.gcmdeployment.PAGCMDeployment
- used for the deployement
org.objectweb.proactive.api.PAFuture
- used to catch the exceptions that the creation
of the active object might throw
org.objectweb.proactive.core.util.wrapper.BooleanWrapper
- used to wrap the primitive boolean type
org.objectweb.proactive.gcmdeployment.GCMApplication
org.objectweb.proactive.gcmdeployment.GCMVirtualNode
Other
org.objectweb.proactive.examples.userguide.cmagent.simple.CMAgent
- parent class of the workers
java.util.Vector
- used by the manager to store future results
The test range from 2 to sqrt(candidate) is divided into a number of intervals that will be sent asynchronously to workers by the manager using a round robin algorithm. Once all intervals was sent the manager waits for any answers and checks the result.
Each worker serves requests in a FIFO order, meanwhile the order of result reception by the manager is indeterministic, it depends on various factors like workers load, network latency, etc ...
Worker and manager classes have several methods
implementing the mechanism described above. The manager
class has a method for adding workers to its list -
addWorker(CMAgentPrimeWorker)
, and a method that distributes the workload by
performing method calls on workers -
isPrime(long number)
. The worker class has a method that checks if a number
is prime -
isPrime(long number, long begin, long end)
.
A main class is used to deploy the manager and its
workers on a specified deployment descriptor.
For this example the manager and worker classes without
code look like this. Try to fill in the code for the
methods. An example implementation is also provided
below.
package org.objectweb.proactive.examples.userguide.primes.distributed; import java.util.Vector; import org.objectweb.proactive.api.PAFuture; import org.objectweb.proactive.core.util.wrapper.BooleanWrapper; /** * @author The ProActive Team */ public class CMAgentPrimeManager { /** * A vector of references on workers */ private Vector<CMAgentPrimeWorker> workers = new Vector<CMAgentPrimeWorker>(); /** * Default interval size */ public static final int INTERVAL_SIZE = 100; /** * Empty no-arg constructor needed by ProActive */ public CMAgentPrimeManager() { } /** * Tests a primality of a specified number. Synchronous ! * * @param number * The number to test * @return <code>true</code> if is prime; <code>false</code> otherwise */ public boolean isPrime(long number) { // We don't need to check numbers greater than the square-root of the // candidate in this algorithm long squareRootOfCandidate = (long) Math.ceil(Math.sqrt(number)); // Begin from 2 the first known prime number long begin = 2; // The number of intervals long nbOfIntervals = (long) Math.ceil(squareRootOfCandidate / INTERVAL_SIZE); // Until the end of the first interval long end = INTERVAL_SIZE; // The vector of futures final Vector<BooleanWrapper> answers = new Vector<BooleanWrapper>(); // Non blocking (asynchronous method call) for (int i = 0; i <= nbOfIntervals; i++) { // Use round robin selection of worker int workerIndex = i % workers.size(); CMAgentPrimeWorker worker = workers.get(workerIndex); //TODO 1. Send asynchronous method call to the worker //TODO 2. Add the future result to the vector of answers // Update the begin and the end of the interval begin = end + 1; end += INTERVAL_SIZE; } // Once all requests was sent boolean prime = true; // Loop until a worker returns false or vector is empty (all results have been checked) int intervalNumber = 0; while (!answers.isEmpty() && prime) { // TODO 3. Block until a new response is available // by using a static method from org.objectweb.proactive.api.PAFuture // Check the answer prime = answers.get(intervalNumber).booleanValue(); // Remove the actualized future answers.remove(intervalNumber); } return prime; } /** * Adds a worker to the local vector * * @param worker * The worker to add to the vector */ public void addWorker(CMAgentPrimeWorker worker) { this.workers.add(worker); } }
package org.objectweb.proactive.examples.userguide.primes.distributed; import org.objectweb.proactive.core.util.wrapper.BooleanWrapper; import org.objectweb.proactive.examples.userguideskeletons.cmagent.simple.CMAgent; /** * @author The ProActive Team */ public class CMAgentPrimeWorker extends CMAgent { /** * Tests a primality of a specified number in a specified range. * * @param candidate * the candidate number to check * @param begin * starts check from this value * @param end * checks until this value * @return <code>true</code> if is prime; <code>false</code> otherwise */ public BooleanWrapper isPrime(final long candidate, final long begin, final long end) { try { //Used for slowing down the application for in order //to let one stop it for checking fault tolerance behavior Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } //TODO 4. Return a reifiable wrapper for the Boolean type // for asynchronous calls return null; } }
package org.objectweb.proactive.examples.userguide.primes.distributed; import java.io.File; import java.util.Collection; import java.util.Iterator; import org.objectweb.proactive.api.PAActiveObject; import org.objectweb.proactive.api.PADeployment; import org.objectweb.proactive.core.ProActiveException; import org.objectweb.proactive.core.descriptor.data.ProActiveDescriptor; import org.objectweb.proactive.core.descriptor.data.VirtualNode; import org.objectweb.proactive.core.node.Node; import org.objectweb.proactive.core.node.NodeException; import org.objectweb.proactive.examples.userguide.cmagent.simple.CMAgent; import org.objectweb.proactive.extensions.gcmdeployment.PAGCMDeployment; import org.objectweb.proactive.gcmdeployment.GCMApplication; import org.objectweb.proactive.gcmdeployment.GCMVirtualNode; /** * This class illustrates a distributed version of the sequential algorithm for * primality test based on the {@link CMAgent}. * <p> * Some primes : 3093215881333057, 4398042316799, 63018038201, 2147483647 * * @author The ProActive Team * */ public class Main { private static GCMApplication pad; private static Collection<GCMVirtualNode> deploy(String descriptor) { try { pad = PAGCMDeployment.loadApplicationDescriptor(new File(descriptor)); //active all Virtual Nodes pad.startDeployment(); pad.waitReady(); return pad.getVirtualNodes().values(); } catch (NodeException nodeExcep) { System.err.println(nodeExcep.getMessage()); } catch (ProActiveException proExcep) { System.err.println(proExcep.getMessage()); } return null; } public static void main(String[] args) { // The default value for the candidate to test (is prime) long candidate = 2147483647l; // Parse the number from args if there is some if (args.length > 1) { try { candidate = Long.parseLong(args[1]); } catch (NumberFormatException numberException) { System.err.println("Usage: Main <candidate>"); System.err.println(numberException.getMessage()); } } try { Collection<GCMVirtualNode> vNodes = deploy(args[0]); GCMVirtualNode vNode = vNodes.iterator().next(); // create the active object on the first node on // the first virtual node available // start the master CMAgentPrimeManager manager = (CMAgentPrimeManager) PAActiveObject.newActive( CMAgentPrimeManager.class.getName(), new Object[] {}, vNode.getANode()); //TODO 5: iterate through all nodes, deploy // a worker per node and add it to the manager // Check the primality (Send a synchronous method call to the manager) boolean isPrime = manager.isPrime(candidate); // Display the result System.out.println("\n" + candidate + (isPrime ? " is prime." : " is not prime.") + "\n"); // Free all resources pad.kill(); } catch (Exception e) { e.printStackTrace(); } finally { System.exit(0); } } }
Once you filled the code and managed to run the application successfully, let's simulate a crash on a worker JVM to know if this application is fault-tolerant.
For this purpose add a Thread.sleep()
in the
isPrime
method of the CMAgentPrimeWorker
class
in order to have time to monitor this application with IC2D.
From IC2D right-click on the JVM that contains a worker and hit the "Kill This JVM" option. Then Expect The Unexpected !
Typically the user will need to catch a
org.objectweb.proactive.core.body.exceptions.FutureMonitoringPingFailureException
exception manually to handle such failures.
Moreover the application will never end or return an incorrect result since the manager will wait infinitely for an answer that never comes from a crashed worker.
The Main
class that deploys the manager and the workers.
package org.objectweb.proactive.examples.userguide.primes.distributed; import java.io.File; import java.util.Collection; import java.util.Iterator; import org.objectweb.proactive.api.PAActiveObject; import org.objectweb.proactive.core.ProActiveException; import org.objectweb.proactive.core.node.Node; import org.objectweb.proactive.core.node.NodeException; import org.objectweb.proactive.examples.userguide.cmagent.simple.CMAgent; import org.objectweb.proactive.extensions.gcmdeployment.PAGCMDeployment; import org.objectweb.proactive.gcmdeployment.GCMApplication; import org.objectweb.proactive.gcmdeployment.GCMVirtualNode; /** * This class illustrates a distributed version of the sequential algorithm for * primality test based on the {@link CMAgent}. * <p> * Some primes : 3093215881333057, 4398042316799, 63018038201, 2147483647 * * @author The ProActive Team * */ public class Main { private static GCMApplication pad; private static Collection<GCMVirtualNode> deploy(String descriptor) { try { pad = PAGCMDeployment.loadApplicationDescriptor(new File(descriptor)); //active all Virtual Nodes pad.startDeployment(); pad.waitReady(); return pad.getVirtualNodes().values(); } catch (NodeException nodeExcep) { System.err.println(nodeExcep.getMessage()); } catch (ProActiveException proExcep) { System.err.println(proExcep.getMessage()); } return null; } public static void main(String[] args) { // The default value for the candidate to test (is prime) long candidate = 2147483647l; // Parse the number from args if there is some if (args.length > 1) { try { candidate = Long.parseLong(args[1]); } catch (NumberFormatException numberException) { System.err.println("Usage: Main <candidate>"); System.err.println(numberException.getMessage()); } } try { Collection<GCMVirtualNode> vNodes = deploy(args[0]); GCMVirtualNode vNode = vNodes.iterator().next(); // create the active object on the first node on // the first virtual node available // start the master CMAgentPrimeManager manager = (CMAgentPrimeManager) PAActiveObject.newActive( CMAgentPrimeManager.class.getName(), new Object[] {}, vNode.getANode()); //TODO 5: iterate through all nodes, deploy // a worker per node and add it to the manager Iterator<Node> nodesIt = vNode.getCurrentNodes().iterator(); while (nodesIt.hasNext()) { Node node = nodesIt.next(); CMAgentPrimeWorker worker = (CMAgentPrimeWorker) PAActiveObject.newActive( CMAgentPrimeWorker.class.getName(), new Object[] {}, node); manager.addWorker(worker); } // Check the primality (Send a synchronous method call to the manager) boolean isPrime = manager.isPrime(candidate); // Display the result System.out.println("\n" + candidate + (isPrime ? " is prime." : " is not prime.") + "\n" ); // Free all resources pad.kill(); } catch (Exception e) { e.printStackTrace(); } finally { System.exit(0); } } }
The CMAgentPrimeManager
class that distributes the intervals to workers.
package org.objectweb.proactive.examples.userguide.primes.distributed; import java.util.Vector; import org.objectweb.proactive.api.PAFuture; import org.objectweb.proactive.core.util.wrapper.BooleanWrapper; /** * @author The ProActive Team */ public class CMAgentPrimeManager { /** * A vector of references on workers */ private Vector<CMAgentPrimeWorker> workers = new Vector<CMAgentPrimeWorker>(); /** * Default interval size */ public static final int INTERVAL_SIZE = 100; /** * Empty no-arg constructor needed by ProActive */ public CMAgentPrimeManager() { } /** * Tests a primality of a specified number. Synchronous ! * * @param number * The number to test * @return <code>true</code> if is prime; <code>false</code> otherwise */ public boolean isPrime(long number) { // We don't need to check numbers greater than the square-root of the // candidate in this algorithm long squareRootOfCandidate = (long) Math.ceil(Math.sqrt(number)); // Begin from 2 the first known prime number long begin = 2; // The number of intervals long nbOfIntervals = (long) Math.ceil(squareRootOfCandidate / INTERVAL_SIZE); // Until the end of the first interval long end = INTERVAL_SIZE; // The vector of futures final Vector<BooleanWrapper> answers = new Vector<BooleanWrapper>(); // Non blocking (asynchronous method call) for (int i = 0; i <= nbOfIntervals; i++) { // Use round robin selection of worker int workerIndex = i % workers.size(); CMAgentPrimeWorker worker = workers.get(workerIndex); //TODO 1. Send asynchronous method call to the worker // Send asynchronous method call to the worker BooleanWrapper res = worker.isPrime(number, begin, end); //TODO 2. Add the future result to the vector of answers // Adds the future to the vector answers.add(res); // Update the begin and the end of the interval begin = end + 1; end += INTERVAL_SIZE; } // Once all requests was sent boolean prime = true; int intervalNumber = 0; // Loop until a worker returns false or vector is empty (all results have been checked) while (!answers.isEmpty() && prime) { // TODO 3. Block until a new response is available // by using a static method from org.objectweb.proactive.api.PAFuture // Will block until a new response is available intervalNumber = PAFuture.waitForAny(answers); // Check the answer prime = answers.get(intervalNumber).booleanValue(); // Remove the actualized future answers.remove(intervalNumber); } return prime; } /** * Adds a worker to the local vector * * @param worker * The worker to add to the vector */ public void addWorker(CMAgentPrimeWorker worker) { this.workers.add(worker); } }
The CMAgentPrimeWorker
class that tests if a candidate is prime.
package org.objectweb.proactive.examples.userguide.primes.distributed; import org.objectweb.proactive.core.util.wrapper.BooleanWrapper; import org.objectweb.proactive.examples.userguide.cmagent.simple.CMAgent; /** * @author The ProActive Team */ public class CMAgentPrimeWorker extends CMAgent { /** * Tests a primality of a specified number in a specified range. * * @param candidate * the candidate number to check * @param begin * starts check from this value * @param end * checks until this value * @return <code>true</code> if is prime; <code>false</code> otherwise */ public BooleanWrapper isPrime(final long candidate, final long begin, final long end) { try { //Used for slowing down the application for in order //to let one stop it for checking fault tolerance behavior Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } //TODO 4. Return a reifiable wrapper for the Boolean type // for asynchronous calls. for (long divider = begin; divider < end; divider++) { if ((candidate % divider) == 0) { return new BooleanWrapper(false); } } return new BooleanWrapper(true); } }
To compile and run the application you need the
Main
class of this example.
The command line for running the application is the following the optional second parameter being the number to test:
java -Djava.security.policy=proactive.java.policy -Dlog4j.configuration=file:proactive-log4j Main deployment.xml 2147483647
In the next section we will distribute the computation using the Master-Worker API.
ProActive
org.objectweb.proactive.extensions.gcmdeployment.PAGCMDeployment
- used for the deployement
org.objectweb.proactive.gcmdeployment.GCMApplication
org.objectweb.proactive.gcmdeployment.GCMVirtualNode
org.objectweb.proactive.extensions.masterworker.ProActiveMaster
- used to create a master
org.objectweb.proactive.extensions.masterworker.interfaces.Task
- used to submit tasks to the master
Other
java.util.List
- used by to store the prime computation tasks
The application uses the same trivial distribution pattern as in the precedent version.
Note that with the Master-Worker API there is no need to deal explicitly with primitive type wrappers, these kind of pitfalls are simply handled internally.
For this example the master and worker classes without code look like this. Try to fill in the code for the methods. An example implementation is also provided below.
package org.objectweb.proactive.examples.userguide.primes.distributedmw; import org.objectweb.proactive.extensions.masterworker.interfaces.Task; import org.objectweb.proactive.extensions.masterworker.interfaces.WorkerMemory; public class FindPrimeTask implements Task<Boolean> { private long begin; private long end; private long taskCandidate; //TODO 1. Write the constructor for this task public FindPrimeTask(){} public Boolean run(WorkerMemory memory) { //TOOD 2. Fill the code that checks if the taskCandidate // is prime. Note that no wrappers are needed ! return new Boolean(false); } }
package org.objectweb.proactive.examples.userguide.primes.distributedmw; import java.util.ArrayList; import java.util.List; import java.net.URL; import org.objectweb.proactive.extensions.masterworker.ProActiveMaster; /** * * Some primes : 3093215881333057l, 4398042316799l, 63018038201, 2147483647 * * @author The ProActive Team * */ public class PrimeExampleMW { /** * Default interval size */ public static final int INTERVAL_SIZE = 100; public static void main(String[] args) { // The default value for the candidate to test (is prime) long candidate = 2147483647l; // Parse the number from args if there is some if (args.length > 1) { try { candidate = Long.parseLong(args[1]); } catch (NumberFormatException numberException) { System.err.println("Usage: PrimeExampleMW <candidate>"); System.err.println(numberException.getMessage()); } } try { // Create the Master ProActiveMaster<FindPrimeTask, Boolean> master = new ProActiveMaster<FindPrimeTask, Boolean>(); // Deploy resources master.addResources(new URL("file://" + args[0])); // Create and submit the tasks master.solve(createTasks(candidate)); //TODO 3. Wait all results from master */ // Collect results List<Boolean> results = null; // Test the primality boolean isPrime = true; for (Boolean result : results) { isPrime = isPrime && result; } // Display the result System.out.println("\n" + candidate + (isPrime ? " is prime." : " is not prime.") + "\n"); // Terminate the master and free all resources master.terminate(true); } catch (Exception e) { e.printStackTrace(); } finally { System.exit(0); } } /** * Creates the prime computation tasks to be solved * * @return A list of prime computation tasks */ public static List<FindPrimeTask> createTasks(long number) { List<FindPrimeTask> tasks = new ArrayList<FindPrimeTask>(); // We don't need to check numbers greater than the square-root of the // candidate in this algorithm long squareRootOfCandidate = (long) Math.ceil(Math.sqrt(number)); // Begin from 2 the first known prime number long begin = 2; // The number of intervals long nbOfIntervals = (long) Math.ceil(squareRootOfCandidate / INTERVAL_SIZE); // Until the end of the first interval long end = INTERVAL_SIZE; for (int i = 0; i <= nbOfIntervals; i++) { //TODO 4. Create a new task for the current interval and // add it to the list of tasks // Update the begin and the end of the interval begin = end + 1; end += INTERVAL_SIZE; } return tasks; } }
Once you filled the code and managed to run the application successfully, let's crash a worker to see the difference with the precedent application.
For this purpose add a Thread.sleep()
in the
run()
method of the FindPrimeTask
class.
Like in the precedent application, from IC2D, kill a JVM that contains a worker. Then expect no exceptions or errors !
The main difference with the precedent application is that the master does not send any tasks to the workers. In fact it is the workers that asks the master for tasks. Such "work-stealing" pattern proves to be fault-tolerant and is one the benefits from using a High-Level ProActive API.
The FindPrimeTask
class that performs the computation.
package org.objectweb.proactive.examples.userguide.primes.distributedmw; import org.objectweb.proactive.extensions.masterworker.interfaces.Task; import org.objectweb.proactive.extensions.masterworker.interfaces.WorkerMemory; /** * Task to find if any number in a specified interval divides the given * candidate * * @author The ProActive Team * */ public class FindPrimeTask implements Task<Boolean> { private long begin; private long end; private long taskCandidate; //TODO 1. Write the constructor for this task public FindPrimeTask(long taskCandidate, long begin, long end) { this.begin = begin; this.end = end; this.taskCandidate = taskCandidate; } //TOOD 2. Fill the code that checks if the taskCandidate // is prime. Note that no wrappers are needed ! public Boolean run(WorkerMemory memory) { try { Thread.sleep(300); } catch (Exception e) { e.printStackTrace(); } for (long divider = begin; divider < end; divider++) { if ((taskCandidate % divider) == 0) { return new Boolean(false); } } return new Boolean(true); } }
The PrimeExampleMW
class that deploys the master and the workers.
package org.objectweb.proactive.examples.userguide.primes.distributedmw; import java.util.ArrayList; import java.util.List; import java.net.URL; import org.objectweb.proactive.extensions.masterworker.ProActiveMaster; /** * * Some primes : 3093215881333057l, 4398042316799l, 63018038201, 2147483647 * * @author The ProActive Team * */ public class PrimeExampleMW { /** * Default interval size */ public static final int INTERVAL_SIZE = 100; public static void main(String[] args) { // The default value for the candidate to test (is prime) long candidate = 2147483647l; // Parse the number from args if there is some if (args.length > 1) { try { candidate = Long.parseLong(args[1]); } catch (NumberFormatException numberException) { System.err.println("Usage: PrimeExampleMW <candidate>"); System.err.println(numberException.getMessage()); } } try { // Create the Master ProActiveMaster<FindPrimeTask, Boolean> master = new ProActiveMaster<FindPrimeTask, Boolean>(); // Deploy resources master.addResources(new URL("file://" + args[0])); // Create and submit the tasks master.solve(createTasks(candidate)); //TODO 3. Wait all results from master */ // Collect results List<Boolean> results = master.waitAllResults(); // Test the primality boolean isPrime = true; for (Boolean result : results) { isPrime = isPrime && result; } // Display the result System.out.println("\n" + candidate + (isPrime ? " is prime." : " is not prime.") + "\n" ); // Terminate the master and free all resources master.terminate(true); } catch (Exception e) { e.printStackTrace(); } finally { System.exit(0); } } /** * Creates the prime computation tasks to be solved * * @return A list of prime computation tasks */ public static List<FindPrimeTask> createTasks(long number) { List<FindPrimeTask> tasks = new ArrayList<FindPrimeTask>(); // We don't need to check numbers greater than the square-root of the // candidate in this algorithm long squareRootOfCandidate = (long) Math.ceil(Math.sqrt(number)); // Begin from 2 the first known prime number long begin = 2; // The number of intervals long nbOfIntervals = (long) Math.ceil(squareRootOfCandidate / INTERVAL_SIZE); // Until the end of the first interval long end = INTERVAL_SIZE; for (int i = 0; i <= nbOfIntervals; i++) { //TODO 4. Create a new task for the current interval and // add it to the list of tasks // Adds the task for the current interval to the list of tasks tasks.add(new FindPrimeTask(number, begin, end)); // Update the begin and the end of the interval begin = end + 1; end += INTERVAL_SIZE; } return tasks; } }
To compile and run the application you need the
PrimeExampleMW
class of this example.
The command line for running the application is the following the optional second parameter being the number to test:
java -Djava.security.policy=proactive.java.policy -Dlog4j.configuration=file:proactive-log4j PrimeExampleMW GCMA.xml 2147483647
© 1997-2008 INRIA Sophia Antipolis All Rights Reserved