GraphLab: Distributed Graph-Parallel API  2.1
graphlab::distributed_control Class Reference

The distributed control object is the primary means of communication between the distributed GraphLab processes. More...

#include <graphlab/rpc/dc.hpp>

List of all members.

Public Member Functions

 distributed_control ()
 distributed_control (dc_init_param initparam)
procid_t procid () const
 Returns the ID of the current process.
procid_t numprocs () const
 Returns the total number of processes.
void register_deletion_callback (boost::function< void(void)> deleter)
 Registers a callback which will be called on deletion of the distributed_control object.
void remote_call (procid_t targetmachine, Fn fn,...)
 Performs a non-blocking RPC call to the target machine to run the provided function pointer.
void remote_call (Iterator machine_begin, Iterator machine_end, Fn fn,...)
 Performs a non-blocking RPC call to a collection of machines to run the provided function pointer.
RetVal remote_request (procid_t targetmachine, Fn fn,...)
 Performs a blocking RPC call to the target machine to run the provided function pointer.
size_t calls_sent () const
 Returns the total number of RPC calls made.
double mega_calls_sent () const
 Returns the total number of RPC calls made in millions.
size_t calls_received () const
 Returns the total number of RPC calls received.
size_t bytes_sent () const
 Returns the total number of bytes sent excluding headers and other control overhead. Also see network_bytes_sent()
size_t network_bytes_sent () const
 Returns the total number of bytes sent including all headers and other control overhead. Also see bytes_sent()
double network_megabytes_sent () const
 Returns the total number of megabytes sent including all headers and other control overhead. Also see network_bytes_sent()
size_t bytes_received () const
 Returns the total number of bytes received excluding all headers and other control overhead. Also see bytes_sent().
void flush ()
 Performs a local flush of all send buffers.
template<typename U >
void send_to (procid_t target, U &t, bool control=false)
 Sends an object to a target machine and blocks until the target machine calls recv_from() to receive the object.
template<typename U >
void recv_from (procid_t source, U &t, bool control=false)
 Waits to receive an object that a source machine sent via send_to()
template<typename U >
void broadcast (U &data, bool originator, bool control=false)
 This function allows one machine to broadcast an object to all machines.
template<typename U >
void gather (std::vector< U > &data, procid_t sendto, bool control=false)
 Collects information contributed by each machine onto one machine.
template<typename U >
void all_gather (std::vector< U > &data, bool control=false)
 Sends some information contributed by each machine to all machines.
template<typename U >
void all_reduce (U &data, bool control=false)
 Combines a value contributed by each machine, making the result available to all machines.
template<typename U , typename PlusEqual >
void all_reduce2 (U &data, PlusEqual plusequal, bool control=false)
 Combines a value contributed by each machine, making the result available to all machines.
void barrier ()
 A distributed barrier which waits for all machines to call the barrier() function before proceeding.
void full_barrier ()
 A distributed barrier which waits for all machines to call the full_barrier() function before proceeding. Also waits for all previously issued remote calls to complete.
std::ostream & cout () const
 A wrapper on cout, that outputs only on machine 0.
std::ostream & cerr () const
 A wrapper on cerr, that outputs only on machine 0.
std::map< std::string, size_t > gather_statistics ()

Static Public Member Functions

static unsigned char set_sequentialization_key (unsigned char newkey)
 Sets the sequentialization key to a new value, returning the previous value.
static unsigned char new_sequentialization_key ()
 Creates a new sequentialization key, returning the old value.
static unsigned char get_sequentialization_key ()
 Gets the current sequentialization key. This function is not generally useful.

Detailed Description

The distributed control object is the primary means of communication between the distributed GraphLab processes.

The distributed_control object provides asynchronous, multi-threaded Remote Procedure Call (RPC) services to allow distributed GraphLab processes to communicate with each other. Currently, the only communication method implemented is TCP/IP. There are several ways of setting up the communication layer, but the most reliable, and the preferred method, is to "bootstrap" using MPI. See your local MPI documentation for details on how to launch MPI jobs.

To construct a distributed_control object, the simplest method is to just invoke the default constructor.

// initialize MPI
mpi_tools::init(argc, argv);
// construct the distributed control object
graphlab::distributed_control dc;

After which all distributed control services will operate correctly.

Each process is assigned a sequential process ID starting at 0: the first process has ID 0, the second has ID 1, and so on. distributed_control::procid() returns the current machine's process ID, and distributed_control::numprocs() returns the total number of processes.

The primary functions used to communicate between processes are distributed_control::remote_call() and distributed_control::remote_request(). These functions are thread-safe and can be called very rapidly as they only write into a local buffer. Communication is handled by a background thread. On the remote side, RPC calls are handled in parallel by a thread pool, and thus may be parallelized arbitrarily. Operations such as distributed_control::full_barrier(), or the sequentialization key can be used to get finer grained control over order of execution on the remote machine.

A few other additional helper functions are also provided to support "synchronous" modes of communication. These functions are not thread-safe and can only be called on one thread per machine. These functions block until all machines call the same function. For instance, if gather() is called on one machine, it will not return until all machines call gather().

Note:
These synchronous operations are modeled after MPI collective operations. The implementations here are not particularly optimized and will generally be slower than their MPI counterparts, but they are much easier to use, relying extensively on serialization to simplify communication.

To support Object Oriented Programming like methodologies, we allow the creation of Distributed Objects through graphlab::dc_dist_object. dc_dist_object allows a class to construct its own local copy of a distributed_control object allowing instances of the class to communicate with each other across the network.

See GraphLab RPC for usage examples.

Definition at line 198 of file dc.hpp.


Constructor & Destructor Documentation

graphlab::distributed_control::distributed_control ( )

Default constructor. Automatically tries to read the initialization from environment variables, or from MPI (if MPI is initialized).

Definition at line 112 of file dc.cpp.

graphlab::distributed_control::distributed_control ( dc_init_param  initparam)
explicit

Passes custom constructed initialization parameters in dc_init_param

Though dc_init_param can be obtained from environment variables using dc_init_from_env() or from MPI using dc_init_from_mpi(), using the default constructor is preferred.

Definition at line 142 of file dc.cpp.


Member Function Documentation

template<typename U >
void graphlab::distributed_control::all_gather ( std::vector< U > &  data,
bool  control = false 
)
inline

Sends some information contributed by each machine to all machines.

The goal is to have each machine broadcast a piece of information to all machines. This is like gather(), but all machines have the complete vector at the end. To accomplish this, each machine constructs a vector of length numprocs(), and stores the data to communicate in the procid()'th entry in the vector. Then calling all_gather with the vector will result in all machines having a complete copy of the vector containing all contributions (entry 0 from machine 0, entry 1 from machine 1, etc).

Example:

// construct the vector of values
std::vector<int> values;
values.resize(dc.numprocs());
// set my contributed value
values[dc.procid()] = dc.procid();
dc.all_gather(values);
// at this point, all machines will have a vector with length equal to the
// number of processes, containing the values [0, 1, 2, ...]
Note:
Behavior is undefined if multiple threads on the same machine call all_gather simultaneously.
Parameters:
data: A vector of length equal to the number of processes. The information to communicate is in the entry data[procid()].
control: Optional parameter. Defaults to false. If set to true, this will be marked as control plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.

Definition at line 1276 of file dc.hpp.

template<typename U >
void graphlab::distributed_control::all_reduce ( U &  data,
bool  control = false 
)
inline

Combines a value contributed by each machine, making the result available to all machines.

Each machine calls all_reduce() with an object which is serializable and has operator+= implemented. When all_reduce() returns, the "data" variable will contain the result of adding up the objects contributed by each machine.

Example:

int i = 1;
dc.all_reduce(i);
// since each machine contributed the value "1",
// all machines will have i = numprocs() here.
Parameters:
data: A piece of data to perform a reduction over. The type must implement operator+=.
control: Optional parameter. Defaults to false. If set to true, this will be marked as control plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.

Definition at line 1281 of file dc.hpp.
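As a minimal sketch of the kind of type all_reduce() accepts, the hypothetical struct below implements operator+=. (Real use would additionally require GraphLab serialization support for the type, which is omitted here; the struct name and fields are illustrative only.)

```cpp
#include <cassert>

// Hypothetical reducible type for all_reduce(): implements operator+=.
// Real use would also require the type to be serializable; that part is
// omitted from this sketch.
struct stats {
  int edges;
  double weight;
  stats(int e = 0, double w = 0.0) : edges(e), weight(w) {}
  stats& operator+=(const stats& other) {
    edges += other.edges;
    weight += other.weight;
    return *this;
  }
};
```

With such a type, each machine would construct a local stats value and call dc.all_reduce(s); on return, s would hold the combined totals from all machines.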

template<typename U , typename PlusEqual >
void graphlab::distributed_control::all_reduce2 ( U &  data,
PlusEqual  plusequal,
bool  control = false 
)
inline

Combines a value contributed by each machine, making the result available to all machines.

This function is equivalent to all_reduce(), but with an externally defined PlusEqual function.

Each machine calls all_reduce2() with an object which is serializable and a function "plusequal" which combines two instances of the object. When all_reduce2() returns, the "data" variable will contain the result of adding up the objects contributed by each machine using the plusequal function.

Where U is the type of the object, the plusequal function must be of the form:

void plusequal(U& left, const U& right);

and must implement the equivalent of left += right;

Example:

void int_plus_equal(int& a, const int& b) {
a+=b;
}
int i = 1;
dc.all_reduce2(i, int_plus_equal);
// since each machine contributed the value "1",
// all machines will have i = numprocs() here.
Parameters:
data: A piece of data to perform a reduction over.
plusequal: A plusequal function on the data. Must have the prototype void plusequal(U&, const U&).
control: Optional parameter. Defaults to false. If set to true, this will be marked as control plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.

Definition at line 1287 of file dc.hpp.
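As a sketch of a plusequal function over a non-trivial type, the hypothetical helper below performs an elementwise vector sum, implementing the equivalent of left += right. It assumes every machine contributes a vector of the same length.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical plusequal for all_reduce2() over vectors: an elementwise
// sum implementing the equivalent of left += right. Assumes both vectors
// have the same length.
void vec_plus_equal(std::vector<int>& left, const std::vector<int>& right) {
  for (std::size_t i = 0; i < left.size(); ++i)
    left[i] += right[i];
}
```

Each machine would then call dc.all_reduce2(values, vec_plus_equal); when it returns, values[i] would hold the sum of entry i across all machines.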

void graphlab::distributed_control::barrier ( )

A distributed barrier which waits for all machines to call the barrier() function before proceeding.

A machine calling the barrier() will wait until every machine reaches this barrier before continuing. Only one thread from each machine should call the barrier.

See also:
full_barrier

Definition at line 542 of file dc.cpp.

template<typename U >
void graphlab::distributed_control::broadcast ( U &  data,
bool  originator,
bool  control = false 
)
inline

This function allows one machine to broadcast an object to all machines.

The originator calls broadcast() with the data provided in 'data' and originator set to true. All other callers call with originator set to false.

The originator's call then returns with 'data' unchanged. All other machines will receive the originator's transmission in the "data" parameter.

This call is guaranteed to have barrier-like behavior. That is to say, this call will block until all machines enter the broadcast function.

Example:

int i;
if (procid() == 0) {
// if I am machine 0, I broadcast the value i = 10 to all machines
i = 10;
dc.broadcast(i, true);
} else {
// all other machines receive the broadcast value
dc.broadcast(i, false);
}
// at this point, all machines have i = 10
Note:
Behavior is undefined if more than one machine calls broadcast with originator set to true.
Behavior is undefined if multiple threads on the same machine call broadcast simultaneously.
Parameters:
data: If this is the originator, this contains the object to broadcast. Otherwise, this is a reference to the object receiving the broadcast.
originator: Set to true if this is the source of the broadcast. Set to false otherwise.
control: Optional parameter. Defaults to false. If set to true, this will be marked as control plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.

Definition at line 1266 of file dc.hpp.

void graphlab::distributed_control::full_barrier ( )

A distributed barrier which waits for all machines to call the full_barrier() function before proceeding. Also waits for all previously issued remote calls to complete.

Similar to barrier(), but provides the additional guarantee that all calls issued prior to this barrier are completed before returning.

Note:
This function could return prematurely if other threads are still issuing function calls, since we cannot differentiate between calls issued before the barrier and calls issued while the barrier is being evaluated. Therefore, when used in a multithreaded scenario, the user must ensure that all other threads which may perform operations using this object are stopped before the full barrier is initiated.
See also:
barrier


Definition at line 577 of file dc.cpp.

template<typename U >
void graphlab::distributed_control::gather ( std::vector< U > &  data,
procid_t  sendto,
bool  control = false 
)
inline

Collects information contributed by each machine onto one machine.

The goal is to collect some information from each machine onto a single target machine (sendto). To accomplish this, each machine constructs a vector of length numprocs(), and stores the data to communicate in the procid()'th entry in the vector. Then calling gather with the vector and the target machine will send the contributed value to the target. When the function returns, machine sendto will have the complete vector where data[i] is the data contributed by machine i.

Example:

// construct the vector of values
std::vector<int> values;
values.resize(dc.numprocs());
// set my contributed value
values[dc.procid()] = dc.procid();
dc.gather(values, 0);
// at this point machine 0 will have a vector with length equal to the
// number of processes, and containing values [0, 1, 2, ...]
// All other machines' value vectors will be unchanged.
Note:
Behavior is undefined if machines call gather with different values for sendto.
Behavior is undefined if multiple threads on the same machine call gather simultaneously.
Parameters:
data: A vector of length equal to the number of processes. The information to communicate is in the entry data[procid()].
sendto: The machine which will hold the complete vector at the end of the operation. All machines must pass the same value for this parameter.
control: Optional parameter. Defaults to false. If set to true, this will be marked as control plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.

Definition at line 1271 of file dc.hpp.

std::map<std::string, size_t> graphlab::distributed_control::gather_statistics ( )

Gather RPC statistics. All machines must call this function at the same time. However, only proc 0 will return values.

unsigned char graphlab::distributed_control::new_sequentialization_key ( )
static

Creates a new sequentialization key, returning the old value.

All RPC calls made using the same key value (as long as the key is non-zero) will sequentialize. RPC calls made while the key value is 0 can be run in parallel in arbitrary order. However, since new_sequentialization_key() uses a very naive key selection system, we recommend the use of set_sequentialization_key().

The user should save the returned key and restore it when done:

unsigned char oldkey = distributed_control::new_sequentialization_key();
// ...
// ... do stuff
// ...
distributed_control::set_sequentialization_key(oldkey);

The key value is thread-local thus setting the key value in one thread does not affect the key value in another thread.

Definition at line 97 of file dc.cpp.

template<typename U >
void graphlab::distributed_control::recv_from ( procid_t  source,
U &  t,
bool  control = false 
)
inline

Waits to receive an object that a source machine sent via send_to()

This function waits to receive a Serializable object "t" from a source machine. The source machine must send the object using send_to(), and will wait for the target machine's recv_from() to complete before returning.

Example:

int i;
if (dc.procid() == 0) {
i = 10;
// if I am machine 0, I send the value i = 10 to machine 1
dc.send_to(1, i);
} else if (dc.procid() == 1) {
// machine 1 receives the value of i from machine 0
dc.recv_from(0, i);
}
// at this point machines 0 and 1 have the value i = 10
Template Parameters:
U: The type of object to receive. This should be inferred by the compiler.
Parameters:
source: The machine to receive from. This function blocks until data is received.
t: The object to receive. It must be serializable and the type must match the source machine's call to send_to().
control: Optional parameter. Defaults to false. If set to true, this will be marked as control plane communication and will not register in bytes_received() or bytes_sent(). This must match the "control" parameter on the source machine's send_to() call.
Note:
Behavior is undefined if multiple threads on the same machine call recv_from simultaneously.

Definition at line 1261 of file dc.hpp.

void graphlab::distributed_control::register_deletion_callback ( boost::function< void(void)>  deleter)
inline

Registers a callback which will be called on deletion of the distributed_control object.

This function is useful for distributed static variables which may only be deleted after main().

Definition at line 327 of file dc.hpp.
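The pattern can be sketched in isolation as follows. Here std::function stands in for boost::function<void(void)>, and toy_control is a hypothetical stand-in for distributed_control: callbacks registered on the object run when it is destroyed.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Sketch of the deletion-callback pattern. std::function stands in for
// boost::function<void(void)>, and this toy class stands in for
// distributed_control: registered callbacks run on destruction.
class toy_control {
  std::vector<std::function<void()> > deleters_;
public:
  void register_deletion_callback(std::function<void()> d) {
    deleters_.push_back(d);
  }
  ~toy_control() {
    // invoke every registered callback when the object is deleted
    for (std::size_t i = 0; i < deleters_.size(); ++i) deleters_[i]();
  }
};
```

In real use, a distributed static would register a cleanup function here so that its teardown is deferred until the distributed_control object itself is destroyed.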

void graphlab::distributed_control::remote_call ( procid_t  targetmachine,
Fn  fn,
  ... 
)

Performs a non-blocking RPC call to the target machine to run the provided function pointer.

remote_call() calls the function "fn" on a target remote machine. Provided arguments are serialized and sent to the target. Therefore, all arguments are necessarily transmitted by value. If the target function has a return value, the return value is lost.

remote_call() is non-blocking and does not wait for the target machine to complete execution of the function. Different remote_calls may be handled by different threads on the target machine and thus the target function should be made thread-safe. Alternatively, see set_sequentialization_key() to force sequentialization of groups of remote calls.

If blocking operation is desired, remote_request() may be used. Alternatively, a full_barrier() may also be used to wait for completion of all incomplete RPC calls.

Example:

// A print function is defined
void print(std::string s) {
std::cout << s << "\n";
}
... ...
// call the print function on machine 1 to print "hello"
dc.remote_call(1, print, "hello");
Parameters:
targetmachine: The ID of the machine to run the function on.
fn: The function to run on the target machine.
...: The arguments to send to fn. Arguments must be serializable and must be castable to the target types.
void graphlab::distributed_control::remote_call ( Iterator  machine_begin,
Iterator  machine_end,
Fn  fn,
  ... 
)

Performs a non-blocking RPC call to a collection of machines to run the provided function pointer.

This function calls the provided function pointer on a collection of machines contained in the iterator range [begin, end). Provided arguments are serialized and sent to the target. Therefore, all arguments are necessarily transmitted by value. If the target function has a return value, the return value is lost.

This function is functionally equivalent to:

while(machine_begin != machine_end) {
remote_call(*machine_begin, fn, ...);
++machine_begin;
}

However, this function makes some optimizations to ensure all arguments are serialized only once instead of once per call.

This function is non-blocking and does not wait for the target machines to complete execution of the function. Different remote_calls may be handled by different threads on the target machines and thus the target function should be made thread-safe. Alternatively, see set_sequentialization_key() to force sequentialization of groups of remote_calls. A full_barrier() may also be issued to wait for completion of all RPC calls issued prior to the full barrier.

Example:

// A print function is defined
void print(std::string s) {
std::cout << s << "\n";
}
... ...
// call the print function on machine 1, 3 and 5 to print "hello"
std::vector<procid_t> procs;
procs.push_back(1); procs.push_back(3); procs.push_back(5);
dc.remote_call(procs.begin(), procs.end(), print, "hello");
Parameters:
machine_begin: The beginning of an iterator range containing the list of machines to call. Iterator::value_type must be castable to procid_t.
machine_end: The end of an iterator range containing the list of machines to call. Iterator::value_type must be castable to procid_t.
fn: The function to run on the target machines.
...: The arguments to send to fn. Arguments must be serializable and must be castable to the target types.
RetVal graphlab::distributed_control::remote_request ( procid_t  targetmachine,
Fn  fn,
  ... 
)

Performs a blocking RPC call to the target machine to run the provided function pointer.

remote_request() calls the function "fn" on a target remote machine. Provided arguments are serialized and sent to the target. Therefore, all arguments are necessarily transmitted by value. If the target function has a return value, it is sent back to calling machine.

Unlike remote_call(), remote_request() is blocking and waits for the target machine to complete execution of the function. However, different remote_requests may still be handled by different threads on the target machine.

Example:

// An add_one function is defined
int add_one(int i) {
return i + 1;
}
... ...
// call the add_one function on machine 1
int i = 10;
i = dc.remote_request(1, add_one, i);
// i will now be 11
Parameters:
targetmachine: The ID of the machine to run the function on.
fn: The function to run on the target machine.
...: The arguments to send to fn. Arguments must be serializable and must be castable to the target types.
Returns:
The same return type as the function fn.
template<typename U >
void graphlab::distributed_control::send_to ( procid_t  target,
U &  t,
bool  control = false 
)
inline

Sends an object to a target machine and blocks until the target machine calls recv_from() to receive the object.

This function sends a Serializable object "t" to the target machine, and waits for the target machine to call recv_from() to receive the object before returning.

Example:

int i;
if (dc.procid() == 0) {
i = 10;
// if I am machine 0, I send the value i = 10 to machine 1
dc.send_to(1, i);
} else if (dc.procid() == 1) {
// machine 1 receives the value of i from machine 0
dc.recv_from(0, i);
}
// at this point machines 0 and 1 have the value i = 10
Template Parameters:
U: The type of object to send. This should be inferred by the compiler.
Parameters:
target: The target machine to send to. The target machine must call recv_from() before this call will return.
t: The object to send. It must be serializable and the type must match the target machine's call to recv_from().
control: Optional parameter. Defaults to false. If set to true, this will be marked as control plane communication and will not register in bytes_received() or bytes_sent(). This must match the "control" parameter on the target machine's recv_from() call.
Note:
Behavior is undefined if multiple threads on the same machine call send_to simultaneously.

Definition at line 1256 of file dc.hpp.

unsigned char graphlab::distributed_control::set_sequentialization_key ( unsigned char  newkey)
static

Sets the sequentialization key to a new value, returning the previous value.

All RPC calls made using the same key value (as long as the key is non-zero) will sequentialize. RPC calls made while the key value is 0 can be run in parallel in arbitrary order.

unsigned char oldkey = distributed_control::set_sequentialization_key(newkey);
// ...
// ... do stuff
// ...
distributed_control::set_sequentialization_key(oldkey);

The key value is thread-local thus setting the key value in one thread does not affect the key value in another thread.

Definition at line 89 of file dc.cpp.


The documentation for this class was generated from the following files: