High-Level API Tutorials

7.1. Master-Worker DistributedPrimes Application

In the previous Distributed Primes example, we used a basic master-worker architecture to distribute the computation. However, that algorithm is very inefficient, because the master has to wait for all the workers to finish their computation before sending out a new number to be checked. In this section we rewrite the algorithm to take advantage of the Master-Worker API already included in ProActive.

The Master-Worker API aims to simplify the distribution of embarrassingly parallel computations. It hides the details of active objects from the user, so that a computation can be distributed without explicitly creating and deploying active objects.

We will rewrite the Primes example to take advantage of the Master-Worker API. Each worker checks whether a number is prime by simple trial division, testing the possible divisors from 2 up to the square root of the number. We are not aiming for an efficient algorithm (there are much faster and more complex methods of checking for primes) but for an illustration of how the Master-Worker API works.
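The primality check performed by the ComputePrime task later in this section is trial division: it examines one number at a time. A sieve, such as the Sieve of Eratosthenes, instead crosses out all multiples of each prime up to a bound in a single pass. The following plain-Java sketch (no ProActive involved; the class and method names are illustrative) implements both so that you can check that they agree.

```java
import java.util.ArrayList;
import java.util.List;

class PrimeChecks {

    // Trial division: n is prime if no d with 2 <= d <= sqrt(n) divides it.
    // The condition d <= n / d avoids the overflow that d * d could cause.
    static boolean isPrimeTrialDivision(long n) {
        if (n < 2) {
            return false;
        }
        for (long d = 2; d <= n / d; d++) {
            if (n % d == 0) {
                return false;
            }
        }
        return true;
    }

    // Sieve of Eratosthenes: every number not crossed out by a smaller prime is prime.
    static List<Integer> sieve(int max) {
        boolean[] composite = new boolean[max + 1];
        List<Integer> primes = new ArrayList<>();
        for (int i = 2; i <= max; i++) {
            if (!composite[i]) {
                primes.add(i);
                // cross out multiples of i, starting at i * i
                for (long j = (long) i * i; j <= max; j += i) {
                    composite[(int) j] = true;
                }
            }
        }
        return primes;
    }

    public static void main(String[] args) {
        List<Integer> fromSieve = sieve(50);
        List<Integer> fromTrial = new ArrayList<>();
        for (int n = 0; n <= 50; n++) {
            if (isPrimeTrialDivision(n)) {
                fromTrial.add(n);
            }
        }
        System.out.println(fromSieve.equals(fromTrial)); // true
        System.out.println(fromSieve);
    }
}
```

Trial division suits the Master-Worker setting because every number can be checked independently, which is exactly what makes the problem embarrassingly parallel.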

The Master-Worker API uses a logical partition of computations and resources:

  • task - a task is a logical partition of the computation. The computation is split into several tasks that are assigned to workers.

  • worker - a worker is a logical partition of the available resources. Workers retrieve tasks, compute them and send the results back to the master.
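The task/worker split can be pictured with a minimal single-JVM sketch that uses only java.util.concurrent. This is an analogy, not how ProActive implements the API: the master fills a task queue, and each worker is a thread that repeatedly takes a task, computes it and puts the result on a result queue for the master to collect. The class name QueueMasterWorker and the poison-pill shutdown (a sentinel value telling each worker to stop) are illustrative choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.LongUnaryOperator;

class QueueMasterWorker {

    // Sentinel ("poison pill") telling a worker that no tasks are left.
    // Assumption of this sketch: inputs never contain Long.MIN_VALUE.
    private static final long STOP = Long.MIN_VALUE;

    // Applies task to every input using workerCount worker threads.
    // Results are returned in completion order, not submission order.
    static List<Long> run(List<Long> inputs, LongUnaryOperator task, int workerCount)
            throws InterruptedException {
        if (workerCount < 1) {
            throw new IllegalArgumentException("need at least one worker");
        }
        BlockingQueue<Long> tasks = new LinkedBlockingQueue<>(inputs);
        BlockingQueue<Long> results = new LinkedBlockingQueue<>();

        List<Thread> workers = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            Thread worker = new Thread(() -> {
                try {
                    // retrieve a task, compute it, send the result back to the master
                    while (true) {
                        long x = tasks.take();
                        if (x == STOP) {
                            return;
                        }
                        results.put(task.applyAsLong(x));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers.add(worker);
            worker.start();
        }

        // one poison pill per worker, queued after all real tasks (FIFO order)
        for (int w = 0; w < workerCount; w++) {
            tasks.put(STOP);
        }

        // the master collects exactly one result per task
        List<Long> collected = new ArrayList<>();
        for (int i = 0; i < inputs.size(); i++) {
            collected.add(results.take());
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return collected;
    }
}
```

For example, QueueMasterWorker.run(List.of(1L, 2L, 3L), x -> x * x, 2) returns the three squares in whatever order the workers finish them.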

Figure 7.1. Architecture of Distributed Primes Using Master-Worker API


7.1.1. Application components

The basic workflow for using the Master-Worker API is the following:

  1. Define the resources

  2. Define the master

  3. Define the tasks

  4. Tell the master to solve the tasks

  5. Retrieve the results from the master

This is the most basic structure; more complex algorithms are of course possible with the Master-Worker API.

Our simple example follows this structure.
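The five steps can be mirrored locally with java.util.concurrent, which is handy for experimenting without any deployment. In this sketch a fixed pool of four threads stands in for the resources, the ExecutorService stands in for the master, Callable objects stand in for the tasks, and invokeAll() plays the role of solving the tasks and waiting for all the results. The class FiveStepsLocal and its squaring tasks are hypothetical; this is an analogy, not the ProActive API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FiveStepsLocal {

    // Computes 1^2 .. n^2 by following the five Master-Worker steps locally.
    static List<Long> squares(int n) throws Exception {
        // 1. define the resources: four local threads
        // 2. define the master: the executor that hands tasks to those threads
        ExecutorService master = Executors.newFixedThreadPool(4);
        try {
            // 3. define the tasks
            List<Callable<Long>> tasks = new ArrayList<>();
            for (long i = 1; i <= n; i++) {
                final long value = i;
                tasks.add(() -> value * value);
            }
            // 4. tell the master to solve the tasks (blocks until all are done)
            List<Future<Long>> futures = master.invokeAll(tasks);
            // 5. retrieve the results from the master
            List<Long> results = new ArrayList<>();
            for (Future<Long> f : futures) {
                results.add(f.get());
            }
            return results;
        } finally {
            master.shutdown();
        }
    }
}
```

ExecutorService.invokeAll() returns the futures in the same order as the submitted tasks, so here the results come back in submission order.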

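First we define the resources to be used by specifying a deployment descriptor. The descriptor is passed to the master through its addResources() method, so in the complete program this call comes after the master has been created: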

// adding resources
master.addResources(DistributedPrimesMW.class
   .getResource("/org/objectweb/proactive/examples/masterworker/WorkersLocal.xml"));

To learn how deployment descriptors work, read Section 6.3, “Application Deployment”, and Chapter 20, “XML Deployment Descriptors”.
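Class.getResource() returns null when the requested file is not on the classpath. Rather than relying on how addResources() handles a null URL, a small guard can fail early with a clear message. The Resources class and its requireResource helper below are hypothetical and not part of ProActive.

```java
import java.net.URL;

class Resources {

    // Hypothetical helper: resolve a classpath resource relative to anchor,
    // or fail immediately with a message naming the missing file.
    static URL requireResource(Class<?> anchor, String path) {
        URL url = anchor.getResource(path);
        if (url == null) {
            throw new IllegalStateException(
                    "Deployment descriptor not found on classpath: " + path);
        }
        return url;
    }
}
```

In DistributedPrimesMW the call would then read master.addResources(Resources.requireResource(DistributedPrimesMW.class, "/org/objectweb/proactive/examples/masterworker/WorkersLocal.xml")).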

The master that coordinates the computation is created as follows:

// creation of the master
ProActiveMaster<ComputePrime, Long> master =
   new ProActiveMaster<ComputePrime, Long>();

The master is parameterized by two types: ComputePrime, a static nested class that implements the Task interface, and Long. To create the master we specify the type of task the master will control and the return type of the task's run() method (in our case Long).
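The two type parameters tie the master to its tasks: the task type must produce results of the result type, so the master can hand back a list of that type. The self-contained sketch below shows the same typing pattern as the ProActiveMaster<ComputePrime, Long> declaration, using a hypothetical LocalTask interface and LocalMaster class that simply run the tasks one after another in the calling thread. These names are illustrative and are not ProActive classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a task: a unit of work that produces an R.
interface LocalTask<R> {
    R run() throws Exception;
}

// Hypothetical master typed like ProActiveMaster<T, R>:
// T is the task type, and R is the type returned by T.run().
class LocalMaster<T extends LocalTask<R>, R> {
    private final List<T> pending = new ArrayList<>();

    // Queue the tasks to be solved.
    void solve(List<T> tasks) {
        pending.addAll(tasks);
    }

    // Run every pending task sequentially, in submission order,
    // and return one result per task.
    List<R> waitAllResults() throws Exception {
        List<R> results = new ArrayList<>();
        for (T task : pending) {
            results.add(task.run());
        }
        pending.clear();
        return results;
    }
}
```

Because T is bounded by LocalTask<R>, the compiler rejects a master whose result type does not match what its tasks return.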

We define a task by implementing the Task interface and its method run(WorkerMemory memory), which holds the computation part of the task. In our case, run() checks whether the number passed to the constructor is prime and returns that number if it is, or 0 otherwise. The WorkerMemory argument is not used in this example.

/**
 * Very simple task which checks whether a number is prime
 * by trial division.
 */
public static class ComputePrime implements Task<Long> {

    // the number to be checked
    private final Long number;

    public ComputePrime(Long number) {
        this.number = number;
    }

    // trial division: try every divisor from 2 up to round(sqrt(number))
    public Long run(WorkerMemory memory) throws Exception {
        long n = number;
        long limit = Math.round(Math.sqrt(n)) + 1;
        boolean prime = n >= 2; // 0 and 1 are not prime
        for (long divisor = 2; prime && divisor < limit; divisor++) {
            if (n % divisor == 0) {
                prime = false;
            }
        }
        // return the number if it is prime, otherwise return 0
        return prime ? n : 0L;
    }
}
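The loop bound used in run() is enough to detect every composite number. For an integer n ≥ 2 that is composite, with limit = round(√n) + 1 as in the code:

```latex
\begin{align*}
n = a\,b,\quad 2 \le a \le b \;&\Rightarrow\; a^2 \le a\,b = n \;\Rightarrow\; a \le \sqrt{n}\\
a \in \mathbb{Z} \;&\Rightarrow\; a \le \lfloor \sqrt{n} \rfloor \le \operatorname{round}(\sqrt{n}) < \operatorname{round}(\sqrt{n}) + 1 = \mathit{limit}
\end{align*}
```

So every composite n has a divisor a with 2 ≤ a < limit, which the loop tests; a number with no divisor in that range is therefore prime. The +1 is what lets the strict comparison divisor < limit include round(√n) itself.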

Because we do not know ahead of time how many prime numbers an interval contains, we check the numbers in rounds. The constant NUMBER_OF_TASKS is the number of consecutive numbers checked in one round, with one task per number. In each round we submit the tasks to the master, which assigns them to the workers; then we count the primes that were found and display them. We repeat this process until at least the desired number of primes has been found, so the last round may report a few more primes than requested.
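The round-based strategy can be tried out locally without ProActive. In this sketch (the class RoundBasedPrimes and its methods are hypothetical) each round checks batchSize consecutive numbers sequentially, and the loop stops once at least target primes have been found. Because a round is always completed, the result can contain more primes than requested, just as in the full program.

```java
import java.util.ArrayList;
import java.util.List;

class RoundBasedPrimes {

    // Trial division, the same test a ComputePrime task performs.
    static boolean isPrime(long n) {
        if (n < 2) {
            return false;
        }
        for (long d = 2; d <= n / d; d++) {
            if (n % d == 0) {
                return false;
            }
        }
        return true;
    }

    // Check numbers in rounds of batchSize, starting at 2,
    // until at least target primes have been found.
    static List<Long> findPrimes(int target, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Long> found = new ArrayList<>();
        long start = 2;
        while (found.size() < target) {
            // one round: the numbers start .. start + batchSize - 1
            for (long i = start; i < start + batchSize; i++) {
                if (isPrime(i)) {
                    found.add(i);
                }
            }
            // move the starting point to the next round
            start += batchSize;
        }
        return found;
    }
}
```

With batchSize = 30, asking for 10 primes still runs the whole first round over 2..31 and returns all 11 primes in that range.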

Following is the code for the entire application. As you may notice, it is much shorter than the previous example and it does not instantiate any active objects explicitly. Instead, we just specify a deployment descriptor and ProActive takes care of the rest.

package active;

import java.util.ArrayList;
import java.util.List;

import org.objectweb.proactive.extensions.masterworker.ProActiveMaster;
import org.objectweb.proactive.extensions.masterworker.TaskAlreadySubmittedException;
import org.objectweb.proactive.extensions.masterworker.TaskException;
import org.objectweb.proactive.extensions.masterworker.interfaces.Task;
import org.objectweb.proactive.extensions.masterworker.interfaces.WorkerMemory;


public class DistributedPrimesMW {
    // how many consecutive numbers are checked in one round
    private static final int NUMBER_OF_TASKS = 30;

    public static void main(String[] args) throws TaskAlreadySubmittedException, TaskException {
        // get the number of primes to find from the command line
        int numberOfPrimes = Integer.parseInt(args[0]);

        // creation of the master
        ProActiveMaster<ComputePrime, Long> master =
                new ProActiveMaster<ComputePrime, Long>();

        // adding resources
        master.addResources(DistributedPrimesMW.class
                .getResource("/org/objectweb/proactive/examples/masterworker/WorkersLocal.xml"));

        // list of tasks, reused in every round
        List<ComputePrime> tasks = new ArrayList<ComputePrime>();

        // number of primes found so far
        long found = 0;
        // first number of the current round
        long checkedLimit = 2;
        // iterate until at least numberOfPrimes primes have been found
        while (found < numberOfPrimes) {
            // add one task for each number in [checkedLimit, checkedLimit + NUMBER_OF_TASKS)
            for (long i = checkedLimit; i < checkedLimit + NUMBER_OF_TASKS; i++) {
                tasks.add(new ComputePrime(i));
            }

            // start the computation
            master.solve(tasks);

            // wait for all the results of this round
            List<Long> primes = master.waitAllResults();

            // display the primes and count them (non-primes come back as 0)
            for (Long prime : primes) {
                if (prime != 0) {
                    System.out.print(prime + " ");
                    found++;
                }
            }

            // move the starting point to the next round
            checkedLimit += NUMBER_OF_TASKS;
            // clear the task list since it is reused in the next round
            tasks.clear();
        }
        System.out.println();

        // terminate the master and the workers
        master.terminate(true);
    }

    /**
     * Very simple task which checks whether a number is prime
     * by trial division.
     * @author The ProActive Team
     */
    public static class ComputePrime implements Task<Long> {

        // the number to be checked
        private final Long number;

        public ComputePrime(Long number) {
            this.number = number;
        }

        // trial division: try every divisor from 2 up to round(sqrt(number))
        public Long run(WorkerMemory memory) throws Exception {
            long n = number;
            long limit = Math.round(Math.sqrt(n)) + 1;
            boolean prime = n >= 2; // 0 and 1 are not prime
            for (long divisor = 2; prime && divisor < limit; divisor++) {
                if (n % divisor == 0) {
                    prime = false;
                }
            }
            // return the number if it is prime, otherwise return 0
            return prime ? n : 0L;
        }
    }
}

7.1.2. Running master-worker Distributed Primes

To compile and run the application, you need the DistributedPrimesMW class and the following jar files on the classpath: ProActive/dist/lib/ProActive.jar, ProActive/dist/lib/javassist.jar, ProActive/dist/lib/log4j.jar, ProActive/dist/lib/xercesImpl.jar, ProActive/dist/lib/fractal.jar and ProActive/dist/lib/bouncycastle.jar. You also need to set the Java security policy explicitly with -Djava.security.policy=pathToFile and the logging configuration with -Dlog4j.configuration=file:proactive-log4j. The necessary steps are explained in Chapter 2, ProActive And IC2D Installation.

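The application takes one argument, the number of primes to find, and since the class is declared in the package active it is started by its fully qualified name. With the jar files above on the classpath, the command line for running the application is the following, where <numberOfPrimes> is replaced by the number of primes to find:

java -Djava.security.policy=proactive.java.policy -Dlog4j.configuration=file:proactive-log4j active.DistributedPrimesMW <numberOfPrimes>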
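DistributedPrimesMW reads the number of primes from args[0] without checking it, so starting it with no argument or with a non-numeric one fails with an exception before any work is done. A defensive parse might look like the hypothetical parsePrimeCount helper below, which main() could call instead of Integer.parseInt(args[0]); the class PrimeArgs and the usage message are illustrative.

```java
class PrimeArgs {

    // Hypothetical helper: validate the single command-line argument
    // and return the number of primes to find.
    static int parsePrimeCount(String[] args) {
        if (args.length != 1) {
            throw new IllegalArgumentException("usage: DistributedPrimesMW <numberOfPrimes>");
        }
        int count;
        try {
            count = Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("not an integer: " + args[0], e);
        }
        if (count <= 0) {
            throw new IllegalArgumentException("the number of primes must be positive: " + count);
        }
        return count;
    }
}
```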