The Message Passing Interface (MPI) is a widely adopted communication library for parallel and distributed computing. This work has been designed to make it easier to wrap, deploy and couple several MPI legacy codes, especially on the Grid.
On one hand, we propose a simple wrapping method designed to automatically deploy MPI applications on clusters or desktop Grids through the use of a deployment descriptor, allowing an MPI application to be deployed using most protocols and schedulers (LSF, PBS, SSH, SunGRID, etc.). The proposed wrapping also lets programmers develop conventional stand-alone Java applications that use MPI legacy codes.
On the other hand, we propose a wrapping method with control, designed to let the SPMD processes associated with one code communicate with the SPMD processes associated with another simulation code. This feature brings the parallel capability of MPI to the Grid, with ProActive supporting inter-process communication between MPI processes at different Grid points. A special feature of the proposed wrapping is its support for "MPI to/from Java application" communications, which let users exchange data between the two worlds.
The API is organized in the package org.objectweb.proactive.mpi, with the class org.objectweb.proactive.mpi.MPI gathering static methods and the class org.objectweb.proactive.mpi.MPISpmd, whose instances represent a given deployed MPI code and allow it to be controlled.
In sum, the following features are proposed:
Simple Wrapping and deployment of MPI code (without changing source)
Wrapping with control:
Deploying an Active Object to control an MPI process,
MPI to ProActive Communications,
ProActive to MPI Communications,
MPI to MPI Communication through ProActive.
This work is mainly intended to deploy MPI parallel applications on clusters automatically and transparently. Transparency means that the deployer does not need to know which particular resources provide the computing power. The deployer only has to finalize the deployment descriptor file and retrieve the result of the application, without worrying about resource selection, resource locations and types, or the mapping of processes onto resources.
One of the main principles is to specify and wrap the MPI code in an XML descriptor.
Main Features for Deployment:
File Transfer [using XML deployment descriptor]
The primary objective is to give the deployer an automatic deployment of the application through an XML deployment descriptor. ProActive provides support for File Transfer, so the deployer can transfer MPI application input data and/or MPI application code to the remote host. The File Transfer happens before the deployer launches the application. For more details about File Transfer see Section 23.1, “Introduction and Concepts”.
Asking for resources [using XML deployment descriptor]
The deployer describes the MPI job requirements in the deployment descriptor file using one or several Virtual Nodes, and gets back a set of Nodes corresponding to the remote hosts available for the MPI job execution. For more details (or a usage example) about resource booking, see Section 41.1.4, “Using the Infrastructure”.
Control MPI process [using ProActive API]
After deployment, the deployer obtains the Virtual Node containing the resources required for the MPI job, that is, a set of Nodes. The MPI API lets the programmer create a stateful MPISpmd object from the Virtual Node obtained. Through this object the programmer can control the MPI program: trigger the job execution, kill the job, synchronize the job, get the object status or result, etc. This API is detailed in the next chapter.
What is an MPISpmd object?
An MPISpmd object is regarded as an MPI code wrapper. It has the following features:
it holds a state (which can take different values, and reflects the MPI code status)
it can be controlled through an API (presented in the next section)
MPISpmd object creation methods
import org.objectweb.proactive.mpi; /** * creates an MPISpmd object from a Virtual Node which represents the deployment of an MPI code. * Activates the virtual node (i.e activates all the Nodes mapped to this VirtualNode * in the XML Descriptor) if not already activated, and returns an object representing * the MPI deployment process. * The MPI code being deployed is specified in the XML descriptor where the Virtual Node is defined. */ static public MPISpmd MPI.newMPISpmd(VirtualNode virtualNode);
MPISpmd object control methods
/** * Triggers the execution of the process represented by the MPISpmd object on the resources previously * allocated. This method call is an asynchronous request, thus the call does not * block until the result (MPI result) is used or explicit synchronization is required. The method * immediately returns a future object, more specifically a future on an MPIResult object. * As a consequence, the application can go on executing its code as long as it doesn't need * to invoke methods on the returned MPIResult object, in which case the calling thread is * automatically blocked if the result of the method invocation is not yet available. * In practice, mpirun is also called. */ public MPIResult startMPI();
/** * Restarts the process represented by the MPISpmd object on the same resources. This process must * have been started once beforehand with the startMPI method, otherwise the method throws an * IllegalMPIStateException. If the current state is Running, the * process is killed and a new independent computation is triggered, * and a new MPIResult object is created. It is also an asynchronous method which returns a future * on an MPIResult object. */ public MPIResult reStartMPI();
/** * Kills the process and OS MPI processes represented by the MPISpmd object. * It returns true if the process was running when it has been killed, false otherwise. */ public boolean killMPI();
/** * Returns the current status of the MPISpmd object. The possible statuses are listed below. */ public String getStatus();
/** * Add or modify the MPI command parameters. It allows programmers to specify arguments to the MPI code. * This method has to be called before startMPI or reStartMPI. */ public void setCommandArguments(String arguments);
MPIResult object
An MPIResult object is obtained from a startMPI or reStartMPI method call. More precisely, these methods return a future on an MPIResult object, which does not block the application as long as no method is called on it. When the MPIResult object is actually used, the application blocks until the MPIResult object is updated, meaning that the MPI program has terminated. The following method gets the exit value of the MPI program.
import org.objectweb.proactive.mpi.MPIResult; /** * Returns the exit value of the MPI program. * By usual convention, the value 0 indicates normal termination. */ public int getReturnValue();
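The future behaviour described above can be illustrated with the standard java.util.concurrent.CompletableFuture. This is only an analogy, not the ProActive mechanism: ProActive futures block transparently on first use, whereas here the blocking point is the explicit join() call. The class FutureResultDemo, the method startJob and the latch used to simulate a long computation are all hypothetical names introduced for this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class FutureResultDemo {
    /** Hypothetical stand-in for a job launch: returns at once with a future exit code. */
    static CompletableFuture<Integer> startJob(CountDownLatch jobMayFinish) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                jobMayFinish.await();   // simulate a long-running computation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;              // abnormal termination
            }
            return 0;                   // 0 = normal termination by convention
        });
    }

    public static void main(String[] args) {
        CountDownLatch jobMayFinish = new CountDownLatch(1);
        CompletableFuture<Integer> result = startJob(jobMayFinish);

        // The caller continues immediately; the result is not available yet.
        System.out.println("done right after start? " + result.isDone());

        jobMayFinish.countDown();       // let the simulated job terminate

        // First real use of the result: this is where the caller blocks.
        int exitValue = result.join();
        System.out.println("exit value: " + exitValue);
    }
}
```

The point to retain is the ordering: the launch returns before the job ends, and the caller only waits at the moment it needs the exit value.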
MPISpmd object status
import org.objectweb.proactive.mpi; MPIConstants.MPI_UNSTARTED; // default status - MPISpmd object creation (newMPISpmd) MPIConstants.MPI_RUNNING; // MPISpmd object has been started or restarted MPIConstants.MPI_FINISHED; // MPISpmd object has finished MPIConstants.MPI_KILLED; // MPISpmd object has been killed
Each status defines the current state of the MPISpmd object. It guarantees application consistency and gives better control over the application when several MPISpmd objects are in use.
First finalize the XML descriptor file to specify the MPI code, the files that have to be transferred to the remote hosts, and the resource requirements, as explained in Section 41.1.4, “Using the Infrastructure”. Then, in a Java file, import the package org.objectweb.proactive.mpi. To keep the application consistent, the MPISpmd object makes use of statuses: either a method called on the object is coherent with its current status, or an exception is thrown. In particular, the exception IllegalMPIStateException signals that a method has been called at an illegal or inappropriate time, in other words that the application is not in an appropriate state for the requested operation.
An application is not required to declare this exception in its throws clause, because IllegalMPIStateException is a subclass of RuntimeException. The graph above presents a kind of finite state machine or finite automaton, that is, a model of behavior composed of states (statuses of the MPISpmd object) and transition actions (methods of the API). Once the MPISpmd object is created (newMPISpmd), the object enters the initial state: MPIConstants.MPI_UNSTARTED.
Sample of code (available in the release). These few lines show how to execute the MPI executable jacobi and how to get its return value once it has finished. No modification has to be made to the source code.
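The state and transition rules can be modelled in a few lines of plain Java. The sketch below does not use the ProActive classes: SpmdStateModel, its Status enum and the local IllegalMPIStateException are hypothetical stand-ins. The exact set of legal transitions is an assumption drawn from the method descriptions in this section, for instance that starting is only legal from the unstarted state and that killing a job that is not running returns false without changing the status.

```java
public class SpmdStateModel {
    /** Mirrors the four statuses listed in this section. */
    enum Status { UNSTARTED, RUNNING, FINISHED, KILLED }

    /** Local stand-in: unchecked, so callers need no throws clause. */
    static class IllegalMPIStateException extends RuntimeException {
        IllegalMPIStateException(String message) { super(message); }
    }

    private Status status = Status.UNSTARTED;

    Status getStatus() { return status; }

    /** Assumption: starting is only legal from UNSTARTED. */
    void start() {
        if (status != Status.UNSTARTED) {
            throw new IllegalMPIStateException("start called in state " + status);
        }
        status = Status.RUNNING;
    }

    /** Restart requires a previous start; a running job is simply replaced. */
    void restart() {
        if (status == Status.UNSTARTED) {
            throw new IllegalMPIStateException("restart called before start");
        }
        status = Status.RUNNING;
    }

    /** Returns true only if a running job was actually killed. */
    boolean kill() {
        boolean wasRunning = (status == Status.RUNNING);
        if (wasRunning) {
            status = Status.KILLED;
        }
        return wasRunning;
    }

    /** Called by the model when the simulated job ends on its own. */
    void finish() {
        if (status == Status.RUNNING) {
            status = Status.FINISHED;
        }
    }

    public static void main(String[] args) {
        SpmdStateModel job = new SpmdStateModel();
        try {
            job.restart();               // illegal: never started
        } catch (IllegalMPIStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        job.start();
        job.finish();
        job.restart();
        System.out.println("killed running job: " + job.kill());
        System.out.println("final status: " + job.getStatus());
    }
}
```

Making the exception a RuntimeException subclass is what lets the calling code in main invoke start, restart and kill without any throws declaration, while still being able to catch the error where it matters.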
import org.objectweb.proactive.mpi.*;
...
// load the file descriptor
ProActiveDescriptor pad = ProActive.getProactiveDescriptor("file:descriptor.xml");
// get the Virtual Node that references the jacobi MPI code you want to execute
VirtualNode jacobiVN = pad.getVirtualNode("JACOBIVN");
// activate the Virtual Node (not mandatory, because the MPI.newMPISpmd method does
// it automatically if it has not already been done)
jacobiVN.activate();
// create the MPISpmd object with the Virtual Node
MPISpmd jacobiSpmd = MPI.newMPISpmd(jacobiVN);
// trigger the jacobi MPI code execution and get a future on the MPIResult
MPIResult jacobiResult = jacobiSpmd.startMPI();
// print the current status
logger.info("Current status: " + jacobiSpmd.getStatus());
// get the return value (blocks the thread until jacobiResult is available)
logger.info("Return value: " + jacobiResult.getReturnValue());
// print the MPISpmd object characteristics (name, current status, number of processes ...)
logger.info(jacobiSpmd);
...
Resource booking and MPI code are specified using ProActive Descriptors. The operation is explained here with an example included in the release. The deployment goes through ssh, then PBS, before launching the MPI code on 16 nodes of a cluster. The entire file is available in Example C.37, “MPI Wrapping: mpi_files/MPIRemote-descriptor.xml”.
File Transfer: specify all the files which have to be transferred to the remote host, such as binary code and input data. In the following example, jacobi is the binary of the MPI program. For further details about File Transfer see Section 23.1, “Introduction and Concepts”.
<componentDefinition> <virtualNodesDefinition> <virtualNode name="JACOBIVN" /> </virtualNodesDefinition> </componentDefinition> <deployment> ... </deployment> <fileTransferDefinitions> <fileTransfer id="jacobiCodeTransfer"> <file src="jacobi" dest="jacobi" /> </fileTransfer> </fileTransferDefinitions>
Resource allocation: define processes for resource reservation. See Section 21.7, “Infrastructure and processes” for more details on processes.
SSHProcess: first define the process used to contact the remote host on which resources will be reserved. Set the reference ID of the fileTransferDeploy element to the FileTransfer previously defined, and set the reference ID of the processReference element to the DependentProcessSequence process explained below.
<processDefinition id="sshProcess"> <sshProcess class="org.objectweb.proactive.core.process.ssh.SSHProcess" hostname="nef.inria.fr" username="user"> <processReference refid="jacobiDependentProcess" /> <fileTransferDeploy refid="jacobiCodeTransfer"> <copyProtocol>scp</copyProtocol> <sourceInfo prefix= "/user/user/home/ProActive/src/org/objectweb/proactive/examples/mpi" /> <destinationInfo prefix="/home/user/MyApp"/> </fileTransferDeploy> </sshProcess> </processDefinition>
DependentProcessSequence: this process is used when a process depends on another process. The first process of the list can be any process of the ProActive process infrastructure, but the second must be a DependentProcess, that is, it must implement the org.objectweb.proactive.core.process.DependentProcess interface. The following lines express that the mpiProcess depends on the resources allocated by the pbsProcess.
<processDefinition id="jacobiDependentProcess"> <dependentProcessSequence class="org.objectweb.proactive.core.process.DependentListProcess"> <processReference refid="jacobiPBSProcess"/> <processReference refid="jacobiMPIProcess"/> </dependentProcessSequence> </processDefinition>
PBS Process: note that you can use any services defined in ProActive to allocate resources instead of the PBS one.
<processDefinition id="jacobiPBSProcess"> <pbsProcess class="org.objectweb.proactive.core.process.pbs.PBSSubProcess"> <processReference refid="jvmProcess" /> <commandPath value="/opt/torque/bin/qsub" /> <pbsOption> <hostsNumber>16</hostsNumber> <processorPerNode>1</processorPerNode> <bookingDuration>00:02:00</bookingDuration> <scriptPath> <absolutePath value="/home/smariani/pbsStartRuntime.sh" /> </scriptPath> </pbsOption> </pbsProcess> </processDefinition>
MPI process: defines the MPI actual code to be deployed (executable) and its attributes. It is possible to pass a command option to mpirun by filling the attribute mpiCommandOptions. Specify the number of hosts you wish the application to be deployed on, and at least the MPI code local path. The local path is the path from which you start the application.
<processDefinition id="jacobiMPIProcess"> <mpiProcess class="org.objectweb.proactive.core.process.mpi.MPIDependentProcess" mpiFileName="jacobi" mpiCommandOptions="input_file.dat output_file.dat"> <commandPath value="/usr/src/redhat/BUILD/mpich-1.2.6/bin/mpirun" /> <mpiOptions> <processNumber>16</processNumber> <localRelativePath> <relativePath origin="user.home" value="/ProActive/scripts/unix"/> </localRelativePath> <remoteAbsolutePath> <absolutePath value="/home/smariani/MyApp"/> </remoteAbsolutePath> </mpiOptions> </mpiProcess> </processDefinition>
Let's assume we want to interconnect several modules (VibroToAcous, AcousToVibro, Vibro, Acous, CheckConvergency), each of which is a parallel MPI binary code.
import org.objectweb.proactive.ProActive;
import org.objectweb.proactive.core.ProActiveException;
import org.objectweb.proactive.core.config.ProActiveConfiguration;
import org.objectweb.proactive.core.descriptor.data.ProActiveDescriptor;
import org.objectweb.proactive.core.descriptor.data.VirtualNode;
import org.objectweb.proactive.mpi.*;
...
// load the file descriptor
ProActiveDescriptor pad = ProActive.getProactiveDescriptor("file:descriptor.xml");
// get the Virtual Nodes which reference all the MPI codes we want to use
VirtualNode VibToAc = pad.getVirtualNode("VibToAc");
VirtualNode AcToVib = pad.getVirtualNode("AcToVib");
VirtualNode Vibro = pad.getVirtualNode("Vibro");
VirtualNode Acous = pad.getVirtualNode("Acous");
VirtualNode CheckConvergency = pad.getVirtualNode("CheckConvergency");
// it is not necessary to activate each Virtual Node manually because this is done
// when creating the MPISpmd object with the Virtual Node
// create MPISpmd objects from Virtual Nodes
MPISpmd vibToAc = MPI.newMPISpmd(VibToAc);
MPISpmd acToVib = MPI.newMPISpmd(AcToVib);
MPISpmd vibro = MPI.newMPISpmd(Vibro);
MPISpmd acous = MPI.newMPISpmd(Acous);
// create two different MPISpmd objects from a single Virtual Node
MPISpmd checkVibro = MPI.newMPISpmd(CheckConvergency);
MPISpmd checkAcous = MPI.newMPISpmd(CheckConvergency);
// declare an MPIResult object for each MPISpmd object
MPIResult vibToAcRes, acToVibRes, vibroRes, acousRes, checkVibroRes, checkAcousRes;
boolean convergence = false;
boolean firstLoop = true;
while (!convergence) {
    // trigger execution of the vibToAc and acToVib MPISpmd objects
    if (firstLoop) {
        vibToAcRes = vibToAc.startMPI();
        acToVibRes = acToVib.startMPI();
    } else {
        vibToAcRes = vibToAc.reStartMPI();
        acToVibRes = acToVib.reStartMPI();
    }
    // good termination?
    if ((vibToAcRes.getReturnValue() < 0) || (acToVibRes.getReturnValue() < 0))
        System.exit(-1);
    // trigger execution of the vibro and acous MPISpmd objects
    if (firstLoop) {
        vibroRes = vibro.startMPI();
        acousRes = acous.startMPI();
    } else {
        vibroRes = vibro.reStartMPI();
        acousRes = acous.reStartMPI();
    }
    // good termination?
    if ((vibroRes.getReturnValue() < 0) || (acousRes.getReturnValue() < 0))
        System.exit(-1);
    // check convergency of the acoustic part and the structure part
    if (firstLoop) {
        // modify arguments
        checkVibro.setCommandArguments("oldVibro.res newVibro.res");
        checkAcous.setCommandArguments("oldAcous.res newAcous.res");
        checkVibroRes = checkVibro.startMPI();
        checkAcousRes = checkAcous.startMPI();
    } else {
        checkVibroRes = checkVibro.reStartMPI();
        checkAcousRes = checkAcous.reStartMPI();
    }
    // convergency?
    if ((checkVibroRes.getReturnValue() == 0) || (checkAcousRes.getReturnValue() == 0)) {
        convergence = true;
    }
    firstLoop = false;
}
// free resources
VibToAc.killAll(false);
AcToVib.killAll(false);
Vibro.killAll(false);
Acous.killAll(false);
CheckConvergency.killAll(false);
Some MPI applications decompose naturally into components that are better suited to execute on different platforms, e.g., a simulation component and a visualization component; other applications may be too large to fit on one system. If each subsystem is a parallel system, then MPI is likely to be used for "intra-system" communication, in order to achieve better performance thanks to vendor MPI libraries, as compared to generic TCP/IP implementations.
ProActive makes it possible to deploy a set of MPI applications at once on a set of clusters or desktop machines. This section also shows how to deploy, at the same time, a set of ProActive JVMs, used mainly for two purposes:
communicating between the different codes,
controlling and synchronizing the execution of several (coupled) MPI codes.
"Inter-system" message passing is implemented by ProActive asynchronous remote method invocations. An MPI process may participate both in intra-system communication, using the native MPI implementation, and in inter-system communication, with ProActive through JNI (Java Native Interface) layered on top of IPC system V.
This wrapping defines a cross-implementation protocol for MPI that lets MPI implementations run very efficiently on each subsystem, while ProActive provides interoperability between subsystems. A parallel computation can span multiple systems, using both the native vendor message-passing library and ProActive on each system. New ProActive-specific MPI APIs support these features. The goal is to support some point-to-point communication functions for communication across systems, as well as some collectives. This binding assumes that inter-system communication uses ProActive between each pair of communicating systems, while intra-system communication uses proprietary protocols, at the discretion of each vendor MPI implementation.
The API for the wrapping with control is organized in the package org.objectweb.proactive.mpi.control, with the class org.objectweb.proactive.mpi.control.ProActiveMPI gathering static methods for deployment.
First, the principle for wrapping MPI code is similar to the Simple Wrapping method: the deployer describes the MPI job requirements in the deployment descriptor file using a Virtual Node and gets back a set of Nodes corresponding to the remote hosts available for the MPI job execution. After deployment, the deployer obtains the Virtual Node containing the set of Nodes onto which all the MPI processes will be mapped.
Further, to ensure control, an Active Object is deployed on each Node where an MPI process resides. The Active Object acts as a wrapper/proxy, redirecting output messages of the local MPI process to the remote recipient(s) and incoming messages to the local MPI process. For more details, please refer to Section 41.2.4, “MPI to MPI Communications through ProActive”.
This approach lets the programmer deploy instances of his own classes on any Node(s) using the API defined below. It lets the programmer capture the output messages of an MPI process in his own classes, and send new messages to any MPI process of the whole application. For more details, please refer to Section 41.2.2, “MPI to ProActive Communications” and Section 41.2.3, “ProActive to MPI Communications”. The deployment of Java Active Objects takes place after all MPI processes have started and once the ProActiveMPI_Init() function has been called. That way the implementation can ensure that, when an SPMD group of Active Objects is created by calling the newActiveSpmd function on an MPISpmd object, the ranks of the programmer's SPMD instances match those of the MPI processes.
MPISpmd object methods
For more details about MPISpmd object creation, please refer to Section 41.1.2, “API For Deploying MPI Codes”.
import org.objectweb.proactive.mpi; /** * Builds (and deploys) an 'SPMD' group of Active objects with all references between them * to communicate. This method creates objects of type class on the same nodes on which * this MPISpmd object has deployed the MPI application, with no parameters. * There's a bijection between mpi process rank of the application deployed by this * MPISpmd object and the rank of each active object of the 'SPMD' group. */ public void newActiveSpmd(String class);
import org.objectweb.proactive.mpi; /** * Builds (and deploys) an 'SPMD' group of Active objects class on the same nodes on which * this MPISpmd object has deployed the MPI application. * Params contains the parameters used to build the group's member. * There's a bijection between mpi process rank of the application deployed by this * MPISpmd object and the rank of each active object of the 'SPMD' group */ public void newActiveSpmd(String class, Object[] params);
import org.objectweb.proactive.mpi; /** * Builds (and deploys) an 'SPMD' group of Active objects of type class on the same * nodes on which this MPISpmd object has deployed the MPI application. * Params contains the parameters used to build the group's member. * There's a bijection between mpi process rank of the application deployed by this * MPISpmd object and the rank of each active object of the 'SPMD' group */ public void newActiveSpmd(String class, Object[][] params);
import org.objectweb.proactive.mpi; /** * Builds (and deploys) an Active object of type class on the same node where the mpi process * of the application deployed with this MPISpmd object has rank rank. * Params contains the parameters used to build the active object. * Throws ArrayIndexOutOfBoundsException if the specified rank is greater than the number of nodes. */ public void newActive(String class, Object[] params, int rank);
Deployment method
The MPI API in the package org.objectweb.proactive.mpi lets the programmer create an MPISpmd object from the Virtual Node obtained. The following static method performs the registration of MPI processes and the attribution of job numbers. Each MPI process belongs to a global job, which makes it possible to distinguish two MPI processes that have the same rank within the whole application. For instance, there is a first root process which belongs to job 0 (the first MPI application) and a second root process which belongs to job 1 (the second MPI application). The JobID of an MPI code is given directly by the rank of the MPISpmd object in the ArrayList at deployment time.
import org.objectweb.proactive.mpi; /** * Deploys and starts (startMPI() being called) all MPISpmd objects contained in the list mpiSpmdObjectList. */ static public void ProActiveMPI.deploy(ArrayList mpiSpmdObjectList);
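The job numbering rule can be pictured with a small stand-alone sketch. It uses no ProActive class: JobIdDemo, jobIdOf and globalAddress are hypothetical names, and plain strings stand in for MPISpmd objects. The only behaviour taken from the text is that the job ID equals the position of the code in the list passed at deployment time, and that a process is identified by its job ID together with its MPI rank.

```java
import java.util.ArrayList;
import java.util.List;

public class JobIdDemo {
    /** Returns the job ID a code receives: its index in the deployment list. */
    static int jobIdOf(List<String> deploymentList, String codeName) {
        int index = deploymentList.indexOf(codeName);
        if (index < 0) {
            throw new IllegalArgumentException("not deployed: " + codeName);
        }
        return index;
    }

    /** A process is identified globally by its job ID together with its MPI rank. */
    static String globalAddress(int jobId, int rank) {
        return "job " + jobId + ", rank " + rank;
    }

    public static void main(String[] args) {
        List<String> deploymentList = new ArrayList<>();
        deploymentList.add("codeA");   // first added: job 0
        deploymentList.add("codeB");   // second added: job 1

        // Both codes have a root process (rank 0); the job ID tells them apart.
        System.out.println(globalAddress(jobIdOf(deploymentList, "codeA"), 0));
        System.out.println(globalAddress(jobIdOf(deploymentList, "codeB"), 0));
    }
}
```

A practical consequence is that the order of the add calls matters: swapping them swaps the job IDs that the MPI side must use when addressing the other code.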
The following piece of code is an example of a Java main program which shows how to use the wrapping with control feature with two codes. The XML descriptor file is finalized in exactly the same manner as for the Simple Wrapping. For more details about writing a file descriptor, please refer to Section 41.1.4, “Using the Infrastructure”.
import java.util.ArrayList;
import org.objectweb.proactive.mpi.*;
...
// load the file descriptor
ProActiveDescriptor pad = ProActive.getProactiveDescriptor("file:descriptor.xml");
// get the Virtual Nodes which reference the different MPI codes
VirtualNode vnA = pad.getVirtualNode("CLUSTER_A");
VirtualNode vnB = pad.getVirtualNode("CLUSTER_B");
// create the MPISpmd objects with the Virtual Nodes
MPISpmd spmdA = MPI.newMPISpmd(vnA);
MPISpmd spmdB = MPI.newMPISpmd(vnB);
Object[][] params = new Object[][]{{param_on_node_1}, {param_on_node_2}, {param_on_node_3}};
// deploy "MyClass" as an 'SPMD' group on the same nodes as the spmdA object, with the list of
// parameters defined above
spmdA.newActiveSpmd("MyClass", params);
// deploy "AnotherClass" on the node where the mpi process of the application has rank 0,
// with no parameters
spmdB.newActive("AnotherClass", new Object[]{}, 0);
// create the list of MPISpmd objects (the first MPI job is job 0, the second is job 1, etc.)
ArrayList spmdList = new ArrayList();
spmdList.add(spmdA);
spmdList.add(spmdB);
// deploy and start the listed MPISpmd objects
ProActiveMPI.deploy(spmdList);
...
The wrapping with control allows the programmer to send messages from MPI to Java objects. These classes must have been deployed beforehand using the API seen above. This feature can be useful, for example, if the simulation code is an MPI computation and the visualization component is Java code. Every MPI code that needs to be controlled or to communicate through ProActive must call the ProActiveMPI_Init() function detailed in Section 41.2.4, “MPI to MPI Communications through ProActive”.
ProActiveSend
Performs a basic send from the MPI side to a ProActive Java class.
Synopsis
#include "ProActiveMPI.h"
int ProActiveSend(void* buf, int count, MPI_Datatype datatype, int dest, char* className, char* methodName, int jobID, ...);
Input Parameters
buf: initial address of send buffer
count: number of elements in send buffer (nonnegative integer)
datatype: datatype of each send buffer element
dest: rank of destination (integer)
className: name of class
methodName: name of the method to be called
jobID: remote or local job (integer)
variable arguments: string parameters to be passed to the method
The ProActiveMPIData class belongs to the package org.objectweb.proactive.mpi.control. When a message is sent from the MPI side, a corresponding ProActiveMPIData object is created on the Java side and is passed as a parameter to the method whose name is specified in the ProActiveSend call made by MPI. The ProActiveMPIData object contains several fields that can be useful to the programmer. The following methods are available:
import org.objectweb.proactive.mpi.control; /** * return the rank of the MPI process that sent this message */ public int getSrc();
/** * return the sender job ID */ public int getJobID();
/** * return the type of elements in the buffer data contained in the message. * The type can be compared with the constants defined in the class ProActiveMPIConstants * in the same package. */ public int getDatatype();
/** * return the parameters as an array of String specified in the ProActiveSend method call. */ public String [] getParameters();
/** * return the data buffer as an array of primitive type byte. */ public byte [] getData();
/** * return the number of elements in the buffer. */ public int getCount();
The ProActiveMPIUtil class in the package org.objectweb.proactive.mpi.control.util brings together a set of static functions for conversion. The programmer may use the following functions to convert an array of bytes into elements of a different type:
/* Given a byte array, restore it as an int * param bytes the byte array * param startIndex the starting index of the place the int is stored */ public static int bytesToInt(byte[] bytes, int startIndex);
/* Given a byte array, restore it as a float * param bytes the byte array * param startIndex the starting index of the place the float is stored */ public static float bytesToFloat(byte[] bytes, int startIndex);
/* Given a byte array, restore it as a short * param bytes the byte array * param startIndex the starting index of the place the short is stored */ public static short bytesToShort(byte[] bytes, int startIndex);
/* * Given a byte array, restore a String out of it. * the first cell stores the length of the String * param bytes the byte array * param startIndex the starting index where the string is stored, * the first cell stores the length * ret the string out of the byte array. */ public static String bytesToString(byte[] bytes, int startIndex);
/* Given a byte array, restore it as a long * param bytes the byte array * param startIndex the starting index of the place the long is stored */ public static long bytesToLong(byte[] bytes, int startIndex);
/* Given a byte array, restore it as a double * param bytes the byte array * param startIndex the starting index of the place the double is stored */ public static double bytesToDouble(byte[] bytes, int startIndex);
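To show what such a conversion involves, here is a minimal stand-alone sketch built on the standard java.nio.ByteBuffer. It is not the ProActiveMPIUtil implementation: ByteDecodeDemo, decodeDoubles and encodeDoubles are hypothetical names, and the byte order is an assumption that has to match the layout produced by the MPI side (little-endian is used in the demo, as on x86 hosts).

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class ByteDecodeDemo {
    /** Decodes consecutive 8-byte groups of raw into doubles using the given byte order. */
    static double[] decodeDoubles(byte[] raw, ByteOrder order) {
        if (raw.length % Double.BYTES != 0) {
            throw new IllegalArgumentException("length is not a multiple of 8");
        }
        ByteBuffer buffer = ByteBuffer.wrap(raw).order(order);
        double[] values = new double[raw.length / Double.BYTES];
        for (int i = 0; i < values.length; i++) {
            values[i] = buffer.getDouble();   // reads 8 bytes and advances
        }
        return values;
    }

    /** Inverse operation, used here only to build sample input. */
    static byte[] encodeDoubles(double[] values, ByteOrder order) {
        ByteBuffer buffer = ByteBuffer.allocate(values.length * Double.BYTES).order(order);
        for (double v : values) {
            buffer.putDouble(v);
        }
        return buffer.array();
    }

    public static void main(String[] args) {
        // Assumption: the sender wrote little-endian doubles.
        byte[] raw = encodeDoubles(new double[] {1.5, -2.0, 3.25}, ByteOrder.LITTLE_ENDIAN);
        for (double v : decodeDoubles(raw, ByteOrder.LITTLE_ENDIAN)) {
            System.out.println(v);
        }
    }
}
```

Decoding with the wrong byte order does not fail; it silently yields different numbers, so the order should be agreed on explicitly between the two sides.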
Main program [ProActive deployment part]
import java.util.ArrayList;
import org.objectweb.proactive.mpi.*;
...
// load the file descriptor
ProActiveDescriptor pad = ProActive.getProactiveDescriptor("file:descriptor.xml");
// get the Virtual Node which references the MPI code
VirtualNode vnA = pad.getVirtualNode("CLUSTER_A");
// create the MPISpmd object with the Virtual Node
MPISpmd spmdA = MPI.newMPISpmd(vnA);
// deploy "MyClass" on the same node as mpi process #3
spmdA.newActive("MyClass", new Object[]{}, 3);
// create the list of MPISpmd objects
ArrayList spmdList = new ArrayList();
spmdList.add(spmdA);
// deploy and start the listed MPISpmd objects
ProActiveMPI.deploy(spmdList);
...
Programmer class definition
import org.objectweb.proactive.mpi.control.ProActiveMPIData;
import org.objectweb.proactive.mpi.control.util.ProActiveMPIUtil;

public class MyClass {
    public MyClass() {
    }

    // create a method with a ProActiveMPIData parameter which will be called by the MPI part
    public void foo(ProActiveMPIData data) {
        int icnt = data.getCount();
        // each double occupies 8 bytes in the buffer
        for (int start = 0; start < data.getData().length; start = start + 8) {
            // print the buffer received by converting the byte array to doubles
            System.out.print(" buf[" + (icnt++) + "]= " + ProActiveMPIUtil.bytesToDouble(data.getData(), start));
        }
    }
}
MPI Side
#include <stdio.h>
#include "mpi.h"
#include "ProActiveMPI.h"

// variables declaration
...
// initialize MPI environment
MPI_Init( &argc, &argv );
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
MPI_Comm_size( MPI_COMM_WORLD, &size );
// initialize MPI with ProActive environment
ProActiveMPI_Init(rank);
// get this process job number
ProActiveMPI_Job(&myjob);
// send a buffer of maxn doubles to the "MyClass" Active Object, located on the same
// host as mpi process #3 of job #0, by calling method "foo" with some parameters.
if ((rank == 0) && (myjob == 0)) {
    error = ProActiveSend(xlocal[0], maxn, MPI_DOUBLE, 3, "MyClass", "foo", 0, "params1", "params2", NULL);
    if (error < 0) {
        printf("!!! Error Method call ProActiveSend \n");
    }
}
ProActiveMPI_Finalize();
MPI_Finalize();
return 0;
}
Snapshot of this example
The wrapping with control allows the programmer to pass messages from his own classes to the MPI computation. These classes must have been deployed beforehand using the API seen in Section 41.2.1.1, “Java API”. This feature can be useful, for example, if the programmer wants to control the MPI code by sending "start" or "stop" messages during the computation.
Send Function
import org.objectweb.proactive.mpi.control; /** * Sends a buffer of bytes containing count elements of type datatype * to destination dest of job jobID * The datatypes are listed below */ static public void ProActiveMPICoupling.MPISend(byte[] buf, int count, int datatype, int dest, int tag, int jobID);
Datatypes
The following constants have to be used with the ProActiveMPICoupling.MPISend method to fill the datatype parameter.
import org.objectweb.proactive.mpi.control; MPIConstants.MPI_CHAR; MPIConstants.MPI_UNSIGNED_CHAR; MPIConstants.MPI_BYTE; MPIConstants.MPI_SHORT; MPIConstants.MPI_UNSIGNED_SHORT; MPIConstants.MPI_INT; MPIConstants.MPI_UNSIGNED; MPIConstants.MPI_LONG; MPIConstants.MPI_UNSIGNED_LONG; MPIConstants.MPI_FLOAT; MPIConstants.MPI_DOUBLE; MPIConstants.MPI_LONG_DOUBLE; MPIConstants.MPI_LONG_LONG_INT;
ProActiveRecv
Performs a blocking receive on the MPI side to receive data from a ProActive Java class.
Synopsis
#include "ProActiveMPI.h"
int ProActiveRecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int jobID);
Output Parameters
    buf: initial address of receive buffer
Input Parameters
    count: number of elements in receive buffer (nonnegative integer)
    datatype: datatype of each receive buffer element
    src: rank of source (integer)
    tag: message tag (integer)
    jobID: remote job (integer)

ProActiveIRecv
Performs a non-blocking receive on the MPI side to receive data from a ProActive Java class.
Synopsis
#include "ProActiveMPI.h"
int ProActiveIRecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int jobID, ProActiveMPI_Request *request);
Output Parameters
    request: communication request (handle)
Input Parameters
    buf: initial address of receive buffer
    count: number of elements in receive buffer (nonnegative integer)
    datatype: datatype of each receive buffer element
    src: rank of source (integer)
    tag: message tag (integer)
    jobID: remote job (integer)

ProActiveTest
Tests for the completion of a receive from a ProActive Java class.
Synopsis
#include "ProActiveMPI.h"
int ProActiveTest(ProActiveMPI_Request *request, int *flag);
Output Parameters
    flag: true if operation completed (logical)
Input Parameters
    request: communication request (handle)

ProActiveWait
Waits for an MPI receive from a ProActive Java class to complete.
Synopsis
#include "ProActiveMPI.h"
int ProActiveWait(ProActiveMPI_Request *request);
Input Parameters
    request: communication request (handle)
The following example shows how to send messages from a ProActive class to its MPI computation.
Main program [ProActive deployment part]
import org.objectweb.proactive.mpi.*;

...
// load the file descriptor
ProActiveDescriptor pad = ProActive.getProactiveDescriptor("file:descriptor.xml");

// get the Virtual Nodes which reference the different MPI codes
VirtualNode vnA = pad.getVirtualNode("CLUSTER_A");

// create the MPISpmd object with the Virtual Node
MPISpmd spmdA = MPI.newMPISpmd(vnA);

// deploy "MyClass" on the same node as MPI process #3
spmdA.newActive("MyClass", new Object[]{}, 3);

// create the list of MPISpmd objects
ArrayList spmdList = new ArrayList();
spmdList.add(spmdA);

// deploy and start the listed MPISpmd objects
ProActiveMPI.deploy(spmdList);
...
Programmer class definition
Consider, for example, the "postTreatmentForVisualization" method. It is called at each iteration from the MPI side, receives the current array of doubles generated by the MPI computation, and post-processes it in Java so that it can be displayed in a Java viewer. If the Java computation fails, the method sends a message to the MPI side to abort the computation.
import org.objectweb.proactive.mpi.control.*;

public class MyClass{

    public MyClass() {
    }

    // create a method with a ProActiveMPIData parameter
    public void postTreatmentForVisualization(ProActiveMPIData data){
        int icnt = data.getCount();
        double [] buf = new double [icnt];
        int error = 0;
        for (int start = 0; start < data.getData().length; start = start + 8) {
            // save double in a buffer
            buf[start/8]=ProActiveMPIUtil.bytesToDouble(data.getData(), start);
        }

        // make data post-treatment for visualization
        ...

        if (error == -1){
            // convert int to bytes
            byte [] byteArray = new byte [4];
            ProActiveMPIUtil.intToBytes(error, byteArray, 0);

            // send message to the local MPI process to abort computation
            ProActiveMPICoupling.MPISend(byteArray, 1, ProActiveMPIConstants.MPI_INT, 3, 0, 0);
        }
    }
}
MPI Side
#include <stdio.h>
#include "mpi.h"
#include "ProActiveMPI.h"

// variables declaration
int buf;
ProActiveMPI_Request request;
int flag;

// initialize MPI environment
MPI_Init( &argc, &argv );
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
MPI_Comm_size( MPI_COMM_WORLD, &size );

// initialize MPI with ProActive environment
ProActiveMPI_Init(rank);

// get this process job number
ProActiveMPI_Job(&myjob);

// computation
for (itcnt=0; itcnt<10000; itcnt++){

    // call the "postTreatmentForVisualization" method in the "MyClass" Active Object,
    // located on the same host as MPI process #3 of job #0, and send the current data
    // generated by the computation
    if ((rank == 0) && (myjob == 0)){
        error = ProActiveSend(xlocal[0], 1, MPI_DOUBLE, 3, "MyClass", "postTreatmentForVisualization", 0, NULL );
        if (error < 0){
            printf("!!! Error Method call ProActiveSend \n");
        }
    }

    // perform a non-blocking recv
    if ((rank == 3) && (myjob == 0)){
        error = ProActiveIRecv(&buf, 1, MPI_INT, 3, 0, 0, &request);
        if (error < 0){
            printf("!!! Error Method call ProActiveIRecv \n");
        }
    }

    // do computation
    ...

    // check if a message arrived from ProActive side
    if ((rank == 3) && (myjob == 0)){
        error = ProActiveTest(&request, &flag);
        if (error < 0){
            printf("!!! Error Method call ProActiveTest \n");
        }

        // if a message is captured, flag is true and buf contains the message.
        // it is not mandatory to check the value of the buffer because we know that
        // the reception of a message is due to a failure of the Java side computation.
        if (flag == 1){
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
    }
}

ProActiveMPI_Finalize();
MPI_Finalize( );
return 0;
}
Snapshot of this example
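The control pattern used by this example (post a receive, keep computing, test for a message without blocking, abort if one has arrived) can be illustrated inside a single JVM. The sketch below is only an analogue: the class AbortPolling and its parameters are hypothetical, and a ConcurrentLinkedQueue stands in for the ProActive-to-MPI channel. It does not call any ProActive or MPI function.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical single-JVM analogue of the control loop; the queue stands in
// for the ProActive-to-MPI channel and is not part of the ProActive API.
final class AbortPolling {
    // Runs up to maxIterations steps and checks the mailbox after each one
    // without blocking. Returns the number of steps actually completed.
    // failAt is the iteration at which the simulated controller reports a
    // failure; a negative value means it never does.
    static int compute(ConcurrentLinkedQueue<Integer> mailbox, int maxIterations, int failAt) {
        int done = 0;
        for (int it = 0; it < maxIterations; it++) {
            done++;                         // one step of "computation"
            if (it == failAt) {
                mailbox.add(-1);            // controller side posts an error code
            }
            Integer msg = mailbox.poll();   // non-blocking check, like a test call
            if (msg != null) {
                break;                      // any message means: abort
            }
        }
        return done;
    }
}
```

As in the MPI code, the content of the message is not inspected: its arrival alone is the abort signal.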
The ProActiveMPI features handle the details of starting and shutting down processes on different systems and of coordinating their execution. Passing data between processes, however, is specified explicitly by the programmer in the source code: messages between processes on the local system use the MPI API, while messages to processes on a remote system use the ProActiveMPI API defined below.
ProActiveMPI_Init
Initializes the MPI with ProActive execution environment.
Synopsis
#include "ProActiveMPI.h"
int ProActiveMPI_Init(int rank);
Input Parameters
    rank: the rank of the MPI process, previously initialized with MPI_Init

ProActiveMPI_Job
Initializes the job environment variable.
Synopsis
#include "ProActiveMPI.h"
int ProActiveMPI_Job(int *job);
Output Parameters
    job: job the MPI process belongs to

ProActiveMPI_Finalize
Terminates the MPI with ProActive execution environment.
Synopsis
#include "ProActiveMPI.h"
int ProActiveMPI_Finalize();

ProActiveMPI_Send
Performs a basic send.
Synopsis
#include "ProActiveMPI.h"
int ProActiveMPI_Send(void *buf, int count, MPI_Datatype datatype, int dest, int tag, int jobID );
Input Parameters
    buf: initial address of send buffer
    count: number of elements in send buffer (nonnegative integer)
    datatype: datatype of each send buffer element
    dest: rank of destination (integer)
    tag: message tag (integer)
    jobID: remote job (integer)

ProActiveMPI_Recv
Performs a basic receive.
Synopsis
#include "ProActiveMPI.h"
int ProActiveMPI_Recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int jobID);
Output Parameters
    buf: initial address of receive buffer (choice)
Input Parameters
    count: number of elements in receive buffer (nonnegative integer)
    datatype: datatype of each receive buffer element
    src: rank of source (integer)
    tag: message tag (integer)
    jobID: remote job (integer)

ProActiveMPI_IRecv
Performs a non-blocking receive.
Synopsis
#include "ProActiveMPI.h"
int ProActiveMPI_IRecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int jobID, ProActiveMPI_Request *request);
Output Parameters
    request: communication request (handle)
Input Parameters
    buf: initial address of receive buffer
    count: number of elements in receive buffer (nonnegative integer)
    datatype: datatype of each receive buffer element
    src: rank of source (integer)
    tag: message tag (integer)
    jobID: remote job (integer)

ProActiveMPI_Test
Tests for the completion of a receive.
Synopsis
#include "ProActiveMPI.h"
int ProActiveMPI_Test(ProActiveMPI_Request *request, int *flag);
Output Parameters
    flag: true if operation completed (logical)
Input Parameters
    request: communication request (handle)

ProActiveMPI_Wait
Waits for an MPI receive to complete.
Synopsis
#include "ProActiveMPI.h"
int ProActiveMPI_Wait(ProActiveMPI_Request *request);
Input Parameters
    request: communication request (handle)

ProActiveMPI_AllSend
Performs a basic send to all processes of a remote job.
Synopsis
#include "ProActiveMPI.h"
int ProActiveMPI_AllSend(void *buf, int count, MPI_Datatype datatype, int tag, int jobID);
Input Parameters
    buf: initial address of send buffer
    count: number of elements in send buffer (nonnegative integer)
    datatype: datatype of each send buffer element
    tag: message tag (integer)
    jobID: remote job (integer)

ProActiveMPI_Barrier
Blocks until all processes of the specified job have reached this routine. No synchronization is enforced if jobID is different from the current jobID, in which case -1 is returned.
Synopsis
#include "ProActiveMPI.h"
int ProActiveMPI_Barrier(int jobID);
Input Parameters
    jobID: jobID for which the caller is blocked until all members have entered the call
#include <stdio.h>
#include "mpi.h"
#include "ProActiveMPI.h"

// variables declaration
...

// initialize MPI environment
MPI_Init( &argc, &argv );
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
MPI_Comm_size( MPI_COMM_WORLD, &size );

// initialize MPI with ProActive environment
ProActiveMPI_Init(rank);

// get this process job number
ProActiveMPI_Job(&myjob);

// send from process (#size-1, #0) to (#0, #1) [#num_process, #num_job]
if ((rank == size-1) && (myjob==0)){
    error = ProActiveMPI_Send(xlocal[maxn/size], maxn, MPI_DOUBLE, 0, 0, 1);
    if (error < 0){
        printf(" Error while sending from #%d-%d \n", rank, myjob);
    }
}

// recv (#0, #1) from (#size-1, #0)
if ((rank == 0) && (myjob==1)) {
    error = ProActiveMPI_Recv(xlocal[0], maxn, MPI_DOUBLE, size-1, 0, 0);
    if (error < 0){
        printf(" Error while recving with #%d-%d \n", rank, myjob);
    }
}

ProActiveMPI_Finalize();
MPI_Finalize( );
return 0;
}
The Jacobi relaxation method for solving the Poisson equation has become a classic example of applying domain decomposition to parallelize a problem. Briefly, the original domain is divided into sub-domains. The figure below illustrates dividing a 12x12 domain into two domains with two 12x3 sub-domains each (one-dimensional decomposition). Each sub-domain is associated with a single CPU of a cluster, but one can divide the original domain into as many domains as there are clusters and as many sub-domains as there are CPUs. The iterations in the interior (green) cells can proceed independently of each other. Only the perimeter (red) cells need information from the neighbouring sub-domains. Thus, the values of the solution in the perimeter must be sent to the "ghost" (blue) cells of the neighbours, as indicated by the arrows. The amount of data that must be transferred between cells (and the corresponding nodes) is proportional to the number of cells in one dimension, N.
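To make the role of the ghost cells concrete, here is a small single-JVM sketch of the same idea: a grid is split into two horizontal strips, each strip keeps one ghost row for its neighbour, perimeter rows are copied into the ghost rows before every sweep, and the result is compared with a sweep over the undivided grid. The class JacobiStrips, its method names, and the start values are hypothetical; plain array copies stand in for the MPI or ProActiveMPI messages.

```java
// Illustrative sketch only: two sub-domains in one JVM, with array copies
// standing in for the messages exchanged between neighbouring processes.
final class JacobiStrips {
    // One Jacobi sweep. Rows 0 and rows-1 and the first and last column are
    // left untouched: they are either fixed boundary or ghost cells.
    static double[][] sweep(double[][] x) {
        int rows = x.length, cols = x[0].length;
        double[][] next = new double[rows][];
        for (int i = 0; i < rows; i++) next[i] = x[i].clone();
        for (int i = 1; i < rows - 1; i++)
            for (int j = 1; j < cols - 1; j++)
                next[i][j] = (x[i][j + 1] + x[i][j - 1] + x[i + 1][j] + x[i - 1][j]) / 4.0;
        return next;
    }

    // Copies each strip's perimeter row into the neighbour's ghost row.
    // lower's ghost row is its last row; upper's ghost row is its first row.
    static void exchange(double[][] lower, double[][] upper) {
        int cols = lower[0].length;
        System.arraycopy(lower[lower.length - 2], 0, upper[0], 0, cols);
        System.arraycopy(upper[1], 0, lower[lower.length - 1], 0, cols);
    }

    // Iterates on an n x n grid both as a whole and as two strips, and
    // returns the largest absolute difference between the two results.
    static double maxDifference(int n, int iterations) {
        double[][] full = new double[n][n];
        for (int i = 0; i < n; i++)
            for (int j = 0; j < n; j++) full[i][j] = (i * 31 + j * 17) % 11; // arbitrary start values
        int h = n / 2;                              // lower owns rows 0..h-1
        double[][] lower = new double[h + 1][];     // plus ghost row at index h
        double[][] upper = new double[n - h + 1][]; // ghost row at index 0
        for (int i = 0; i <= h; i++) lower[i] = full[i].clone();
        for (int i = 0; i <= n - h; i++) upper[i] = full[h - 1 + i].clone();
        for (int it = 0; it < iterations; it++) {
            exchange(lower, upper);
            lower = sweep(lower);
            upper = sweep(upper);
            full = sweep(full);
        }
        double diff = 0.0;
        for (int j = 0; j < n; j++) {
            for (int i = 0; i < h; i++)
                diff = Math.max(diff, Math.abs(lower[i][j] - full[i][j]));
            for (int i = h; i < n; i++)
                diff = Math.max(diff, Math.abs(upper[i - h + 1][j] - full[i][j]));
        }
        return diff;
    }
}
```

Only one row per neighbour is copied at each iteration, which is the reason the communication volume grows with the number of cells in one dimension rather than with the size of the whole sub-domain.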
In the example below, the domain decomposition is applied on two clusters. The domain is a 1680x1680 mesh divided into 16 sub-domains of 1680x280 on each cluster.
To compile the ProActiveMPI package, you may enter the ProActive/compile directory and type:
linux> build clean ProActiveMPI
Note | |
---|---|
The compilation requires an MPI implementation to be installed on your machine; otherwise it fails with an error. |
If the build is successful, it will:
compile recursively all Java classes in the org.objectweb.proactive.mpi package.
generate the native library that all wrapper/proxy Active Objects will load in their JVM.
execute the configure script in the directory org/objectweb/proactive/mpi/control/config. The configure script generates a Makefile in the same directory. This Makefile is used to compile MPI source code that contains ProActiveMPI functions.
For more details about writing a file descriptor, please refer to Section 41.1.4, “Using the Infrastructure”.
<?xml version="1.0" encoding="UTF-8"?> <ProActiveDescriptor xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation= "http://www-sop.inria.fr/oasis/proactive/schema/3.2/DescriptorSchema.xsd"> <variables> <descriptorVariable name="PROACTIVE_HOME" value="ProActive"/> <descriptorVariable name="LOCAL_HOME" value="/home/smariani"/> <descriptorVariable name="REMOTE_HOME_NEF" value="/home/smariani"/> <descriptorVariable name="REMOTE_HOME_NINA" value="/user/smariani/home"/> <descriptorVariable name="MPIRUN_PATH_NEF" value= "/usr/src/redhat/BUILD/mpich-1.2.6/bin/mpirun"/> <descriptorVariable name="MPIRUN_PATH_NINA" value= "/user/smariani/home/mpich-1.2.6/bin/mpirun"/> <descriptorVariable name="QSUB_PATH" value="/opt/torque/bin/qsub"/> </variables> <componentDefinition> <virtualNodesDefinition> <virtualNode name="Cluster_Nef" /> <virtualNode name="Cluster_Nina" /> </virtualNodesDefinition> </componentDefinition> <deployment> <mapping> <map virtualNode="Cluster_Nef"> <jvmSet> <vmName value="Jvm1" /> </jvmSet> </map> <map virtualNode="Cluster_Nina"> <jvmSet> <vmName value="Jvm2" /> </jvmSet> </map> </mapping> <jvms> <jvm name="Jvm1"> <creation> <processReference refid="sshProcess_nef" /> </creation> </jvm> <jvm name="Jvm2"> <creation> <processReference refid="sshProcess_nina" /> </creation> </jvm> </jvms> </deployment> <fileTransferDefinitions> <fileTransfer id="JACOBI"> <!-- Transfer mpi program on remote hosts --> <file src="jacobi" dest="jacobi" /> </fileTransfer> </fileTransferDefinitions> <infrastructure> <processes> <processDefinition id="localJVM_NEF"> <jvmProcess class="org.objectweb.proactive.core.process.JVMNodeProcess"> <classpath> <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/ProActive.jar" /> <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/asm.jar" /> <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/log4j.jar" /> <absolutePath value= 
"${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/components/fractal.jar" /> <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/xercesImpl.jar" /> <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/bouncycastle.jar" /> <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/jsch.jar" /> <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/lib/javassist.jar" /> <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/classes" /> </classpath> <javaPath> <absolutePath value="${REMOTE_HOME_NEF}/jdk1.5.0_05/bin/java" /> </javaPath> <policyFile> <absolutePath value="${REMOTE_HOME_NEF}/proactive.java.policy" /> </policyFile> <log4jpropertiesFile> <absolutePath value="${REMOTE_HOME_NEF}/${PROACTIVE_HOME}/compile/proactive-log4j" /> </log4jpropertiesFile> <jvmParameters> <parameter value="-Dproactive.useIPaddress=true" /> <parameter value="-Dproactive.rmi.port=6099" /> <!-- DO NOT FORGET TO SET THE java.library.path VARIABLE to the remote directory path of the application --> <parameter value="-Djava.library.path=${REMOTE_HOME_NEF}/MyApp" /> </jvmParameters> </jvmProcess> </processDefinition> <processDefinition id="localJVM_NINA"> <jvmProcess class="org.objectweb.proactive.core.process.JVMNodeProcess"> <classpath> <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/ProActive.jar" /> <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/asm.jar" /> <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/log4j.jar" /> <absolutePath value= "${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/components/fractal.jar" /> <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/xercesImpl.jar" /> <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/bouncycastle.jar" /> <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/jsch.jar" /> <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/lib/javassist.jar" /> <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/classes" /> </classpath> <javaPath> 
<absolutePath value="/user/smariani/home/NOSAVE/jdk1.5.0_05/bin/java"/> </javaPath> <policyFile> <absolutePath value="${REMOTE_HOME_NINA}/proactive.java.policy"/> </policyFile> <log4jpropertiesFile> <absolutePath value="${REMOTE_HOME_NINA}/${PROACTIVE_HOME}/compile/proactive-log4j" /> </log4jpropertiesFile> <jvmParameters> <parameter value="-Dproactive.useIPaddress=true" /> <parameter value="-Dproactive.rmi.port=6099" /> <!-- DO NOT FORGET TO SET THE java.library.path VARIABLE to the remote directory path of the application --> <parameter value="-Djava.library.path=${REMOTE_HOME_NINA}/MyApp" /> </jvmParameters> </jvmProcess> </processDefinition> <!-- pbs Process --> <processDefinition id="pbsProcess"> <pbsProcess class="org.objectweb.proactive.core.process.pbs.PBSSubProcess"> <processReference refid="localJVM_NEF" /> <commandPath value="${QSUB_PATH}" /> <pbsOption> <!-- ask for 16 nodes on cluster nef (8 hosts, 2 nodes per machine)--> <hostsNumber>8</hostsNumber> <processorPerNode>2</processorPerNode> <bookingDuration>01:00:00</bookingDuration> <scriptPath> <absolutePath value="${REMOTE_HOME_NEF}/pbsStartRuntime.sh" /> </scriptPath> </pbsOption> </pbsProcess> </processDefinition> <processDefinition id="lsfProcess"> <bsubProcess class="org.objectweb.proactive.core.process.lsf.LSFBSubProcess"> <processReference refid="localJVM_NINA"/> <bsubOption> <!-- ask for 16 nodes on cluster nina (8 hosts, 2 nodes per machine)--> <processor>16</processor> <resourceRequirement value="span[ptile=2]"/> <scriptPath> <absolutePath value="${REMOTE_HOME_NINA}/startRuntime.sh"/> </scriptPath> </bsubOption> </bsubProcess> </processDefinition> <!-- mpi Process --> <processDefinition id="mpiProcess_nef"> <mpiProcess class="org.objectweb.proactive.core.process.mpi.MPIDependentProcess" mpiFileName="jacobi" > <commandPath value="${MPIRUN_PATH_NEF}" /> <mpiOptions> <processNumber>16</processNumber> <localRelativePath> <relativePath origin="user.home" value="Test" /> </localRelativePath> 
<remoteAbsolutePath> <absolutePath value="${REMOTE_HOME_NEF}/MyApp" /> </remoteAbsolutePath> </mpiOptions> </mpiProcess> </processDefinition> <!-- mpi Process --> <processDefinition id="mpiProcess_nina"> <mpiProcess class="org.objectweb.proactive.core.process.mpi.MPIDependentProcess" mpiFileName="jacobi" > <commandPath value="${MPIRUN_PATH_NINA}" /> <mpiOptions> <processNumber>16</processNumber> <localRelativePath> <relativePath origin="user.home" value="Test" /> </localRelativePath> <remoteAbsolutePath> <absolutePath value="${REMOTE_HOME_NINA}/MyApp" /> </remoteAbsolutePath> </mpiOptions> </mpiProcess> </processDefinition> <!-- dependent process --> <processDefinition id="dpsProcess_nef"> <dependentProcessSequence class= "org.objectweb.proactive.core.process.DependentListProcess"> <processReference refid="pbsProcess" /> <processReference refid="mpiProcess_nef" /> </dependentProcessSequence> </processDefinition> <!-- dependent process --> <processDefinition id="dpsProcess_nina"> <dependentProcessSequence class= "org.objectweb.proactive.core.process.DependentListProcess"> <processReference refid="lsfProcess" /> <processReference refid="mpiProcess_nina" /> </dependentProcessSequence> </processDefinition> <!-- ssh process --> <processDefinition id="sshProcess_nef"> <sshProcess class="org.objectweb.proactive.core.process.ssh.SSHProcess" hostname= "nef.inria.fr" username="smariani"> <processReference refid="dpsProcess_nef" /> <fileTransferDeploy refid="JACOBI"> <copyProtocol>processDefault, scp, rcp</copyProtocol> <!-- local host path --> <sourceInfo prefix= "${PROACTIVE_HOME}/src/org/objectweb/proactive/mpi/control/config/bin" /> <!-- remote host path --> <destinationInfo prefix="${REMOTE_HOME_NEF}/MyApp" /> </fileTransferDeploy> </sshProcess> </processDefinition> <!-- ssh process --> <processDefinition id="sshProcess_nina"> <sshProcess class="org.objectweb.proactive.core.process.ssh.SSHProcess" hostname= "cluster.inria.fr" username="smariani"> <processReference 
refid="dpsProcess_nina" /> <fileTransferDeploy refid="JACOBI"> <copyProtocol>scp</copyProtocol> <!-- local host path --> <sourceInfo prefix= "${PROACTIVE_HOME}/src/org/objectweb/proactive/mpi/control/config/bin" /> <!-- remote host path --> <destinationInfo prefix="${REMOTE_HOME_NINA}/MyApp" /> </fileTransferDeploy> </sshProcess> </processDefinition> </processes> </infrastructure> </ProActiveDescriptor>
Note | |
---|---|
To interface with native code, each wrapper/proxy loads a library into its JVM context. The java.library.path variable of each JVM must therefore be set to the remote directory path. To do so, use the following tag in each jvmProcess definition: |
<parameter value="-Djava.library.path=${REMOTE_HOME_NEF}/MyApp" />
Place the source file in org/objectweb/proactive/mpi/control/config/src directory
#include <stdio.h>
#include "mpi.h"
#include "ProActiveMPI.h"
#include <time.h>

/* This example handles a 1680x1680 mesh, on 2 clusters with 16 nodes (2 ppn) for each */
#define maxn 1680
#define size 840
#define JOB_ZERO 0
#define JOB_ONE 1
#define NB_ITER 10000

int main( int argc, char **argv )
{
    int rank, initValue, i, j, itcnt, idjob, nb_proc, error;
    int i_first, i_last;
    double xlocal[(size/3)+2][maxn];
    double xnew[(size/3)+3][maxn];
    double diffnorm;
    double t_00, t_01, waitForRecv = 0.0;
    MPI_Status status;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int namelen;

    // MPI initialization
    MPI_Init( &argc, &argv );
    MPI_Comm_rank( MPI_COMM_WORLD, &rank );
    MPI_Comm_size( MPI_COMM_WORLD, &nb_proc );
    MPI_Get_processor_name(processor_name, &namelen);

    // ProActive with MPI initialization
    error = ProActiveMPI_Init(rank);
    if (error < 0){
        printf("[MPI] !!! Error ProActiveMPI init \n");
        MPI_Abort( MPI_COMM_WORLD, 1 );
    }

    // get this process job ID
    ProActiveMPI_Job(&idjob);
    if (nb_proc != 16) MPI_Abort( MPI_COMM_WORLD, 1 );

    /* xlocal[0][] holds the lower ghost points, xlocal[size/nb_proc+1][] the upper ones */

    /*
     * Note that top and bottom processes have one less row of interior points
     */
    i_first = 1;
    i_last = size/nb_proc;

    if ((rank == 0) && (idjob == JOB_ZERO)) i_first++;
    if ((rank == nb_proc - 1) && (idjob == JOB_ONE)) i_last--;

    // matrix initialization
    if (idjob==JOB_ZERO) initValue=rank;
    else {initValue = nb_proc+rank;}

    /* Fill the data as specified */
    for (i=1; i<=size/nb_proc; i++)
        for (j=0; j<maxn; j++)
            xlocal[i][j] = initValue;
    for (j=0; j<maxn; j++) {
        xlocal[i_first-1][j] = -1;
        xlocal[i_last+1][j] = -1;
    }

    itcnt = 0;
    do {
        /*----+----+----+----+----+----+ MPI COMMS +----+----+----+----+----+----+*/
        /* Send up unless I'm at the top, then receive from below */
        /* Note the use of xlocal[i] for &xlocal[i][0] */
        if (rank < nb_proc - 1)
            MPI_Send( xlocal[size/nb_proc], maxn, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD );
        if (rank > 0)
            MPI_Recv( xlocal[0], maxn, MPI_DOUBLE, rank - 1, 0, MPI_COMM_WORLD, &status );

        /*----+----+----+----+----+----+ PROACTIVE COMMS +----+----+----+----+----+----+*/
        if ((rank == nb_proc - 1) && (idjob == JOB_ZERO)){
            error = ProActiveMPI_Send(xlocal[size/nb_proc], maxn, MPI_DOUBLE, 0, 0, JOB_ONE);
            if (error < 0){
                printf("[MPI] !!! Error ProActiveMPI send #15/0 -> #0/1 \n");
            }
        }
        if ((rank == 0) && (idjob==JOB_ONE)) {
            error = ProActiveMPI_Recv(xlocal[0], maxn, MPI_DOUBLE, nb_proc - 1, 0, JOB_ZERO);
            if (error < 0){
                printf("[MPI] !!! Error ProActiveMPI recv #0/1 <- #15/0 \n");
            }
        }

        /*----+----+----+----+----+----+ MPI COMMS +----+----+----+----+----+----+*/
        /* Send down unless I'm at the bottom */
        if (rank > 0)
            MPI_Send( xlocal[1], maxn, MPI_DOUBLE, rank - 1, 1, MPI_COMM_WORLD );
        if (rank < nb_proc - 1)
            MPI_Recv( xlocal[size/nb_proc+1], maxn, MPI_DOUBLE, rank + 1, 1, MPI_COMM_WORLD, &status );

        /*----+----+----+----+----+----+ PROACTIVE COMMS +----+----+----+----+----+----+*/
        if ((rank == 0) && (idjob==JOB_ONE)){
            error = ProActiveMPI_Send(xlocal[1], maxn, MPI_DOUBLE, nb_proc - 1, 1, JOB_ZERO);
            if (error < 0){
                printf("[MPI] !!! Error ProActiveMPI send #0/1 -> #15/0 \n");
            }
        }
        if ((rank == nb_proc - 1) && (idjob==JOB_ZERO)) {
            t_00 = MPI_Wtime();
            error = ProActiveMPI_Recv(xlocal[size/nb_proc+1], maxn, MPI_DOUBLE, 0, 1, JOB_ONE);
            t_01 = MPI_Wtime();
            if (error < 0){
                printf("[MPI] !!! Error ProActiveMPI recv #15/0 <- #0/1 \n");
            }
            waitForRecv += t_01 - t_00;
        }

        /*----+----+----+----+----+----+ COMPUTATION +----+----+----+----+----+----+*/
        /* Compute new values (but not on boundary) */
        itcnt ++;
        diffnorm = 0.0;
        for (i=i_first; i<=i_last; i++)
            for (j=1; j<maxn-1; j++) {
                xnew[i][j] = (xlocal[i][j+1] + xlocal[i][j-1] +
                              xlocal[i+1][j] + xlocal[i-1][j]) / 4.0;
                diffnorm += (xnew[i][j] - xlocal[i][j]) *
                            (xnew[i][j] - xlocal[i][j]);
            }

        /* Only transfer the interior points */
        for (i=i_first; i<=i_last; i++)
            for (j=1; j<maxn-1; j++)
                xlocal[i][j] = xnew[i][j];

        if (rank == 0) printf( "[MPI] At iteration %d, job %d \n", itcnt, idjob );
    } while (itcnt < NB_ITER);

    // print this process buffer
    printf("[MPI] Rank: %d Job: %d \n", rank, idjob );
    for (i=1; i<(size/16); i++){
        printf("[");
        for (j=0; j<maxn; j++)
            printf( "%f ", xlocal[i][j]);
        printf("] \n");
    }

    // clean environment
    ProActiveMPI_Finalize();
    MPI_Finalize( );
    return 0;
}
To compile the MPI code with the added features for wrapping, you may enter the org/objectweb/proactive/mpi/control/config directory and type:
linux> make clean
linux> make mpicode=jacobi
Note | |
---|---|
The mpicode value is the name of the source file without its extension. The Makefile generates a binary with the same name in the bin directory. |
import org.apache.log4j.Logger;
import org.objectweb.proactive.ProActive;
import org.objectweb.proactive.core.ProActiveException;
import org.objectweb.proactive.core.config.ProActiveConfiguration;
import org.objectweb.proactive.core.descriptor.data.ProActiveDescriptor;
import org.objectweb.proactive.core.descriptor.data.VirtualNode;
import org.objectweb.proactive.core.node.Node;
import org.objectweb.proactive.core.util.log.Loggers;
import org.objectweb.proactive.core.util.log.ProActiveLogger;
import org.objectweb.proactive.mpi.MPI;
import org.objectweb.proactive.mpi.MPISpmd;
import org.objectweb.proactive.mpi.control.ProActiveMPI;

import java.util.ArrayList;
import java.util.Vector;

public class Main {
    public static void main(String[] args) {
        Logger logger = ProActiveLogger.getLogger(Loggers.EXAMPLES);

        if (args.length != 1) {
            logger.error("Usage: java " + Main.class.getName() + " <deployment file>");
            System.exit(0);
        }

        ProActiveConfiguration.load();

        VirtualNode jacobiOnNina;
        VirtualNode jacobiOnNef;
        ProActiveDescriptor pad = null;

        try {
            pad = ProActive.getProactiveDescriptor("file:" + args[0]);

            // gets virtual node
            jacobiOnNef = pad.getVirtualNode("Cluster_Nef");
            jacobiOnNina = pad.getVirtualNode("Cluster_Nina");

            MPISpmd nefMPISpmd = MPI.newMPISpmd(jacobiOnNef);
            MPISpmd ninaMPISpmd = MPI.newMPISpmd(jacobiOnNina);

            ArrayList my_jobs = new ArrayList();
            my_jobs.add(nefMPISpmd);
            my_jobs.add(ninaMPISpmd);
            ProActiveMPI.deploy(my_jobs);
        } catch (ProActiveException e) {
            e.printStackTrace();
            logger.error("Pb when reading descriptor");
        }
    }
}
Deploy the ProActive main program above like any other ProActive application, using a script such as the following:
#!/bin/sh

echo --- ProActive/MPI JACOBI example ---------------------------------------------

workingDir=`dirname $0`
. $workingDir/env.sh

XMLDESCRIPTOR=/user/smariani/home/Test/MPI-Jacobi-nina-nef.xml

$JAVACMD -classpath $CLASSPATH -Djava.security.policy=$PROACTIVE/compile/proactive.java.policy -Dproactive.rmi.port=6099 -Dlog4j.configuration=file:$PROACTIVE/compile/proactive-log4j Main $XMLDESCRIPTOR
Reading of the file descriptor and return of 16 nodes from the first cluster Nef and 16 nodes from the second cluster Nina
************* Reading deployment descriptor: file:/user/smariani/home/TestLoadLib/MPI-Jacobi-nina-nef.xml ********************
created VirtualNode name=Cluster_Nef
created VirtualNode name=Cluster_Nina
...
**** Mapping VirtualNode Cluster_Nef with Node: //193.51.209.75:6099/Cluster_Nef932675317 done
**** Mapping VirtualNode Cluster_Nef with Node: //193.51.209.76:6099/Cluster_Nef1864357984 done
**** Mapping VirtualNode Cluster_Nef with Node: //193.51.209.70:6099/Cluster_Nef1158912343 done
...
**** Mapping VirtualNode Cluster_Nina with Node: //193.51.209.47:6099/Cluster_Nina1755746262 done
**** Mapping VirtualNode Cluster_Nina with Node: //193.51.209.47:6099/Cluster_Nina-1139061904 done
**** Mapping VirtualNode Cluster_Nina with Node: //193.51.209.45:6099/Cluster_Nina-941377986 done
...
Deployment of proxies on remote nodes and environment initialization
[MANAGER] Create SPMD Proxy for jobID: 0
[MANAGER] Initialize remote environments
[MANAGER] Activate remote thread for communication
[MANAGER] Create SPMD Proxy for jobID: 1
[MANAGER] Initialize remote environments
[MANAGER] Activate remote thread for communication
Processes registration
[MANAGER] JobID #0 register mpi process #12
[MANAGER] JobID #0 register mpi process #3
[MANAGER] JobID #0 register mpi process #1
[MANAGER] JobID #0 register mpi process #15
[MANAGER] JobID #0 register mpi process #4
[MANAGER] JobID #0 register mpi process #7
[MANAGER] JobID #0 register mpi process #0
[MANAGER] JobID #0 register mpi process #9
[MANAGER] JobID #0 register mpi process #2
[MANAGER] JobID #0 register mpi process #13
[MANAGER] JobID #0 register mpi process #10
[MANAGER] JobID #0 register mpi process #5
[MANAGER] JobID #0 register mpi process #11
[MANAGER] JobID #0 register mpi process #14
[MANAGER] JobID #0 register mpi process #6
[MANAGER] JobID #0 register mpi process #8
[MANAGER] JobID #1 register mpi process #10
[MANAGER] JobID #1 register mpi process #13
[MANAGER] JobID #1 register mpi process #6
[MANAGER] JobID #1 register mpi process #3
[MANAGER] JobID #1 register mpi process #7
[MANAGER] JobID #1 register mpi process #8
[MANAGER] JobID #1 register mpi process #15
[MANAGER] JobID #1 register mpi process #9
[MANAGER] JobID #1 register mpi process #4
[MANAGER] JobID #1 register mpi process #1
[MANAGER] JobID #1 register mpi process #0
[MANAGER] JobID #1 register mpi process #11
[MANAGER] JobID #1 register mpi process #2
[MANAGER] JobID #1 register mpi process #5
[MANAGER] JobID #1 register mpi process #12
[MANAGER] JobID #1 register mpi process #14
Starting computation
[MPI] At iteration 1, job 1
[MPI] At iteration 2, job 1
[MPI] At iteration 3, job 1
[MPI] At iteration 4, job 1
[MPI] At iteration 5, job 1
...
[MPI] At iteration 1, job 0
[MPI] At iteration 2, job 0
[MPI] At iteration 3, job 0
[MPI] At iteration 4, job 0
[MPI] At iteration 5, job 0
[MPI] At iteration 6, job 0
...
[MPI] At iteration 9996, job 1
[MPI] At iteration 9997, job 1
[MPI] At iteration 9998, job 1
[MPI] At iteration 9999, job 1
[MPI] At iteration 10000, job 1
...
[MPI] At iteration 9996, job 0
[MPI] At iteration 9997, job 0
[MPI] At iteration 9998, job 0
[MPI] At iteration 9999, job 0
[MPI] At iteration 10000, job 0
Displaying each process result, for example
[MPI] Rank: 15 Job: 1
[31.000000 27.482592 24.514056 ... 24.514056 27.482592 31.000000 ]
[31.000000 26.484765 22.663677 ... 22.663677 26.484765 31.000000 ]
[31.000000 24.765592 19.900617 ... 19.900617 24.765592 31.000000 ]
All processes unregistration
[MANAGER] JobID #1 unregister mpi process #15
[MANAGER] JobID #1 unregister mpi process #14
[MANAGER] JobID #0 unregister mpi process #0
[MANAGER] JobID #1 unregister mpi process #13
[MANAGER] JobID #0 unregister mpi process #1
[MANAGER] JobID #1 unregister mpi process #12
[MANAGER] JobID #0 unregister mpi process #2
...
The following snapshot shows the 32 Nodes required, distributed over 16 hosts (two processes per host, and 8 hosts on each cluster). Each Node contains its local wrapper, a ProActiveMPICoupling Active Object. Note the ProActive communication between two MPI processes, which goes through the communication between two proxies that belong to two Nodes residing on different clusters.
The proxy acts as a smart reference that performs additional actions when the MPISpmdImpl Active Object is accessed. In particular, the proxy forwards a request to the Active Object only if the Active Object is in an appropriate state; otherwise an IllegalMPIStateException is thrown.
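The guarding behaviour described here can be sketched with a small state machine. Everything in the block below is an assumption made for illustration: the class GuardedSpmdProxy is hypothetical, java.lang.IllegalStateException stands in for IllegalMPIStateException, and the rules about which request is accepted in which state are guesses, not the rules implemented by ProActive.

```java
// Hypothetical stand-in for the proxy placed in front of the MPISpmdImpl
// Active Object. The accepted transitions are assumptions for illustration.
final class GuardedSpmdProxy {
    enum Status { UNSTARTED, RUNNING, KILLED, FINISHED }

    private Status status = Status.UNSTARTED;

    Status getStatus() {
        return status;
    }

    // Assumed rule: a code can only be started from UNSTARTED.
    void start() {
        require(status == Status.UNSTARTED, "start");
        status = Status.RUNNING;
    }

    // Assumed rule: only a running code can be killed.
    void kill() {
        require(status == Status.RUNNING, "kill");
        status = Status.KILLED;
    }

    // Assumed rule: restart is accepted once the code was killed or has finished.
    void restart() {
        require(status == Status.KILLED || status == Status.FINISHED, "restart");
        status = Status.RUNNING;
    }

    // Called when the wrapped computation ends on its own.
    void finished() {
        require(status == Status.RUNNING, "finish");
        status = Status.FINISHED;
    }

    // Forwards nothing and throws if the request does not fit the state.
    // IllegalStateException stands in for IllegalMPIStateException.
    private void require(boolean ok, String request) {
        if (!ok) {
            throw new IllegalStateException(request + " refused in state " + status);
        }
    }
}
```

The point of the design is that callers never have to inspect the status themselves: an ill-timed request fails immediately at the proxy instead of reaching the Active Object.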
DependentListProcess and IndependentListProcess (left part on the picture)
The SequentialListProcess related classes are defined in the org.objectweb.proactive.core.process package. The two classes share the same characteristics: both contain a list of processes which have to be executed sequentially. The dependency constraint was added to satisfy the requirements of MPI processes. The DependentListProcess class specifies a list of processes which all have to implement the DependentProcess interface, except the head process, which is a plain resource allocation process. This guarantees to the deployer that a dependent process is executed if and only if it has received the parameters it depends on.
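The dependency constraint can be pictured as a chain in which each step receives the parameters produced by the step before it. The following sketch models that idea with plain Java functions; the class DependentSequence and its methods are hypothetical and do not reproduce the ProActive process classes or the DependentProcess interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of a dependent process list; not the ProActive classes.
final class DependentSequence {
    // Each step receives the parameters produced by the previous step and
    // returns the parameters for the next one. The head step gets an empty list.
    private final List<Function<List<String>, List<String>>> steps = new ArrayList<>();

    DependentSequence then(Function<List<String>, List<String>> step) {
        steps.add(step);
        return this;
    }

    // Runs the steps strictly in order. A dependent step is started only if
    // the previous step handed back a non-empty parameter list.
    List<String> run() {
        List<String> params = new ArrayList<>();
        for (int i = 0; i < steps.size(); i++) {
            params = steps.get(i).apply(params);
            boolean hasSuccessor = i < steps.size() - 1;
            if (hasSuccessor && (params == null || params.isEmpty())) {
                throw new IllegalStateException("step " + i + " produced no parameters");
            }
        }
        return params;
    }
}
```

For instance, a head step that returns a list of two host names followed by a step that turns the list into a command line such as "mpirun -np 2 jacobi" mirrors the allocation-then-MPI sequence: the second step cannot run before the host list exists.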
MPIDependentProcess (right part on the picture)
The MPI-related classes are defined in the org.objectweb.proactive.core.process.mpi package. An MPI process first requires a list of hosts on which to execute the job. This process therefore has to implement the DependentProcess interface. See Section 11.7, Infrastructure and processes (part III), for more details on processes.
org.objectweb.proactive.mpi | ||
public class MPI | ||
static MPISpmd | newMPISpmd(VirtualNode virtualNode) throws IllegalMPIStateException | Creates an MPISpmd object from an existing VirtualNode |
public class MPISpmd | ||
MPIResult | startMPI() throws IllegalMPIStateException | Triggers MPI code execution and returns a future on an MPIResult object |
MPIResult | reStartMPI() throws IllegalMPIStateException | Restarts MPI code execution and returns a new future on an MPIResult object |
boolean | killMPI() throws IllegalMPIStateException | Kills the MPI code execution |
String | getStatus() | Returns the current status of MPI code execution |
void | setCommandArguments(String arguments) | Adds or modifies the MPI command parameters |
public class MPIResult | ||
int | getReturnValue() | Returns the exit value of the MPI code |
public class MPIConstants | ||
static final String | MPI_UNSTARTED | MPISpmd object status after creation |
static final String | MPI_RUNNING | MPISpmd object has been started or restarted |
static final String | MPI_KILLED | MPISpmd object has been killed |
static final String | MPI_FINISHED | MPISpmd object has finished |
Table 41.1. Simple Wrapping of MPI Code
org.objectweb.proactive.mpi | ||
public class MPISpmd | ||
void | newActiveSpmd(String class) | Deploys an SPMD group of Active Objects on each MPISpmd Node |
void | newActiveSpmd(String class, Object[] params) | Deploys an SPMD group of Active Objects with specific constructor parameters on each MPISpmd Node |
void | newActiveSpmd(String class, Object[][] params) | Deploys an SPMD group of Active Objects with specific constructor parameters on each MPISpmd Node |
void | newActive(String class, Object[] params, int rank) throws ArrayIndexOutOfBoundsException | Deploys an Active Object with specific constructor parameters on the single Node identified by rank |
org.objectweb.proactive.mpi.control | ||
public class ProActiveMPI | ||
void | deploy(ArrayList mpiSpmdList) | Deploys and starts all MPISpmd objects in the list |
Table 41.2. API for creating one Active Object per MPI process
int | ProActiveSend(void* buf, int count, MPI_Datatype datatype, int dest, char* className, char* methodName, int jobID, ...) | Performs a basic send from the MPI side to a ProActive Java class |
Table 41.3. MPI to ProActive Communications API
org.objectweb.proactive.mpi.control | ||
public class ProActiveMPIData | ||
int | getSrc() | Returns the rank of the sending MPI process |
int | getJobID() | Returns the jobID of the sending MPI process |
int | getDataType() | Returns type of data |
String [] | getParameters() | Returns the parameters passed in the ProActiveSend method call |
byte [] | getData() | Returns the data as a byte array |
int | getCount() | Returns the number of elements in data array |
org.objectweb.proactive.mpi.control.util | ||
public class ProActiveMPIUtil | ||
static int | bytesToInt(byte[] bytes, int startIndex) | Given a byte array, restores it as an int |
static float | bytesToFloat(byte[] bytes, int startIndex) | Given a byte array, restores it as a float |
static short | bytesToShort(byte[] bytes, int startIndex) | Given a byte array, restores it as a short |
static long | bytesToLong(byte[] bytes, int startIndex) | Given a byte array, restores it as a long |
static double | bytesToDouble(byte[] bytes, int startIndex) | Given a byte array, restores it as a double |
static String | bytesToString(byte[] bytes, int startIndex) | Given a byte array, restores a string out of it |
static int | intTobytes(int num, byte[] bytes, int startIndex) | Translates int into bytes, stored in byte array |
static int | floatToByte(float num, byte[] bytes, int startIndex) | Translates float into bytes, stored in byte array |
static int | shortToBytes(short num, byte[] bytes, int startIndex) | Translates short into bytes, stored in byte array |
static int | stringToBytes(String str, byte[] bytes, int startIndex) | Given a String of fewer than 255 bytes, stores it in a byte array |
static int | longToBytes(long num, byte[] bytes, int startIndex) | Translates long into bytes, stored in byte array |
static int | doubleToBytes(double num, byte[] bytes, int startIndex) | Translates double into bytes, stored in byte array |
Table 41.4. Java API for MPI message conversion
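To make the conversion idea concrete, here is a minimal stand-alone sketch of reading and writing typed values at a given index of a byte array. It is built on java.nio.ByteBuffer rather than on ProActiveMPIUtil. The class ByteConversionSketch and its method names are hypothetical, the little-endian byte order is an assumption (the order actually used depends on how the MPI side lays out its data), and returning the next free index from the write methods is an illustrative choice, not the documented meaning of the int returned by the methods in the table.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper; not the ProActiveMPIUtil implementation. */
class ByteConversionSketch {
    // Assumption: the peer encodes values in little-endian order.
    static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;

    /** Reads a 4-byte int starting at startIndex. */
    static int readInt(byte[] bytes, int startIndex) {
        return ByteBuffer.wrap(bytes, startIndex, Integer.BYTES).order(ORDER).getInt();
    }

    /** Reads an 8-byte double starting at startIndex. */
    static double readDouble(byte[] bytes, int startIndex) {
        return ByteBuffer.wrap(bytes, startIndex, Double.BYTES).order(ORDER).getDouble();
    }

    /** Writes an int at startIndex and returns the next free index. */
    static int writeInt(int num, byte[] bytes, int startIndex) {
        ByteBuffer.wrap(bytes, startIndex, Integer.BYTES).order(ORDER).putInt(num);
        return startIndex + Integer.BYTES;
    }

    /** Writes a double at startIndex and returns the next free index. */
    static int writeDouble(double num, byte[] bytes, int startIndex) {
        ByteBuffer.wrap(bytes, startIndex, Double.BYTES).order(ORDER).putDouble(num);
        return startIndex + Double.BYTES;
    }

    /** Decodes count consecutive doubles from the start of data. */
    static double[] readDoubles(byte[] data, int count) {
        double[] values = new double[count];
        for (int i = 0; i < count; i++) {
            values[i] = readDouble(data, i * Double.BYTES);
        }
        return values;
    }

    public static void main(String[] args) {
        byte[] buffer = new byte[2 * Double.BYTES + Integer.BYTES];
        int next = writeDouble(31.0, buffer, 0);
        next = writeDouble(27.482592, buffer, next);
        writeInt(15, buffer, next);

        double[] values = readDoubles(buffer, 2);
        System.out.println(values[0] + " " + values[1] + " " + readInt(buffer, next));
    }
}
```

A receiver written along these lines would call a routine such as readDoubles on the array returned by getData(), using getCount() as the element count, provided the datatype reported by getDataType() is a double type.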
org.objectweb.proactive.mpi.control | ||
public class ProActiveMPICoupling | ||
static void | MPISend(byte[] buf, int count, int datatype, int dest, int tag, int jobID) | Sends a buffer of bytes to the specified MPI process |
org.objectweb.proactive.mpi.control | ||
public class ProActiveMPIConstants | ||
static final int | MPI_CHAR | char |
static final int | MPI_UNSIGNED_CHAR | unsigned char |
static final int | MPI_BYTE | byte |
static final int | MPI_SHORT | short |
static final int | MPI_UNSIGNED_SHORT | unsigned short |
static final int | MPI_INT | int |
static final int | MPI_UNSIGNED | unsigned int |
static final int | MPI_LONG | long |
static final int | MPI_UNSIGNED_LONG | unsigned long |
static final int | MPI_FLOAT | float |
static final int | MPI_DOUBLE | double |
static final int | MPI_LONG_DOUBLE | long double |
static final int | MPI_LONG_LONG_INT | long long int |
Table 41.5. ProActiveMPI API for sending messages to MPI
int | ProActiveRecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int jobID) | Performs a blocking receive on the MPI side to receive data from a ProActive Java class |
int | ProActiveIRecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int jobID, ProActiveMPI_Request *request) | Performs a non-blocking receive on the MPI side to receive data from a ProActive Java class |
int | ProActiveTest(ProActiveMPI_Request *request, int *flag) | Tests for the completion of a receive from a ProActive Java class |
int | ProActiveWait(ProActiveMPI_Request *request) | Waits for an MPI receive from a ProActive Java class to complete |
Table 41.6. MPI message reception from ProActive
int | ProActiveMPI_Init(int rank) | Initializes the MPI with ProActive execution environment |
int | ProActiveMPI_Job(int *job) | Initializes the variable with the JOBID |
int | ProActiveMPI_Finalize() | Terminates MPI with ProActive execution environment |
int | ProActiveMPI_Send(void *buf, int count, MPI_Datatype datatype, int dest, int tag, int jobID) | Performs a basic send |
int | ProActiveMPI_Recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int jobID) | Performs a basic receive |
int | ProActiveMPI_IRecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, int jobID, ProActiveMPI_Request *request) | Performs a non-blocking receive |
int | ProActiveMPI_Test(ProActiveMPI_Request *request, int *flag) | Tests for the completion of receive |
int | ProActiveMPI_Wait(ProActiveMPI_Request *request) | Waits for an MPI receive to complete |
int | ProActiveMPI_AllSend(void *buf, int count, MPI_Datatype datatype, int tag, int jobID) | Performs a basic send to all processes of a remote job |
int | ProActiveMPI_Barrier(int jobID) | Blocks until all processes of the specified job have reached this routine |
Table 41.7. MPI to MPI through ProActive C API
Datatypes: MPI_CHAR, MPI_UNSIGNED_CHAR, MPI_BYTE, MPI_SHORT, MPI_UNSIGNED_SHORT, MPI_INT, MPI_UNSIGNED, MPI_LONG, MPI_UNSIGNED_LONG, MPI_FLOAT, MPI_DOUBLE, MPI_LONG_DOUBLE, MPI_LONG_LONG_INT
Call | PROACTIVEMPI_INIT(rank, err) integer :: rank, err | Initializes the MPI with ProActive execution environment |
Call | PROACTIVEMPI_JOB(job, err) integer :: job, err | Initializes the job environment variable |
Call | PROACTIVEMPI_FINALIZE(err) integer :: err | Terminates MPI with ProActive execution environment |
Call | PROACTIVEMPI_SEND(buf, count, datatype, dest, tag, jobID, err) < type >, dimension(*) :: buf integer :: count, datatype, dest, tag, jobID, err | Performs a basic send |
Call | PROACTIVEMPI_RECV(buf, count, datatype, src, tag, jobID, err) < type >, dimension(*) :: buf integer :: count, datatype, src, tag, jobID, err | Performs a basic receive |
Call | PROACTIVEMPI_ALLSEND(buf, count, datatype, tag, jobID, err) < type >, dimension(*) :: buf integer :: count, datatype, tag, jobID, err | Performs a basic send to all processes of a remote job |
Call | PROACTIVEMPI_BARRIER(jobID, err) integer :: jobID, err | Blocks until all processes of the specified job have reached this routine |
Table 41.8. MPI to MPI through ProActive Fortran API
Datatypes: MPI_CHARACTER, MPI_BYTE, MPI_INTEGER, MPI_DOUBLE
© 2001-2007 INRIA Sophia Antipolis All Rights Reserved