GraphLab: Distributed Graph-Parallel API  2.1
graphlab::dc_dist_object< T > Class Template Reference

Provides a class with its own distributed communication context, allowing instances of the class to communicate with other remote instances. More...

#include <graphlab/rpc/dc_dist_object.hpp>

List of all members.

Public Member Functions

 dc_dist_object (distributed_control &dc_, T *owner)
 Constructs a distributed object context.
size_t calls_received () const
 The number of function calls received by this object.
size_t calls_sent () const
 The number of function calls sent from this object.
size_t bytes_sent () const
 The number of bytes sent from this object, excluding headers and other control overhead.
distributed_control & dc ()
 A reference to the underlying distributed_control object.
const distributed_control & dc () const
 A const reference to the underlying distributed_control object.
procid_t procid () const
 The current process ID.
procid_t numprocs () const
 The number of processes in the distributed program.
std::ostream & cout () const
 A wrapper around cout that outputs only on machine 0.
std::ostream & cerr () const
 A wrapper around cerr that outputs only on machine 0.
void remote_call (procid_t targetmachine, Fn fn,...)
 Performs a non-blocking RPC call to the target machine to run the provided function pointer.
void remote_call (Iterator machine_begin, Iterator machine_end, Fn fn,...)
 Performs a non-blocking RPC call to a collection of machines to run the provided function pointer.
RetVal remote_request (procid_t targetmachine, Fn fn,...)
 Performs a blocking RPC call to the target machine to run the provided function pointer.
RetVal future_remote_request (procid_t targetmachine, Fn fn,...)
 Performs a nonblocking RPC call to the target machine to run the provided function pointer which has an expected return value.
template<typename U >
void send_to (procid_t target, U &t, bool control=false)
 Sends an object to a target machine and blocks until the target machine calls recv_from() to receive the object.
template<typename U >
void recv_from (procid_t source, U &t, bool control=false)
 Waits to receive an object a source machine sent via send_to().
template<typename U >
void broadcast (U &data, bool originator, bool control=false)
 This function allows one machine to broadcast an object to all machines.
template<typename U >
void gather (std::vector< U > &data, procid_t sendto, bool control=false)
 Collects information contributed by each machine onto one machine.
template<typename U >
void all_gather (std::vector< U > &data, bool control=false)
 Sends some information contributed by each machine to all machines.
template<typename U , typename PlusEqual >
void all_reduce2 (U &data, PlusEqual plusequal, bool control=false)
 Combines a value contributed by each machine, making the result available to all machines.
template<typename U >
void all_reduce (U &data, bool control=false)
 Combines a value contributed by each machine, making the result available to all machines.
template<typename U >
void all_to_all (std::vector< U > &data, bool control=false)
void barrier ()
 A distributed barrier which waits for all machines to call the barrier() function before proceeding.
void full_barrier ()
 A distributed barrier which waits for all machines to call the full_barrier() function before proceeding. Also waits for all previously issued remote calls to complete.
std::map< std::string, size_t > gather_statistics ()

Detailed Description

template<typename T>
class graphlab::dc_dist_object< T >

Provides a class with its own distributed communication context, allowing instances of the class to communicate with other remote instances.

The philosophy behind the dc_dist_object is the concept of "distributed objects". The idea is that the user should be able to write code:

int main() {
// ... initialization of a distributed_control object dc ...
distributed_vector vec(dc), vec2(dc);
distributed_graph g(dc);
}

where, if run in a distributed setting, the "vec" variable behaves as a single distributed object, automatically coordinating its operations across the network and communicating with the other instances of "vec" on the other machines. Essentially, each object (vec, vec2 and g) constructs its own private communication context, which allows every machine's "vec" variable to communicate only with the other machines' "vec" variables, and similarly for "vec2" and "g". This private communication context is provided by this dc_dist_object class.

Constructing a distributed object requires little work:

class distributed_int_vector {
private:
// creates a local dc_dist_object context
dc_dist_object<distributed_int_vector> rmi;
public:
// context must be initialized on construction with the
// root distributed_control object
distributed_int_vector(distributed_control& dc): rmi(dc, this) {
... other initialization ...
// make sure all machines finish constructing this object
// before continuing
rmi.barrier();
}
};

After this, remote_call() and remote_request() can be used to communicate across the network with the matching instance of the distributed_int_vector on each machine.

Each dc_dist_object maintains its own private communication context which is not influenced by other communication contexts. In other words, rmi.barrier() and all other operations in each instance of the distributed_int_vector are independent of each other. In particular, rmi.full_barrier() only waits for completion of all RPC calls from within the current communication context.

See the examples in GraphLab RPC for more usage examples.

Note:
There is no real limit to the number of distributed objects that can be created. However, each dc_dist_object contains a reasonably large amount of state, so frequent construction and destruction of objects is not recommended.

Definition at line 115 of file dc_dist_object.hpp.


Constructor & Destructor Documentation

template<typename T>
graphlab::dc_dist_object< T >::dc_dist_object ( distributed_control &  dc_,
T *  owner 
)
inline

Constructs a distributed object context.

The constructor constructs a distributed object context which is associated with the "owner" object.

Parameters:
dc_ The root distributed_control which provides the communication control plane.
owner The object to associate with.

Definition at line 199 of file dc_dist_object.hpp.


Member Function Documentation

template<typename T>
template<typename U >
void graphlab::dc_dist_object< T >::all_gather ( std::vector< U > &  data,
bool  control = false 
)
inline

Sends some information contributed by each machine to all machines.

The goal is to have each machine broadcast a piece of information to all machines. This is like gather(), but all machines have the complete vector at the end. To accomplish this, each machine constructs a vector of length numprocs(), and stores the data to communicate in the procid()'th entry of the vector. Calling all_gather() with the vector will then leave all machines with a complete copy of the vector containing all contributions (entry 0 from machine 0, entry 1 from machine 1, etc.). Example:

// construct the vector of values
std::vector<int> values;
values.resize(dc.numprocs());
// set my contributed value
values[dc.procid()] = dc.procid();
dc.all_gather(values);
// at this point all machines will have a vector with length equal to the
// number of processes, containing values [0, 1, 2, ...]
Note:
Behavior is undefined if multiple threads on the same machine call all_gather simultaneously.
Parameters:
data A vector of length equal to the number of processes. The information to communicate is in the entry data[procid()].
control Optional parameter. Defaults to false. If set to true, this will be marked as control plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.

Definition at line 1000 of file dc_dist_object.hpp.

template<typename T>
template<typename U >
void graphlab::dc_dist_object< T >::all_reduce ( U &  data,
bool  control = false 
)
inline

Combines a value contributed by each machine, making the result available to all machines.

Each machine calls all_reduce() with an object which is serializable and has operator+= implemented. When all_reduce() returns, the "data" variable will contain a value corresponding to adding up the objects contributed by each machine. Example:

int i = 1;
dc.all_reduce(i);
// since each machine contributed the value "1",
// all machines will have i = numprocs() here.
Parameters:
data A piece of data to perform a reduction over. The type must implement operator+=.
control Optional parameter. Defaults to false. If set to true, this will be marked as control plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.

Definition at line 1243 of file dc_dist_object.hpp.

template<typename T>
template<typename U , typename PlusEqual >
void graphlab::dc_dist_object< T >::all_reduce2 ( U &  data,
PlusEqual  plusequal,
bool  control = false 
)
inline

Combines a value contributed by each machine, making the result available to all machines.

This function is equivalent to all_reduce(), but with an externally defined PlusEqual function. Each machine calls all_reduce2() with an object which is serializable and a function "plusequal" which combines two instances of the object. When all_reduce2() returns, the "data" variable will contain a value corresponding to adding up the objects contributed by each machine using the plusequal function. Where U is the type of the object, the plusequal function must be of the form:

void plusequal(U& left, const U& right);

and must implement the equivalent of left += right. Example:

void int_plus_equal(int& a, const int& b) {
a+=b;
}
int i = 1;
dc.all_reduce2(i, int_plus_equal);
// since each machine contributed the value "1",
// all machines will have i = numprocs() here.
Parameters:
data A piece of data to perform a reduction over.
plusequal A plusequal function on the data. Must have the prototype void plusequal(U&, const U&).
control Optional parameter. Defaults to false. If set to true, this will be marked as control plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.

Definition at line 1132 of file dc_dist_object.hpp.

template<typename T>
void graphlab::dc_dist_object< T >::barrier ( )
inline

A distributed barrier which waits for all machines to call the barrier() function before proceeding.

A machine calling barrier() will wait until every machine reaches this barrier before continuing. Only one thread on each machine should call the barrier.

See also:
full_barrier

Definition at line 1358 of file dc_dist_object.hpp.
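As a sketch (assuming a distributed object in the style of the distributed_int_vector above, whose rmi member is its dc_dist_object context; the phase names are illustrative), barrier() is typically used to separate phases of a distributed computation:

```cpp
// Hypothetical member function of a distributed object holding an
// rmi (dc_dist_object) context.
void distributed_int_vector::rebuild() {
// phase 1: every machine updates its local partition
update_local_partition();
// wait until all machines have finished phase 1 before any
// machine starts reading remote state in phase 2
rmi.barrier();
// phase 2: safe to issue remote_call()/remote_request() that
// assume phase 1 is globally complete
refresh_remote_caches();
}
```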

template<typename T>
template<typename U >
void graphlab::dc_dist_object< T >::broadcast ( U &  data,
bool  originator,
bool  control = false 
)
inline

This function allows one machine to broadcast an object to all machines.

The originator calls broadcast() with the object provided in 'data' and originator set to true. All other callers call with originator set to false. The originator will then return 'data'. All other machines will receive the originator's transmission in the "data" parameter. This call is guaranteed to have barrier-like behavior; that is to say, the call blocks until all machines enter the broadcast function. Example:

int i;
if (dc.procid() == 0) {
// if I am machine 0, I broadcast the value i = 10 to all machines
i = 10;
dc.broadcast(i, true);
} else {
// all other machines receive the broadcast value
dc.broadcast(i, false);
}
// at this point, all machines have i = 10
Note:
Behavior is undefined if more than one machine calls broadcast with originator set to true.
Behavior is undefined if multiple threads on the same machine call broadcast simultaneously.
Parameters:
data If this is the originator, this will contain the object to broadcast. Otherwise, this will be a reference to the object receiving the broadcast.
originator Set to true if this is the source of the broadcast. Set to false otherwise.
control Optional parameter. Defaults to false. If set to true, this will be marked as control plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.

Definition at line 826 of file dc_dist_object.hpp.

template<typename T>
void graphlab::dc_dist_object< T >::full_barrier ( )
inline

A distributed barrier which waits for all machines to call the full_barrier() function before proceeding. Also waits for all previously issued remote calls to complete.

Similar to barrier(), but provides the additional guarantee that all calls issued prior to this barrier are completed before returning.

Note:
This function could return prematurely if other threads are still issuing function calls, since we cannot differentiate between calls issued before the barrier and calls issued while the barrier is being evaluated. Therefore, when used in a multithreaded scenario, the user must ensure that all other threads which may perform operations using this object are stopped before the full barrier is initiated.
See also:
barrier


Definition at line 1428 of file dc_dist_object.hpp.
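For instance (a sketch only; distributed_int_vector, set_remote_entry(), local_value, and the rmi member are illustrative names), full_barrier() can be used to wait for a batch of non-blocking remote calls to land before reading the results:

```cpp
// Hypothetical member function of a distributed object.
void distributed_int_vector::scatter_updates() {
// issue non-blocking remote calls; remote_call() returns
// immediately, so nothing is guaranteed to have executed yet
for (procid_t p = 0; p < rmi.numprocs(); ++p) {
if (p != rmi.procid()) {
rmi.remote_call(p, &distributed_int_vector::set_remote_entry,
rmi.procid(), local_value);
}
}
// full_barrier(): every machine blocks here until all machines
// have arrived AND all remote calls issued above have completed
rmi.full_barrier();
// all updates are now visible on every machine
}
```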

template<typename T>
RetVal graphlab::dc_dist_object< T >::future_remote_request ( procid_t  targetmachine,
Fn  fn,
  ... 
)

Performs a nonblocking RPC call to the target machine to run the provided function pointer which has an expected return value.

future_remote_request() calls the function "fn" on a target remote machine. Provided arguments are serialized and sent to the target. Therefore, all arguments are necessarily transmitted by value. If the target function has a return value, it is sent back to the calling machine.

future_remote_request() is like remote_request(), but is non-blocking. Instead, it immediately returns a graphlab::request_future object which will allow you to wait for the return value.

Example:

// An add_one function is defined in the distributed object
class distributed_obj_example {
... initialization and constructor ...
private:
int add_one(int i) {
return i + 1;
}
public:
int add_one_from_machine_1(int i) {
// calls the add_one function on machine 1 with the argument i
// this call returns immediately
graphlab::request_future<int> future =
rmi.future_remote_request(1, &distributed_obj_example::add_one, i);
// ... we can do other stuff here
// then when we want the answer
int result = future();
return result;
}
};
Parameters:
targetmachine The ID of the machine to run the function on.
fn The function to run on the target machine. Must be a pointer to a member function in the owning object.
... The arguments to send to fn. Arguments must be serializable and must be castable to the target types.
Returns:
Returns the same return type as the function fn
template<typename T>
template<typename U >
void graphlab::dc_dist_object< T >::gather ( std::vector< U > &  data,
procid_t  sendto,
bool  control = false 
)
inline

Collects information contributed by each machine onto one machine.

The goal is to collect some information from each machine onto a single target machine (sendto). To accomplish this, each machine constructs a vector of length numprocs(), and stores the data to communicate in the procid()'th entry of the vector. Calling gather() with the vector and the target machine then sends the contributed value to the target. When the function returns, machine sendto will have the complete vector where data[i] is the data contributed by machine i. Example:

// construct the vector of values
std::vector<int> values;
values.resize(dc.numprocs());
// set my contributed value
values[dc.procid()] = dc.procid();
dc.gather(values, 0);
// at this point machine 0 will have a vector with length equal to the
// number of processes, and containing values [0, 1, 2, ...]
// All other machines value vector will be unchanged.
Note:
Behavior is undefined if machines call gather with different values for sendto.
Behavior is undefined if multiple threads on the same machine call gather simultaneously.
Parameters:
data A vector of length equal to the number of processes. The information to communicate is in the entry data[procid()].
sendto The machine which will hold the complete vector at the end of the operation. All machines must pass the same value for this parameter.
control Optional parameter. Defaults to false. If set to true, this will be marked as control plane communication and will not register in bytes_received() or bytes_sent(). This must be the same on all machines.

Definition at line 885 of file dc_dist_object.hpp.

template<typename T>
std::map<std::string, size_t> graphlab::dc_dist_object< T >::gather_statistics ( )
inline

Gathers RPC statistics. All machines must call this function at the same time; however, only proc 0 will return values.

Definition at line 1488 of file dc_dist_object.hpp.
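A usage sketch (the rmi member is the object's dc_dist_object context, as in the examples above; the keys in the returned map are not specified here, so consult dc_dist_object.hpp for the actual statistics reported):

```cpp
// All machines must call gather_statistics() together; only
// machine 0 receives a populated map.
std::map<std::string, size_t> stats = rmi.gather_statistics();
if (rmi.procid() == 0) {
// print each statistic name and its aggregated value
for (std::map<std::string, size_t>::const_iterator it = stats.begin();
it != stats.end(); ++it) {
std::cout << it->first << ": " << it->second << "\n";
}
}
```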

template<typename T>
template<typename U >
void graphlab::dc_dist_object< T >::recv_from ( procid_t  source,
U &  t,
bool  control = false 
)
inline

Waits to receive an object a source machine sent via send_to().

This function waits to receive a Serializable object "t" from a source machine. The source machine must send the object using send_to(). The source machine will wait for the target machine's recv_from() to complete before returning. Example:

int i;
if (dc.procid() == 0) {
i = 10;
// if I am machine 0, I send the value i = 10 to machine 1
dc.send_to(1, i);
} else if (dc.procid() == 1) {
// machine 1 receives the value of i from machine 0
dc.recv_from(0, i);
}
// at this point machines 0 and 1 have the value i = 10
Template Parameters:
U The type of object to receive. This should be inferred by the compiler.
Parameters:
source The source machine to receive from. This function will block until data is received.
t The object to receive. It must be serializable and the type must match the source machine's call to send_to().
control Optional parameter. Defaults to false. If set to true, this will be marked as control plane communication and will not register in bytes_received() or bytes_sent(). This must match the "control" parameter on the source machine's send_to() call.
Note:
Behavior is undefined if multiple threads on the same machine call recv_from simultaneously.

Definition at line 773 of file dc_dist_object.hpp.

template<typename T>
void graphlab::dc_dist_object< T >::remote_call ( procid_t  targetmachine,
Fn  fn,
  ... 
)

Performs a non-blocking RPC call to the target machine to run the provided function pointer.

remote_call() calls the function "fn" on a target remote machine. "fn" may be public, private or protected within the owner class; there are no access restrictions. Provided arguments are serialized and sent to the target. Therefore, all arguments are necessarily transmitted by value. If the target function has a return value, the return value is lost.

remote_call() is non-blocking and does not wait for the target machine to complete execution of the function. Different remote_calls may be handled by different threads on the target machine and thus the target function should be made thread-safe. Alternatively, see distributed_control::set_sequentialization_key() to force sequentialization of groups of remote calls.

If blocking operation is desired, remote_request() may be used. Alternatively, a full_barrier() may also be used to wait for completion of all incomplete RPC calls.

Example:

// A print function is defined in the distributed object
class distributed_obj_example {
... initialization and constructor ...
private:
void print(std::string s) {
std::cout << s << "\n";
}
public:
void print_on_machine_one(std::string s) {
// calls the print function on machine 1 with the argument "s"
rmi.remote_call(1, &distributed_obj_example::print, s);
}
};

Note the syntax for obtaining a pointer to a member function.

Parameters:
targetmachine The ID of the machine to run the function on.
fn The function to run on the target machine. Must be a pointer to a member function in the owning object.
... The arguments to send to fn. Arguments must be serializable and must be castable to the target types.
template<typename T>
void graphlab::dc_dist_object< T >::remote_call ( Iterator  machine_begin,
Iterator  machine_end,
Fn  fn,
  ... 
)

Performs a non-blocking RPC call to a collection of machines to run the provided function pointer.

This function calls the provided function pointer on a collection of machines contained in the iterator range [begin, end). Provided arguments are serialized and sent to the target. Therefore, all arguments are necessarily transmitted by value. If the target function has a return value, the return value is lost.

This function is functionally equivalent to:

while(machine_begin != machine_end) {
remote_call(*machine_begin, fn, ...);
++machine_begin;
}

However, this function makes some optimizations to ensure all arguments are only serialized once instead of #calls times.

This function is non-blocking and does not wait for the target machines to complete execution of the function. Different remote_calls may be handled by different threads on the target machines and thus the target function should be made thread-safe. Alternatively, see distributed_control::set_sequentialization_key() to force sequentialization of groups of remote_calls. A full_barrier() may also be issued to wait for completion of all RPC calls issued prior to the full barrier.

Example:

// A print function is defined in the distributed object
class distributed_obj_example {
... initialization and constructor ...
private:
void print(std::string s) {
std::cout << s << "\n";
}
public:
void print_on_some_machines(std::string s) {
std::vector<procid_t> procs;
procs.push_back(1); procs.push_back(3); procs.push_back(5);
// calls the print function on machine 1,3,5 with the argument "s"
rmi.remote_call(procs.begin(), procs.end(),
&distributed_obj_example::print, s);
}
};
Parameters:
machine_begin The beginning of an iterator range containing the list of machines to call. Iterator::value_type must be castable to procid_t.
machine_end The end of an iterator range containing the list of machines to call. Iterator::value_type must be castable to procid_t.
fn The function to run on the target machines. Must be a pointer to a member function in the owning object.
... The arguments to send to fn. Arguments must be serializable and must be castable to the target types.
template<typename T>
RetVal graphlab::dc_dist_object< T >::remote_request ( procid_t  targetmachine,
Fn  fn,
  ... 
)

Performs a blocking RPC call to the target machine to run the provided function pointer.

remote_request() calls the function "fn" on a target remote machine. Provided arguments are serialized and sent to the target. Therefore, all arguments are necessarily transmitted by value. If the target function has a return value, it is sent back to the calling machine.

Unlike remote_call(), remote_request() is blocking and waits for the target machine to complete execution of the function. However, different remote_requests may still be handled by different threads on the target machine.

Example:

// A print function is defined in the distributed object
class distributed_obj_example {
... initialization and constructor ...
private:
int add_one(int i) {
return i + 1;
}
public:
int add_one_from_machine_1(int i) {
// calls the add_one function on machine 1 with the argument i
return rmi.remote_request(1, &distributed_obj_example::add_one, i);
}
};
Parameters:
targetmachine The ID of the machine to run the function on.
fn The function to run on the target machine. Must be a pointer to a member function in the owning object.
... The arguments to send to fn. Arguments must be serializable and must be castable to the target types.
Returns:
Returns the same return type as the function fn
template<typename T>
template<typename U >
void graphlab::dc_dist_object< T >::send_to ( procid_t  target,
U &  t,
bool  control = false 
)
inline

Sends an object to a target machine and blocks until the target machine calls recv_from() to receive the object.

This function sends a Serializable object "t" to the target machine, and waits for the target machine to call recv_from() to receive the object before returning. Example:

int i;
if (dc.procid() == 0) {
i = 10;
// if I am machine 0, I send the value i = 10 to machine 1
dc.send_to(1, i);
} else if (dc.procid() == 1) {
// machine 1 receives the value of i from machine 0
dc.recv_from(0, i);
}
// at this point machines 0 and 1 have the value i = 10
Template Parameters:
U The type of object to send. This should be inferred by the compiler.
Parameters:
target The target machine to send to. The target machine must call recv_from() before this call will return.
t The object to send. It must be serializable and the type must match the target machine's call to recv_from().
control Optional parameter. Defaults to false. If set to true, this will be marked as control plane communication and will not register in bytes_received() or bytes_sent(). This must match the "control" parameter on the target machine's recv_from() call.
Note:
Behavior is undefined if multiple threads on the same machine call send_to simultaneously.

Definition at line 744 of file dc_dist_object.hpp.


The documentation for this class was generated from the following file: graphlab/rpc/dc_dist_object.hpp