GraphLab: Distributed Graph-Parallel API 2.1
The distributed control object is the primary means of communication between distributed GraphLab processes.
#include <graphlab/rpc/dc.hpp>
Public Member Functions

distributed_control ()
distributed_control (dc_init_param initparam)
procid_t procid () const
    Returns the ID of the current process.
procid_t numprocs () const
    Returns the total number of processes.
void register_deletion_callback (boost::function< void(void)> deleter)
    Registers a callback which will be called on deletion of the distributed_control object.
void remote_call (procid_t targetmachine, Fn fn, ...)
    Performs a non-blocking RPC call to the target machine to run the provided function pointer.
void remote_call (Iterator machine_begin, Iterator machine_end, Fn fn, ...)
    Performs a non-blocking RPC call to a collection of machines to run the provided function pointer.
RetVal remote_request (procid_t targetmachine, Fn fn, ...)
    Performs a blocking RPC call to the target machine to run the provided function pointer.
size_t calls_sent () const
    Returns the total number of RPC calls made.
double mega_calls_sent () const
    Returns the total number of RPC calls made, in millions.
size_t calls_received () const
    Returns the total number of RPC calls received.
size_t bytes_sent () const
    Returns the total number of bytes sent, excluding headers and other control overhead. Also see network_bytes_sent().
size_t network_bytes_sent () const
    Returns the total number of bytes sent, including all headers and other control overhead. Also see bytes_sent().
double network_megabytes_sent () const
    Returns the total number of megabytes sent, including all headers and other control overhead. Also see network_bytes_sent().
size_t bytes_received () const
    Returns the total number of bytes received, excluding all headers and other control overhead. Also see bytes_sent().
void flush ()
    Performs a local flush of all send buffers.
template<typename U> void send_to (procid_t target, U &t, bool control=false)
    Sends an object to a target machine and blocks until the target machine calls recv_from() to receive the object.
template<typename U> void recv_from (procid_t source, U &t, bool control=false)
    Waits to receive an object that a source machine sent via send_to().
template<typename U> void broadcast (U &data, bool originator, bool control=false)
    Broadcasts an object from one machine to all machines.
template<typename U> void gather (std::vector< U > &data, procid_t sendto, bool control=false)
    Collects information contributed by each machine onto one machine.
template<typename U> void all_gather (std::vector< U > &data, bool control=false)
    Sends some information contributed by each machine to all machines.
template<typename U> void all_reduce (U &data, bool control=false)
    Combines a value contributed by each machine, making the result available to all machines.
template<typename U, typename PlusEqual> void all_reduce2 (U &data, PlusEqual plusequal, bool control=false)
    Combines a value contributed by each machine, making the result available to all machines.
void barrier ()
    A distributed barrier which waits for all machines to call the barrier() function before proceeding.
void full_barrier ()
    A distributed barrier which waits for all machines to call the full_barrier() function before proceeding. Also waits for all previously issued remote calls to complete.
std::ostream & cout () const
    A wrapper on cout that outputs only on machine 0.
std::ostream & cerr () const
    A wrapper on cerr that outputs only on machine 0.
std::map< std::string, size_t > gather_statistics ()

Static Public Member Functions

static unsigned char set_sequentialization_key (unsigned char newkey)
    Sets the sequentialization key to a new value, returning the previous value.
static unsigned char new_sequentialization_key ()
    Creates a new sequentialization key, returning the old value.
static unsigned char get_sequentialization_key ()
    Gets the current sequentialization key. This function is not generally useful.
The distributed control object is the primary means of communication between distributed GraphLab processes.
The distributed_control object provides asynchronous, multi-threaded Remote Procedure Call (RPC) services to allow distributed GraphLab processes to communicate with each other. Currently, the only communication method implemented is TCP/IP. There are several ways of setting up the communication layer, but the most reliable, and the preferred method, is to "bootstrap" using MPI. See your local MPI documentation for details on how to launch MPI jobs.
To construct a distributed_control object, the simplest method is to invoke the default constructor, after which all distributed control services will operate correctly.
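A minimal sketch of a typical MPI-bootstrapped startup follows. The mpi_tools helpers and their header path are assumptions based on common GraphLab usage and may differ between versions; only <graphlab/rpc/dc.hpp> is cited above.

  #include <iostream>
  #include <graphlab/rpc/dc.hpp>
  #include <graphlab/util/mpi_tools.hpp>   // assumed location of the MPI bootstrap helpers

  int main(int argc, char** argv) {
    // Bootstrap the communication layer over MPI (the preferred setup described above).
    graphlab::mpi_tools::init(argc, argv);

    // Default construction: all distributed control services now operate correctly.
    graphlab::distributed_control dc;

    dc.cout() << "Running on " << dc.numprocs() << " processes" << std::endl;
    dc.barrier();

    graphlab::mpi_tools::finalize();
    return 0;
  }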
Each process is assigned a sequential process ID starting at 0: the first process has process ID 0, the second has ID 1, and so on. distributed_control::procid() returns the current machine's process ID, and distributed_control::numprocs() returns the total number of processes.
The primary functions used to communicate between processes are distributed_control::remote_call() and distributed_control::remote_request(). These functions are thread-safe and can be called very rapidly, as they only write into a local buffer; communication is handled by a background thread. On the remote side, RPC calls are handled in parallel by a thread pool and thus may be parallelized arbitrarily. Operations such as distributed_control::full_barrier(), or the sequentialization key, can be used to gain finer-grained control over the order of execution on the remote machine.
A few other additional helper functions are also provided to support "synchronous" modes of communication. These functions are not thread-safe and can only be called on one thread per machine. These functions block until all machines call the same function. For instance, if gather() is called on one machine, it will not return until all machines call gather().
To support object-oriented programming methodologies, we allow the creation of distributed objects through graphlab::dc_dist_object. dc_dist_object allows a class to construct its own local copy of a distributed_control object, allowing instances of the class to communicate with each other across the network.
See GraphLab RPC for usage examples.
graphlab::distributed_control::distributed_control (dc_init_param initparam)  [explicit]
Passes custom-constructed initialization parameters via dc_init_param.
Though a dc_init_param can be obtained from environment variables using dc_init_from_env() or from MPI using dc_init_from_mpi(), using the default constructor is preferred.
template<typename U> void graphlab::distributed_control::all_gather (std::vector< U > &data, bool control = false)  [inline]
Sends some information contributed by each machine to all machines.
The goal is to have each machine broadcast a piece of information to all machines. This is like gather(), but all machines have the complete vector at the end. To accomplish this, each machine constructs a vector of length numprocs(), and stores the data to communicate in the procid()'th entry in the vector. Then calling all_gather with the vector will result in all machines having a complete copy of the vector containing all contributions (entry 0 from machine 0, entry 1 from machine 1, etc).
Example:
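A minimal sketch of the pattern described above, assuming dc is an existing distributed_control instance:

  // Each machine fills in its own slot of the vector.
  std::vector<int> data(dc.numprocs());
  data[dc.procid()] = 100 + int(dc.procid());
  dc.all_gather(data);
  // Every machine now holds the complete vector: data[i] was contributed by machine i.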
data: A vector of length equal to the number of processes. The information to communicate is in the entry data[procid()].
control: Optional parameter; defaults to false. If set to true, the call is marked as control-plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.
template<typename U> void graphlab::distributed_control::all_reduce (U &data, bool control = false)  [inline]
Combines a value contributed by each machine, making the result available to all machines.
Each machine calls all_reduce() with an object which is serializable and has operator+= implemented. When all_reduce() returns, the "data" variable will contain a value corresponding to adding up the objects contributed by each machine.
Example:
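A minimal sketch, assuming dc is an existing distributed_control instance:

  // Each machine contributes its process ID; the result is the sum over all machines.
  size_t sum = dc.procid();
  dc.all_reduce(sum);
  // sum now equals 0 + 1 + ... + (numprocs() - 1) on every machine.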
data: A piece of data to perform a reduction over. The type must implement operator+=.
control: Optional parameter; defaults to false. If set to true, the call is marked as control-plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.
template<typename U, typename PlusEqual> void graphlab::distributed_control::all_reduce2 (U &data, PlusEqual plusequal, bool control = false)  [inline]
Combines a value contributed by each machine, making the result available to all machines.
This function is equivalent to all_reduce(), but with an externally defined PlusEqual function.
Each machine calls all_reduce2() with an object which is serializable, together with a function "plusequal" which combines two instances of the object. When all_reduce2() returns, the "data" variable will contain a value corresponding to adding up the objects contributed by each machine using the plusequal function.
Where U is the type of the object, the plusequal function must be of the form void plusequal(U& left, const U& right), and must implement the equivalent of left += right;
Example:
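A minimal sketch with an illustrative combiner; the name vec_plusequal and the use of a vector are hypothetical, and dc is an existing distributed_control instance:

  // Combiner implementing the equivalent of left += right, element-wise.
  void vec_plusequal(std::vector<double>& left, const std::vector<double>& right) {
    for (size_t i = 0; i < left.size(); ++i) left[i] += right[i];
  }

  // Each machine contributes a vector filled with its procid(); the result is the element-wise sum.
  std::vector<double> data(10, double(dc.procid()));
  dc.all_reduce2(data, vec_plusequal);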
data: A piece of data to perform a reduction over.
plusequal: A plusequal function on the data. Must have the prototype void plusequal(U&, const U&).
control: Optional parameter; defaults to false. If set to true, the call is marked as control-plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.
void graphlab::distributed_control::barrier ( )
A distributed barrier which waits for all machines to call the barrier() function before proceeding.
A machine calling the barrier() will wait until every machine reaches this barrier before continuing. Only one thread from each machine should call the barrier.
template<typename U> void graphlab::distributed_control::broadcast (U &data, bool originator, bool control = false)  [inline]
Broadcasts an object from one machine to all machines.
The originator calls broadcast() with the object to send in "data" and originator set to true. All other machines call it with originator set to false.
The originator's call returns with "data" unchanged, while all other machines receive the originator's transmission in the "data" parameter.
This call is guaranteed to have barrier-like behavior; that is, it blocks until all machines enter the broadcast function.
Example:
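A minimal sketch where machine 0 is the originator, assuming dc is an existing distributed_control instance:

  std::string msg;
  if (dc.procid() == 0) msg = "hello from machine 0";
  dc.broadcast(msg, dc.procid() == 0);
  // msg now reads "hello from machine 0" on every machine.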
data: If this is the originator, this contains the object to broadcast. Otherwise, this is a reference to the object receiving the broadcast.
originator: Set to true if this is the source of the broadcast; set to false otherwise.
control: Optional parameter; defaults to false. If set to true, the call is marked as control-plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.
void graphlab::distributed_control::full_barrier ( )
A distributed barrier which waits for all machines to call the full_barrier() function before proceeding. Also waits for all previously issued remote calls to complete.
Similar to barrier(), but provides the additional guarantee that all calls issued prior to this barrier are completed before returning.
This barrier ensures globally across all machines that all calls issued prior to this barrier are completed before returning. This function could return prematurely if other threads are still issuing function calls since we cannot differentiate between calls issued before the barrier and calls issued while the barrier is being evaluated.
template<typename U> void graphlab::distributed_control::gather (std::vector< U > &data, procid_t sendto, bool control = false)  [inline]
Collects information contributed by each machine onto one machine.
The goal is to collect some information from each machine onto a single target machine (sendto). To accomplish this, each machine constructs a vector of length numprocs(), and stores the data to communicate in the procid()'th entry in the vector. Then calling gather with the vector and the target machine will send the contributed value to the target. When the function returns, machine sendto will have the complete vector where data[i] is the data contributed by machine i.
Example:
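A minimal sketch collecting one entry per machine onto machine 0, assuming dc is an existing distributed_control instance:

  // Each machine fills in its own slot of the vector.
  std::vector<int> data(dc.numprocs());
  data[dc.procid()] = 10 * int(dc.procid());
  dc.gather(data, 0);
  // On machine 0, data[i] now holds the value contributed by machine i.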
data: A vector of length equal to the number of processes. The information to communicate is in the entry data[procid()].
sendto: The machine which will hold the complete vector at the end of the operation. All machines must pass the same value for this parameter.
control: Optional parameter; defaults to false. If set to true, the call is marked as control-plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.
std::map< std::string, size_t > graphlab::distributed_control::gather_statistics ( )
Gathers RPC statistics. All machines must call this function at the same time; however, only proc 0 will return values.
static unsigned char graphlab::distributed_control::new_sequentialization_key ( )  [static]
Creates a new sequentialization key, returning the old value.
All RPC calls made using the same key value (as long as the key is non-zero) will sequentialize. RPC calls made while the key value is 0 can be run in parallel in arbitrary order. However, since new_sequentialization_key() uses a very naive key selection system, we recommend the use of set_sequentialization_key().
The key value is thread-local thus setting the key value in one thread does not affect the key value in another thread.
template<typename U> void graphlab::distributed_control::recv_from (procid_t source, U &t, bool control = false)  [inline]
Waits to receive an object that a source machine sent via send_to().
This function waits to receive a Serializable object "t" from a source machine. The source machine must send the object using send_to(). The source machine will wait for the target machine's recv_from() to complete before returning.
Example:
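A minimal sketch of the receiving side, paired with the send_to() example further down; the machine IDs are illustrative and dc is an existing distributed_control instance:

  if (dc.procid() == 1) {
    std::vector<int> v;
    dc.recv_from(0, v);   // blocks until machine 0 calls send_to(1, v)
    // v now holds the vector sent by machine 0.
  }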
U: The type of object to receive. This should be inferred by the compiler.
source: The machine to receive from. This function will block until data is received.
t: The object to receive. It must be serializable, and the type must match the source machine's call to send_to().
control: Optional parameter; defaults to false. If set to true, the call is marked as control-plane communication and will not register in bytes_received() or bytes_sent(). This must match the "control" parameter of the source machine's send_to() call.
void graphlab::distributed_control::register_deletion_callback (boost::function< void(void)> deleter)  [inline]
Registers a callback which will be called on deletion of the distributed_control object.
This function is useful for distributed static variables which may only be deleted after main().
void graphlab::distributed_control::remote_call (procid_t targetmachine, Fn fn, ...)
Performs a non-blocking RPC call to the target machine to run the provided function pointer.
remote_call() calls the function "fn" on a target remote machine. Provided arguments are serialized and sent to the target. Therefore, all arguments are necessarily transmitted by value. If the target function has a return value, the return value is lost.
remote_call() is non-blocking and does not wait for the target machine to complete execution of the function. Different remote_calls may be handled by different threads on the target machine and thus the target function should be made thread-safe. Alternatively, see set_sequentialization_key() to force sequentialization of groups of remote calls.
If blocking operation is desired, remote_request() may be used. Alternatively, a full_barrier() may also be used to wait for completion of all incomplete RPC calls.
Example:
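A minimal sketch using an illustrative free function print_message, assuming dc is an existing distributed_control instance:

  // Function to execute on the remote machine.
  void print_message(int value, std::string message) {
    std::cout << message << ": " << value << std::endl;
  }

  // Non-blocking: machine 1 will eventually run print_message(10, "hello").
  dc.remote_call(1, print_message, 10, std::string("hello"));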
targetmachine: The ID of the machine to run the function on.
fn: The function to run on the target machine.
...: The arguments to send to fn. Arguments must be serializable and must be castable to the target types.
void graphlab::distributed_control::remote_call (Iterator machine_begin, Iterator machine_end, Fn fn, ...)
Performs a non-blocking RPC call to a collection of machines to run the provided function pointer.
This function calls the provided function pointer on a collection of machines contained in the iterator range [begin, end). Provided arguments are serialized and sent to the target. Therefore, all arguments are necessarily transmitted by value. If the target function has a return value, the return value is lost.
This function is functionally equivalent to:
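  // Sketch of the equivalent behavior; the actual implementation is optimized as noted below.
  for (Iterator it = machine_begin; it != machine_end; ++it) {
    remote_call(*it, fn, ...);
  }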
However, this function makes some optimizations to ensure all arguments are only serialized once instead of #calls times.
This function is non-blocking and does not wait for the target machines to complete execution of the function. Different remote_calls may be handled by different threads on the target machines and thus the target function should be made thread-safe. Alternatively, see set_sequentialization_key() to force sequentialization of groups of remote_calls. A full_barrier() may also be issued to wait for completion of all RPC calls issued prior to the full barrier.
Example:
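A minimal sketch, reusing the illustrative print_message function from the previous example:

  // Issue the same call to machines 1, 2 and 3; the arguments are serialized only once.
  std::vector<graphlab::procid_t> targets;
  targets.push_back(1);
  targets.push_back(2);
  targets.push_back(3);
  dc.remote_call(targets.begin(), targets.end(), print_message, 10, std::string("hello"));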
machine_begin: The beginning of an iterator range containing the list of machines to call. Iterator::value_type must be castable to procid_t.
machine_end: The end of an iterator range containing the list of machines to call. Iterator::value_type must be castable to procid_t.
fn: The function to run on the target machines.
...: The arguments to send to fn. Arguments must be serializable and must be castable to the target types.
RetVal graphlab::distributed_control::remote_request (procid_t targetmachine, Fn fn, ...)
Performs a blocking RPC call to the target machine to run the provided function pointer.
remote_request() calls the function "fn" on a target remote machine. Provided arguments are serialized and sent to the target. Therefore, all arguments are necessarily transmitted by value. If the target function has a return value, it is sent back to calling machine.
Unlike remote_call(), remote_request() is blocking and waits for the target machine to complete execution of the function. However, different remote_requests may still be handled by different threads on the target machine.
Example:
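A minimal sketch using an illustrative function add_one that returns a value, assuming dc is an existing distributed_control instance:

  // Function to execute on the remote machine; its return value is sent back to the caller.
  int add_one(int value) { return value + 1; }

  // Blocking: waits for machine 1 to run add_one(10) and returns the result (11).
  int result = dc.remote_request(1, add_one, 10);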
targetmachine: The ID of the machine to run the function on.
fn: The function to run on the target machine.
...: The arguments to send to fn. Arguments must be serializable and must be castable to the target types.
template<typename U> void graphlab::distributed_control::send_to (procid_t target, U &t, bool control = false)  [inline]
Sends an object to a target machine and blocks until the target machine calls recv_from() to receive the object.
This function sends a Serializable object "t" to the target machine, and waits for the target machine to call recv_from() to receive the object before returning.
Example:
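A minimal sketch of the sending side, paired with the recv_from() example above:

  if (dc.procid() == 0) {
    std::vector<int> v(5, 42);
    dc.send_to(1, v);   // blocks until machine 1 calls recv_from(0, v)
  }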
U: The type of object to send. This should be inferred by the compiler.
target: The machine to send to. The target machine must call recv_from() before this call will return.
t: The object to send. It must be serializable, and the type must match the target machine's call to recv_from().
control: Optional parameter; defaults to false. If set to true, the call is marked as control-plane communication and will not register in bytes_received() or bytes_sent(). This must match the "control" parameter of the target machine's recv_from() call.
static unsigned char graphlab::distributed_control::set_sequentialization_key (unsigned char newkey)  [static]
Sets the sequentialization key to a new value, returning the previous value.
All RPC calls made using the same key value (as long as the key is non-zero) will sequentialize. RPC calls made while the key value is 0 can be run in parallel in arbitrary order.
The key value is thread-local thus setting the key value in one thread does not affect the key value in another thread.
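A minimal sketch of how the key might be used to force a group of calls to execute in issue order on the target; the key value and the print_message function are illustrative, and dc is an existing distributed_control instance:

  // Calls issued with the same non-zero key are executed sequentially on the target machine.
  unsigned char oldkey = graphlab::distributed_control::set_sequentialization_key(42);
  dc.remote_call(1, print_message, 1, std::string("first"));
  dc.remote_call(1, print_message, 2, std::string("second"));   // runs after "first" on machine 1
  graphlab::distributed_control::set_sequentialization_key(oldkey);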