GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
dc_dist_object.hpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #include <graphlab/rpc/dc.hpp>
25 
26 #ifndef GRAPHLAB_DC_DIST_OBJECT_HPP
27 #define GRAPHLAB_DC_DIST_OBJECT_HPP
28 #include <vector>
29 #include <string>
30 #include <set>
31 #include <graphlab/parallel/atomic.hpp>
32 #include <graphlab/rpc/dc_internal_types.hpp>
33 #include <graphlab/rpc/dc_dist_object_base.hpp>
34 #include <graphlab/rpc/object_request_issue.hpp>
35 #include <graphlab/rpc/object_call_issue.hpp>
36 #include <graphlab/rpc/object_podcall_issue.hpp>
37 #include <graphlab/rpc/object_broadcast_issue.hpp>
38 #include <graphlab/rpc/object_podcall_broadcast_issue.hpp>
39 #include <graphlab/rpc/function_ret_type.hpp>
40 #include <graphlab/rpc/mem_function_arg_types_def.hpp>
41 #include <graphlab/util/charstream.hpp>
42 #include <boost/preprocessor.hpp>
43 #include <graphlab/util/tracepoint.hpp>
44 #include <graphlab/macros_def.hpp>
45 
46 #define BARRIER_BRANCH_FACTOR 128
47 
48 
49 namespace graphlab {
50 
51 
52 /**
53 \ingroup rpc
54 \brief Provides a class with its own distributed communication context, allowing
55 instances of the class to communicate with other remote instances.
56 
57 The philosophy behind the dc_dist_object is the concept of "distributed
58 objects". The idea is that the user should be able to write code:
59 
60 \code
61 void main() {
62  // ... initialization of a distributed_control object dc ...
63 
64  distributed_vector vec(dc), vec2(dc);
65  distributed_graph g(dc);
66 }
67 \endcode
68 where if run in a distributed setting, the "vec" variable, can behave as if it
69 is a single distributed object, and automatically coordinate its operations
70 across the network; communicating with the other instances of "vec" on the
71 other machines. Essentially, each object (vec, vec2 and g) constructs its own
72 private communication context, which allows every machine's "vec" variable to
73 communicate only with other machine's "vec" variable. And similarly for "vec2"
74 and "g". This private communication context is provided by this dc_dist_object
75 class.
76 
77 To construct a distributed object requires little work:
78 \code
79 class distributed_int_vector {
80  private:
81  // creates a local dc_dist_object context
82  graphlab::dc_dist_object<distributed_int_vector> rmi;
83 
84  public:
85  // context must be initialized on construction with the
86  // root distributed_control object
87  distributed_int_vector(distributed_control& dc): rmi(dc, this) {
88  ... other initialization ...
89  // make sure all machines finish constructing this object
90  // before continuing
91  rmi.barrier();
92  }
93 };
94 \endcode
95 
96 After which remote_call(), and remote_request() can be used to communicate
97 across the network with the same matching instance of the
98 distributed_int_vector.
99 
100 Each dc_dist_object maintains its own private communication context which
is not influenced by other communication contexts. In other words, the
102 <code>rmi.barrier()</code>, and all other operations in each instance of the
103 distributed_int_vector are independent of each other. In particular, the
104 <code>rmi.full_barrier()</code> only waits for completion of all RPC calls
105 from within the current communication context.
106 
107 See the examples in \ref RPC for more usage examples.
108 
\note There is no real limit to the number of distributed
objects that can be created. However, each dc_dist_object does contain
a reasonably large amount of state, so frequent construction and deletion
of objects is not recommended.
113 */
template <typename T>
class dc_dist_object : public dc_impl::dc_dist_object_base{
 private:
  // The root communication layer this context is layered on.
  distributed_control &dc_;
  // Object id under which the owner T instance is registered with dc_
  // (targeted by remote_call / remote_request).
  size_t obj_id;
  // Object id of this dc_dist_object itself (targeted by the
  // internal_call / internal_request family below).
  size_t control_obj_id; // object id of this object
  // The user object this communication context is associated with.
  T* owner;
  // Per-machine counters, indexed by procid.
  std::vector<atomic<size_t> > callsreceived;
  std::vector<atomic<size_t> > callssent;
  std::vector<atomic<size_t> > bytessent;
  // make operator= private (contexts are not assignable)
  dc_dist_object<T>& operator=(const dc_dist_object<T> &d) {return *this;}
  friend class distributed_control;


  DECLARE_TRACER(distobj_remote_call_time);
130 
131 
132  public:
133 
134  /// \cond GRAPHLAB_INTERNAL
135 
/// Should not be used by the user.
///
/// Invoked by the RPC runtime whenever a (non-control) call addressed to
/// this object arrives from machine p. Besides bumping the per-source
/// counter, this participates in the full_barrier() protocol: during a
/// full barrier every machine knows how many calls it must receive from
/// each peer (calls_to_receive). Once the count from p reaches that
/// target, p is marked complete; when the last peer completes, the
/// thread blocked in full_barrier() is signalled.
void inc_calls_received(procid_t p) {
  if (!full_barrier_in_effect) {
    size_t t = callsreceived[p].inc();
    // A full barrier may have begun between the check above and the
    // increment; re-check and if so run the same completion logic as
    // the else-branch below.
    if (full_barrier_in_effect) {
      if (t == calls_to_receive[p]) {
        // if it was me who set the bit
        if (procs_complete.set_bit(p) == false) {
          // then decrement the incomplete count.
          // if it was I who decreased it to 0,
          // lock and signal
          full_barrier_lock.lock();
          if (num_proc_recvs_incomplete.dec() == 0) {
            full_barrier_cond.signal();
          }
          full_barrier_lock.unlock();
        }
      }
    }
  }
  else {
    // check the proc I just incremented.
    // If I just reached the required count, I need
    // to decrement the full barrier counter
    if (callsreceived[p].inc() == calls_to_receive[p]) {
      // if it was me who set the bit
      if (procs_complete.set_bit(p) == false) {
        // then decrement the incomplete count.
        // if it was I who decreased it to 0,
        // lock and signal
        full_barrier_lock.lock();
        if (num_proc_recvs_incomplete.dec() == 0) {
          full_barrier_cond.signal();
        }
        full_barrier_lock.unlock();
      }
    }
  }
}
175 
/// Should not be used by the user.
/// Bumps the counter of calls issued from this context to machine p.
void inc_calls_sent(procid_t p) {
  callssent[p].inc();
}
180 
/// Should not be used by the user.
/// Adds `bytes` to the payload-byte counter for machine p
/// (excludes headers / control overhead; see bytes_sent()).
void inc_bytes_sent(procid_t p, size_t bytes) {
  bytessent[p].inc(bytes);
}
185 
186  /// \endcond GRAPHLAB_INTERNAL
187  public:
188 
189  /**
190  * \brief Constructs a distributed object context.
191  *
192  * The constructor constructs a distributed object context which is
193  * associated with the "owner" object.
194  *
195  * \param dc_ The root distributed_control which provides the
196  * communication control plane.
197  * \param owner The object to associate with
198  */
200  dc_(dc_),owner(owner) {
201  callssent.resize(dc_.numprocs());
202  callsreceived.resize(dc_.numprocs());
203  bytessent.resize(dc_.numprocs());
204  //------ Initialize the matched send/recv ------
205  recv_froms.resize(dc_.numprocs());
206  //------ Initialize the gatherer ------
207  gather_receive.resize(dc_.numprocs());
208 
209 
210  //------- Initialize the Barrier ----------
211  child_barrier_counter.value = 0;
212  barrier_sense = 1;
213  barrier_release = -1;
214 
215 
216  // compute my children
217  childbase = size_t(dc_.procid()) * BARRIER_BRANCH_FACTOR + 1;
218  if (childbase >= dc_.numprocs()) {
219  numchild = 0;
220  }
221  else {
222  size_t maxchild = std::min<size_t>(dc_.numprocs(),
223  childbase + BARRIER_BRANCH_FACTOR);
224  numchild = (procid_t)(maxchild - childbase);
225  }
226 
227  parent = (procid_t)((dc_.procid() - 1) / BARRIER_BRANCH_FACTOR) ;
228 
229  //-------- Initialize all gather --------------
230  ab_child_barrier_counter.value = 0;
231  ab_barrier_sense = 1;
232  ab_barrier_release = -1;
233 
234 
235  //-------- Initialize the full barrier ---------
236 
237  full_barrier_in_effect = false;
238  procs_complete.resize(dc_.numprocs());
239 
240  // register
241  obj_id = dc_.register_object(owner, this);
242  control_obj_id = dc_.register_object(this, this);
243 
244  //-------- Initialize Tracer
245  std::string name = typeid(T).name();
246  INITIALIZE_TRACER(distobj_remote_call_time,
247  std::string("dc_dist_object ") + name + ": remote_call time");
248  }
249 
250  /// \brief The number of function calls received by this object
251  size_t calls_received() const {
252  size_t ctr = 0;
253  for (size_t i = 0;i < numprocs(); ++i) {
254  ctr += callsreceived[i].value;
255  }
256  return ctr;
257  }
258 
259  /// \brief The number of function calls sent from this object
260  size_t calls_sent() const {
261  size_t ctr = 0;
262  for (size_t i = 0;i < numprocs(); ++i) {
263  ctr += callssent[i].value;
264  }
265  return ctr;
266  }
267 
268  /** \brief The number of bytes sent from this object, excluding
269  * headers and other control overhead.
270  */
271  size_t bytes_sent() const {
272  size_t ctr = 0;
273  for (size_t i = 0;i < numprocs(); ++i) {
274  ctr += bytessent[i].value;
275  }
276  return ctr;
277  }
278 
279  /// \brief A reference to the underlying distributed_control object
281  return dc_;
282  }
283 
/// \brief A const reference to the underlying distributed_control object
const distributed_control& dc() const {
  return dc_;
}
288 
/// \brief The current process ID (forwarded from the root
/// distributed_control).
inline procid_t procid() const {
  return dc_.procid();
}
293 
/// \brief The number of processes in the distributed program
/// (forwarded from the root distributed_control).
inline procid_t numprocs() const {
  return dc_.numprocs();
}
298 
/**
 * \brief A wrapper on cout, that outputs only on machine 0
 */
std::ostream& cout() const {
  return dc_.cout();
}
305 
306  /**
307  * \brief A wrapper on cerr, that outputs only on machine 0
308  */
309  std::ostream& cerr() const {
310  return dc_.cout();
311  }
312 
/// \cond GRAPHLAB_INTERNAL

/*
  This generates the interface functions for the standard, basic calls.
  A generated function looks like this:
  \code
  template<typename F , typename T0> void remote_call (procid_t target, F remote_function , T0 i0 )
  {
    ASSERT_LT(target, dc_.senders.size());
    if ((STANDARD_CALL & CONTROL_PACKET) == 0) inc_calls_sent(target);
    dc_impl::object_call_issue1 <T, F , T0> ::exec(dc_.senders[target],
                                                   STANDARD_CALL,
                                                   target,obj_id,
                                                   remote_function ,
                                                   i0 );
  }
  \endcode

  The arguments to RPC_INTERFACE_GENERATOR are:
  - the name of the rpc call ("remote_call" in the first one)
  - the name of the issuing processor ("object_call_issue")
  - The flags to set on the call ("STANDARD_CALL")

  The call can be issued with
  rmi.remote_call(target,
                  &object_type::function_name,
                  arg1,
                  arg2...)
*/
// Helper generators used inside the BOOST_PP repetitions below:
// GENARGS expands to a typed parameter "Tn in", GENI to the bare argument
// "in", GENT to the type "Tn", and GENARC serializes argument n into "arc".
#define GENARGS(Z,N,_) BOOST_PP_CAT(T, N) BOOST_PP_CAT(i, N)
#define GENI(Z,N,_) BOOST_PP_CAT(i, N)
#define GENT(Z,N,_) BOOST_PP_CAT(T, N)
#define GENARC(Z,N,_) arc << BOOST_PP_CAT(i, N);

#define RPC_INTERFACE_GENERATOR(Z,N,FNAME_AND_CALL) \
  template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
  void BOOST_PP_TUPLE_ELEM(3,0,FNAME_AND_CALL) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
    ASSERT_LT(target, dc_.senders.size()); \
    BEGIN_TRACEPOINT(distobj_remote_call_time); \
    if ((BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL) & CONTROL_PACKET) == 0) inc_calls_sent(target); \
    BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,FNAME_AND_CALL),N) \
      <T, F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
      ::exec(this, dc_.senders[target], BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL), target,obj_id, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
    END_TRACEPOINT(distobj_remote_call_time); \
  } \

/*
  Generates the interface functions (0 through 6 arguments).
  3rd argument is a tuple (interface name, issue name, flags).
*/
BOOST_PP_REPEAT(7, RPC_INTERFACE_GENERATOR, (remote_call, dc_impl::object_call_issue, STANDARD_CALL) )
BOOST_PP_REPEAT(7, RPC_INTERFACE_GENERATOR, (pod_call, dc_impl::object_podcall_issue, STANDARD_CALL) )
BOOST_PP_REPEAT(7, RPC_INTERFACE_GENERATOR, (control_call,dc_impl::object_call_issue, (STANDARD_CALL | CONTROL_PACKET)) )
365 
366 
/// Begins a "split" call: returns an archive into which the caller can
/// serialize the message body for remote_function directly. The archive
/// must later be handed to split_call_end() or split_call_cancel().
oarchive* split_call_begin(void (T::*remote_function)(size_t, wild_pointer)) {
  return dc_impl::object_split_call<T, void(T::*)(size_t, wild_pointer)>::split_call_begin(this, obj_id, remote_function);
}
370 
/// Completes a split call started with split_call_begin(), transmitting
/// the buffer in oarc to `target` as a STANDARD_CALL and charging the
/// call to this object's counters. Ownership of oarc is relinquished.
void split_call_end(procid_t target, oarchive* oarc) {
  inc_calls_sent(target);
  return dc_impl::object_split_call<T, void(T::*)(size_t, wild_pointer)>::split_call_end(this, oarc, dc_.senders[target],
                                                                                         target, STANDARD_CALL);
}
376 
/// Abandons a split call started with split_call_begin(), releasing the
/// archive without sending anything.
void split_call_cancel(oarchive* oarc) {
  return dc_impl::object_split_call<T, void(T::*)(size_t, wild_pointer)>::split_call_cancel(oarc);
}
380 
381 
382 
// Broadcast variant of the call generator: issues the same member-function
// call to every machine in the iterator range [target_begin, target_end),
// serializing the arguments only once (see object_broadcast_issue).
#define BROADCAST_INTERFACE_GENERATOR(Z,N,FNAME_AND_CALL) \
  template<typename Iterator, typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
  void BOOST_PP_TUPLE_ELEM(3,0,FNAME_AND_CALL) (Iterator target_begin, Iterator target_end, \
                                                F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
    if (target_begin == target_end) return; \
    BEGIN_TRACEPOINT(distobj_remote_call_time); \
    if ((BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL) & CONTROL_PACKET) == 0) { \
      Iterator iter = target_begin; \
      while (iter != target_end){ \
        inc_calls_sent(*iter); \
        ++iter; \
      } \
    } \
    BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,FNAME_AND_CALL),N) \
      <Iterator, T, F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
      ::exec(this, dc_.senders, BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL), target_begin, target_end,obj_id, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
    END_TRACEPOINT(distobj_remote_call_time); \
  }

BOOST_PP_REPEAT(7, BROADCAST_INTERFACE_GENERATOR, (remote_call, dc_impl::object_broadcast_issue, STANDARD_CALL) )
BOOST_PP_REPEAT(7, BROADCAST_INTERFACE_GENERATOR, (pod_call, dc_impl::object_podcall_broadcast_issue, STANDARD_CALL) )
404 
/*
  The generation procedure for requests is the same. The only
  difference is that the function name has to be changed a little to
  identify the return type of the function, (typename
  dc_impl::function_ret_type<__GLRPC_FRESULT>) and the issuing
  processor is object_request_issue.

  The call can be issued with
  \code
  ret = rmi.remote_request(target,
                           &object_type::function_name,
                           arg1,
                           arg2...)
  \endcode
*/
// Blocking request generator: exec() returns a handle whose operator()
// (the trailing "()") waits for and yields the remote reply.
#define REQUEST_INTERFACE_GENERATOR(Z,N,ARGS) \
  template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
  BOOST_PP_TUPLE_ELEM(3,0,ARGS) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
    ASSERT_LT(target, dc_.senders.size()); \
    if ((BOOST_PP_TUPLE_ELEM(3,2,ARGS) & CONTROL_PACKET) == 0) inc_calls_sent(target); \
    return BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,ARGS),N) \
      <T, F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
      ::exec(this, dc_.senders[target], BOOST_PP_TUPLE_ELEM(3,2,ARGS), target,obj_id, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) )(); \
  } \

// Non-blocking request generator: identical issue path, but the
// request_future handle itself is returned so the caller may wait later.
#define FUTURE_REQUEST_INTERFACE_GENERATOR(Z,N,ARGS) \
  template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
  BOOST_PP_TUPLE_ELEM(3,0,ARGS) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
    ASSERT_LT(target, dc_.senders.size()); \
    if ((BOOST_PP_TUPLE_ELEM(3,2,ARGS) & CONTROL_PACKET) == 0) inc_calls_sent(target); \
    return BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,ARGS),N) \
      <T, F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
      ::exec(this, dc_.senders[target], BOOST_PP_TUPLE_ELEM(3,2,ARGS), target,obj_id, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
  } \


/*
  Generates the interface functions. 3rd argument is a tuple
  (interface name, issue name, flags)
*/
BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type remote_request, dc_impl::object_request_issue, (STANDARD_CALL | WAIT_FOR_REPLY) ) )
BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type control_request, dc_impl::object_request_issue, (STANDARD_CALL | WAIT_FOR_REPLY | CONTROL_PACKET)) )
BOOST_PP_REPEAT(6, FUTURE_REQUEST_INTERFACE_GENERATOR, (request_future<__GLRPC_FRESULT> future_remote_request, dc_impl::object_request_issue, (STANDARD_CALL | WAIT_FOR_REPLY) ) )



#undef RPC_INTERFACE_GENERATOR
#undef BROADCAST_INTERFACE_GENERATOR
#undef REQUEST_INTERFACE_GENERATOR
#undef FUTURE_REQUEST_INTERFACE_GENERATOR
/* Now generate the interface functions which allow me to call this
   dc_dist_object directly. The internal calls are similar to the ones
   above. The only difference is that instead of 'obj_id', the
   parameter passed to the issue processor is "control_obj_id", which
   identifies the current RMI class itself (so the target member
   function is looked up on dc_dist_object<T>, not on the owner T).
*/
#define RPC_INTERFACE_GENERATOR(Z,N,FNAME_AND_CALL) \
  template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
  void BOOST_PP_TUPLE_ELEM(3,0,FNAME_AND_CALL) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
    ASSERT_LT(target, dc_.senders.size()); \
    if ((BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL) & CONTROL_PACKET) == 0) inc_calls_sent(target); \
    BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,FNAME_AND_CALL),N) \
      <dc_dist_object<T>, F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
      ::exec(this, dc_.senders[target], BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL), target,control_obj_id, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
  } \

BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (internal_call,dc_impl::object_call_issue, STANDARD_CALL) )
BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (internal_control_call,dc_impl::object_call_issue, (STANDARD_CALL | CONTROL_PACKET)) )


#define REQUEST_INTERFACE_GENERATOR(Z,N,ARGS) \
  template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
  BOOST_PP_TUPLE_ELEM(3,0,ARGS) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
    ASSERT_LT(target, dc_.senders.size()); \
    if ((BOOST_PP_TUPLE_ELEM(3,2,ARGS) & CONTROL_PACKET) == 0) inc_calls_sent(target); \
    return BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,ARGS),N) \
      <dc_dist_object<T>, F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
      ::exec(this, dc_.senders[target], BOOST_PP_TUPLE_ELEM(3,2,ARGS), target,control_obj_id, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) )(); \
  } \

/*
  Generates the interface functions. 3rd argument is a tuple (interface name, issue name, flags)
*/
BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type internal_request, dc_impl::object_request_issue, (STANDARD_CALL | WAIT_FOR_REPLY)) )
BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type internal_control_request, dc_impl::object_request_issue, (STANDARD_CALL | WAIT_FOR_REPLY | CONTROL_PACKET)) )


#undef RPC_INTERFACE_GENERATOR
#undef REQUEST_INTERFACE_GENERATOR
#undef GENARC
#undef GENT
#undef GENI
#undef GENARGS

/// \endcond
500 
501 #if DOXYGEN_DOCUMENTATION
502 
503 /**
504  * \brief Performs a non-blocking RPC call to the target machine
505  * to run the provided function pointer.
506  *
507  * remote_call() calls the function "fn" on a target remote machine.
508  * "fn" may be public, private or protected within the owner class; there are
509  * no access restrictions. Provided arguments are serialized and sent to the
510  * target. Therefore, all arguments are necessarily transmitted by value.
511  * If the target function has a return value, the return value is lost.
512  *
513  * remote_call() is non-blocking and does not wait for the target machine
514  * to complete execution of the function. Different remote_calls may be handled
515  * by different threads on the target machine and thus the target function
516  * should be made thread-safe.
517  * Alternatively, see distributed_control::set_sequentialization_key()
518  * to force sequentialization of groups of remote calls.
519  *
520  * If blocking operation is desired, remote_request() may be used.
521  * Alternatively, a full_barrier() may also be used to wait for completion of
522  * all incomplete RPC calls.
523  *
524  * Example:
525  * \code
526  * // A print function is defined in the distributed object
527  * class distributed_obj_example {
528  * graphlab::dc_dist_object<distributed_obj_example> rmi;
529  * ... initialization and constructor ...
530  * private:
531  * void print(std::string s) {
532  * std::cout << s << "\n";
533  * }
534  * public:
535  * void print_on_machine_one(std::string s) {
536  * // calls the print function on machine 1 with the argument "s"
537  * rmi.remote_call(1, &distributed_obj_example::print, s);
538  * }
539  * }
540  * \endcode
541  *
542  * Note the syntax for obtaining a pointer to a member function.
543  *
544  * \param targetmachine The ID of the machine to run the function on
545  * \param fn The function to run on the target machine. Must be a pointer to
546  * member function in the owning object.
 * \param ... The arguments to send to Fn. Arguments must be serializable
 * and must be castable to the target types.
549  */
550  void remote_call(procid_t targetmachine, Fn fn, ...);
551 
552 
553 
554 /**
555  * \brief Performs a non-blocking RPC call to a collection of machines
556  * to run the provided function pointer.
557  *
558  * This function calls the provided function pointer on a collection of
559  * machines contained in the iterator range [begin, end).
560  * Provided arguments are serialized and sent to the target.
561  * Therefore, all arguments are necessarily transmitted by value.
562  * If the target function has a return value, the return value is lost.
563  *
564  * This function is functionally equivalent to:
565  *
566  * \code
567  * while(machine_begin != machine_end) {
568  * remote_call(*machine_begin, fn, ...);
569  * ++machine_begin;
570  * }
571  * \endcode
572  *
573  * However, this function makes some optimizations to ensure all arguments
574  * are only serialized once instead of \#calls times.
575  *
576  * This function is non-blocking and does not wait for the target machines
577  * to complete execution of the function. Different remote_calls may be handled
578  * by different threads on the target machines and thus the target function
579  * should be made thread-safe. Alternatively, see
580  * distributed_control::set_sequentialization_key() to force sequentialization
581  * of groups of remote_calls. A full_barrier()
582  * may also be issued to wait for completion of all RPC calls issued prior
583  * to the full barrier.
584  *
585  * Example:
586  * \code
587  * // A print function is defined in the distributed object
588  * class distributed_obj_example {
589  * graphlab::dc_dist_object<distributed_obj_example> rmi;
590  * ... initialization and constructor ...
591  * private:
592  * void print(std::string s) {
593  * std::cout << s << "\n";
594  * }
595  * public:
596  * void print_on_some_machines(std::string s) {
597  * std::vector<procid_t> procs;
598  * procs.push_back(1); procs.push_back(3); procs.push_back(5);
599  *
600  * // calls the print function on machine 1,3,5 with the argument "s"
601  * rmi.remote_call(procs.begin(), procs.end(),
602  * &distributed_obj_example::print, s);
603  * }
604  * }
605  * \endcode
606  *
607  *
608  * \param machine_begin The beginning of an iterator range containing a list
609  * machines to call. Iterator::value_type must be
610  * castable to procid_t.
611  * \param machine_end The end of an iterator range containing a list
612  * machines to call. Iterator::value_type must be
613  * castable to procid_t.
614  * \param fn The function to run on the target machine. Must be a pointer to
615  * member function in the owning object.
616  * \param ... The arguments to send to Fn. Arguments must be serializable.
617  * and must be castable to the target types.
618  */
619  void remote_call(Iterator machine_begin, Iterator machine_end, Fn fn, ...);
620 
621 
622 /**
623  * \brief Performs a blocking RPC call to the target machine
624  * to run the provided function pointer.
625  *
626  * remote_request() calls the function "fn" on a target remote machine. Provided
627  * arguments are serialized and sent to the target.
628  * Therefore, all arguments are necessarily transmitted by value.
629  * If the target function has a return value, it is sent back to calling
630  * machine.
631  *
632  * Unlike remote_call(), remote_request() is blocking and waits for the target
633  * machine to complete execution of the function. However, different
634  * remote_requests may be still be handled by different threads on the target
635  * machine.
636  *
637  * Example:
638  * \code
639  * // A print function is defined in the distributed object
640  * class distributed_obj_example {
641  * graphlab::dc_dist_object<distributed_obj_example> rmi;
642  * ... initialization and constructor ...
643  * private:
644  * int add_one(int i) {
645  * return i + 1;
646  * }
647  * public:
648  * int add_one_from_machine_1(int i) {
649  * // calls the add_one function on machine 1 with the argument i
650  * return rmi.remote_request(1, &distributed_obj_example::add_one, i);
651  * }
652  * }
653  * \endcode
654  *
655  * \param targetmachine The ID of the machine to run the function on
656  * \param fn The function to run on the target machine. Must be a pointer to
657  * member function in the owning object.
658  * \param ... The arguments to send to Fn. Arguments must be serializable.
659  * and must be castable to the target types.
660  *
661  * \returns Returns the same return type as the function fn
662  */
663  RetVal remote_request(procid_t targetmachine, Fn fn, ...);
664 
665 
666 
667 /**
668  * \brief Performs a nonblocking RPC call to the target machine
669  * to run the provided function pointer which has an expected return value.
670  *
671  * future_remote_request() calls the function "fn" on a target remote machine.
672  * Provided arguments are serialized and sent to the target.
673  * Therefore, all arguments are necessarily transmitted by value.
674  * If the target function has a return value, it is sent back to calling
675  * machine.
676  *
677  * future_remote_request() is like remote_request(), but is non-blocking.
678  * Instead, it returns immediately a \ref graphlab::request_future object
 * which will allow you to wait for the return value.
680  *
681  * Example:
682  * \code
683  * // A print function is defined in the distributed object
684  * class distributed_obj_example {
685  * graphlab::dc_dist_object<distributed_obj_example> rmi;
686  * ... initialization and constructor ...
687  * private:
688  * int add_one(int i) {
689  * return i + 1;
690  * }
691  * public:
692  * int add_one_from_machine_1(int i) {
693  * // calls the add_one function on machine 1 with the argument i
694  * // this call returns immediately
695  * graphlab::request_future<int> future =
696  * rmi.future_remote_request(1, &distributed_obj_example::add_one, i);
697  *
698  * // ... we can do other stuff here
699  * // then when we want the answer
700  * int result = future();
701  * return result;
702  * }
703  * }
704  * \endcode
705  *
706  * \param targetmachine The ID of the machine to run the function on
707  * \param fn The function to run on the target machine. Must be a pointer to
708  * member function in the owning object.
709  * \param ... The arguments to send to Fn. Arguments must be serializable.
710  * and must be castable to the target types.
711  *
712  * \returns Returns the same return type as the function fn
713  */
714  RetVal future_remote_request(procid_t targetmachine, Fn fn, ...);
715 
716 
717 
718 #endif
719 /*****************************************************************************
720  Implementation of matched send_to / recv_from
721  *****************************************************************************/
722 
723 
 private:
  // Per-source mailbox used by the matched send_to()/recv_from() pair.
  std::vector<dc_impl::recv_from_struct> recv_froms;

  // RPC target invoked (via internal_call) by a remote send_to().
  // Despite the name, this function does not itself block: it deposits
  // the serialized payload and the sender's reply tag into the mailbox
  // for src, marks it full, and wakes any thread waiting in recv_from().
  // NOTE(review): "tag" appears to carry the address of the sender's
  // reply_ret_type (see send_to's rtptr) — verify against send_to().
  void block_and_wait_for_recv(size_t src,
                               std::string& str,
                               size_t tag) {
    recv_froms[src].lock.lock();
    recv_froms[src].data = str;
    recv_froms[src].tag = tag;
    recv_froms[src].hasdata = true;
    recv_froms[src].cond.signal();
    recv_froms[src].lock.unlock();
  }
737 
738  public:
739 
/**
  \copydoc distributed_control::send_to()

  Serializes t and delivers it to the matching recv_from() on `target`,
  blocking until the receiver has consumed it. Rather than issuing a true
  request (which would occupy a thread on the remote side), the address of
  a local reply_ret_type is sent along as the "tag"; recv_from() on the
  target replies to that address to release us.
*/
template <typename U>
void send_to(procid_t target, U& t, bool control = false) {
  // serialize t into a string buffer
  std::stringstream strm;
  oarchive oarc(strm);
  oarc << t;
  strm.flush();
  dc_impl::reply_ret_type rt(REQUEST_WAIT_METHOD);
  // I shouldn't use a request to block here since
  // that will take up a thread on the remote side
  // so I simulate a request here.
  size_t rtptr = reinterpret_cast<size_t>(&rt);
  if (control == false) {
    internal_call(target, &dc_dist_object<T>::block_and_wait_for_recv,
                  procid(), strm.str(), rtptr);
  }
  else {
    internal_control_call(target, &dc_dist_object<T>::block_and_wait_for_recv,
                          procid(), strm.str(), rtptr);
  }
  // wait for the receiver's reply before returning
  rt.wait();

  // non-control sends are charged against this object's counters
  if (control == false) inc_calls_sent(target);
}
767 
768 
/**
  \copydoc distributed_control::recv_from()

  Blocks until the matching send_to() from `source` has deposited data in
  this machine's mailbox (see block_and_wait_for_recv), deserializes it
  into t, and then replies to the sender's tag to unblock it.
*/
template <typename U>
void recv_from(procid_t source, U& t, bool control = false) {
  // wait on the condition variable until I have data
  dc_impl::recv_from_struct &recvstruct = recv_froms[source];
  recvstruct.lock.lock();
  while (recvstruct.hasdata == false) {
    recvstruct.cond.wait(recvstruct.lock);
  }

  // got the data. deserialize it
  std::stringstream strm(recvstruct.data);
  iarchive iarc(strm);
  iarc >> t;
  // clear the data (swap with an empty string to release capacity)
  std::string("").swap(recvstruct.data);
  // remember the tag so we can unlock it before the remote call
  size_t tag = recvstruct.tag;
  // clear the has data flag
  recvstruct.hasdata = false;
  // unlock
  recvstruct.lock.unlock();
  if (control == false) {
    // remote call to release the sender. Use an empty blob
    dc_.control_call(source, reply_increment_counter, tag, dc_impl::blob());
    // I have to increment the calls received manually here
    // since the matched send/recv calls do not go through the
    // typical object calls. It goes through the DC, but I also want to
    // charge it to this object
    inc_calls_received(source);
  }
  else {
    dc_.control_call(source, reply_increment_counter, tag, dc_impl::blob());
  }
}
806 
807 
808 
809 /*****************************************************************************
810  Implementation of Broadcast
811  *****************************************************************************/
812 
private:

 // Buffer holding the serialized broadcast payload on every machine.
 std::string broadcast_receive;

 // RPC target: invoked on non-originator machines to deposit the
 // originator's serialized payload before the barrier in broadcast().
 void set_broadcast_receive(const std::string &s) {
   broadcast_receive = s;
 }
820 
821 
822  public:
823 
824  /// \copydoc distributed_control::broadcast()
825  template <typename U>
826  void broadcast(U& data, bool originator, bool control = false) {
827  if (originator) {
828  // construct the data stream
829  std::stringstream strm;
830  oarchive oarc(strm);
831  oarc << data;
832  strm.flush();
833  broadcast_receive = strm.str();
834  if (control == false) {
835  for (size_t i = 0;i < numprocs(); ++i) {
836  if (i != procid()) {
837  internal_request(i,
839  broadcast_receive);
840  }
841  }
842  }
843  else {
844  for (size_t i = 0;i < numprocs(); ++i) {
845  if (i != procid()) {
846  internal_control_request(i,
848  broadcast_receive);
849  }
850  }
851  }
852  }
853 
854  // by the time originator gets here, all machines
855  // will have received the data due to the broadcast_receive
856  // set a barrier here.
857  barrier();
858 
859  // all machines will now deserialize the data
860  if (!originator) {
861  std::stringstream strm(broadcast_receive);
862  iarchive iarc(strm);
863  iarc >> data;
864  }
865  barrier();
866  }
867 
868 
869 /*****************************************************************************
870  Implementation of Gather, all_gather
871  *****************************************************************************/
872 
 private:
  /// Per-source staging buffers for gather/all_to_all, indexed by procid.
  std::vector<std::string> gather_receive;
  /// Generation counter for gather rounds; incremented once per collective.
  atomic<size_t> gatherid;

  /// RPC target: deposits one machine's serialized contribution for
  /// gather round \p gid. Busy-waits (sched_yield) until the local round
  /// counter catches up, so an early message intended for the *next*
  /// round cannot clobber a buffer still being read by the current one.
  void set_gather_receive(procid_t source, const std::string &s, size_t gid) {
    while(gatherid.value != gid) sched_yield();
    gather_receive[source] = s;
  }
881  public:
882 
883  /// \copydoc distributed_control::gather()
884  template <typename U>
885  void gather(std::vector<U>& data, procid_t sendto, bool control = false) {
886  // if not root
887  if (sendto != procid()) {
888  std::stringstream strm( std::ios::out | std::ios::binary );
889  oarchive oarc(strm);
890  oarc << data[procid()];
891  strm.flush();
892  if (control == false) {
893  internal_request(sendto,
895  procid(),
896  strm.str(),
897  gatherid.value);
898  }
899  else {
900  internal_control_request(sendto,
902  procid(),
903  strm.str(),
904  gatherid.value);
905  }
906  }
907  barrier();
908  if (sendto == procid()) {
909  // if I am the receiver
910  for (procid_t i = 0; i < numprocs(); ++i) {
911  if (i != procid()) {
912  // receiving only from others
913  std::stringstream strm(gather_receive[i],
914  std::ios::in | std::ios::binary);
915  assert(strm.good());
916  iarchive iarc(strm);
917  iarc >> data[i];
918  }
919  }
920  }
921  gatherid.inc();
922  barrier();
923  }
924 
925 /********************************************************************
926  Implementation of all gather
927 *********************************************************************/
928 
929 
930 
  private:
  // ------- Sense reversing barrier data (all_gather/all_reduce) ----------
  /// The next value of the barrier: either +1 or -1.
  int ab_barrier_sense;
  /// When this flag == the current barrier value, the barrier is complete.
  int ab_barrier_release;
  /** when barrier sense is 1, barrier clears when
   * child_barrier_counter == numchild. When barrier sense is -1, barrier
   * clears when child_barrier_counter == 0;
   */
  atomic<int> ab_child_barrier_counter;
  /// condition variable and mutex protecting the barrier variables
  conditional ab_barrier_cond;
  mutex ab_barrier_mut;
  /// Serialized subtree contribution from each direct child (upward pass).
  std::string ab_children_data[BARRIER_BRANCH_FACTOR];
  /// Fully collected data pushed down the tree on the release pass.
  std::string ab_alldata;
947 
  /**
  Upward pass of the all_gather/all_reduce tree: a direct child calls this
  in the parent once the child (and its entire subtree) has entered the
  barrier, handing over the subtree's serialized data.
  \param source  procid of the calling child; must lie in
                 [childbase, childbase + BARRIER_BRANCH_FACTOR)
  \param collect serialized concatenation of the child's subtree data
  */
  void __ab_child_to_parent_barrier_trigger(procid_t source, std::string collect) {
    ab_barrier_mut.lock();
    // assert childbase <= source <= childbase + BARRIER_BRANCH_FACTOR
    ASSERT_GE(source, childbase);
    ASSERT_LT(source, childbase + BARRIER_BRANCH_FACTOR);
    ab_children_data[source - childbase] = collect;
    // sense-reversing: counts up when sense == +1, down when sense == -1
    ab_child_barrier_counter.inc(ab_barrier_sense);
    ab_barrier_cond.signal();
    ab_barrier_mut.unlock();
  }
961 
  /**
  This is on the downward pass of the barrier. The parent calls this function
  to release all the children's barriers, handing each child the complete
  collected data for the round.
  \param releaseval        barrier generation (+1/-1) being released
  \param allstrings        serialized concatenation of all machines' data
  \param use_control_calls nonzero to forward with control calls, keeping the
                           whole downward pass in the same call class as the
                           collective that started it
  */
  void __ab_parent_to_child_barrier_release(int releaseval,
                                            std::string allstrings,
                                            int use_control_calls) {
    // forward the release (and the collected data) to my own children first
    logger(LOG_DEBUG, "AB Barrier Release %d", releaseval);
    // NOTE(review): ab_alldata is written before taking ab_barrier_mut;
    // looks benign because the local waiter only reads it after observing
    // ab_barrier_release change under the lock -- confirm.
    ab_alldata = allstrings;
    for (procid_t i = 0;i < numchild; ++i) {
      if (use_control_calls) {
        internal_control_call((procid_t)(childbase + i),
                              &dc_dist_object<T>::__ab_parent_to_child_barrier_release,
                              releaseval,
                              ab_alldata,
                              use_control_calls);
      }
      else {
        internal_call((procid_t)(childbase + i),
                      &dc_dist_object<T>::__ab_parent_to_child_barrier_release,
                      releaseval,
                      ab_alldata,
                      use_control_calls);
      }
    }
    // then wake up my own blocked all_gather()/all_reduce2() call
    ab_barrier_mut.lock();
    ab_barrier_release = releaseval;
    ab_barrier_cond.signal();
    ab_barrier_mut.unlock();
  }
994 
995 
996  public:
997 
998  /// \copydoc distributed_control::all_gather()
999  template <typename U>
1000  void all_gather(std::vector<U>& data, bool control = false) {
1001  if (numprocs() == 1) return;
1002  // get the string representation of the data
1003  charstream strm(128);
1004  oarchive oarc(strm);
1005  oarc << data[procid()];
1006  strm.flush();
1007  // upward message
1008  int ab_barrier_val = ab_barrier_sense;
1009  ab_barrier_mut.lock();
1010  // wait for all children to be done
1011  while(1) {
1012  if ((ab_barrier_sense == -1 && ab_child_barrier_counter.value == 0) ||
1013  (ab_barrier_sense == 1 && ab_child_barrier_counter.value == (int)(numchild))) {
1014  // flip the barrier sense
1015  ab_barrier_sense = -ab_barrier_sense;
1016  // call child to parent in parent
1017  ab_barrier_mut.unlock();
1018  if (procid() != 0) {
1019  // collect all my children data
1020  charstream strstrm(128);
1021  oarchive oarc2(strstrm);
1022  oarc2 << std::string(strm->c_str(), strm->size());
1023  for (procid_t i = 0;i < numchild; ++i) {
1024  strstrm.write(ab_children_data[i].c_str(), ab_children_data[i].length());
1025  }
1026  strstrm.flush();
1027  if (control) {
1028  internal_control_call(parent,
1030  procid(),
1031  std::string(strstrm->c_str(), strstrm->size()));
1032  }
1033  else {
1034  internal_call(parent,
1036  procid(),
1037  std::string(strstrm->c_str(), strstrm->size()));
1038  }
1039  }
1040  break;
1041  }
1042  ab_barrier_cond.wait(ab_barrier_mut);
1043  }
1044 
1045 
1046  logger(LOG_DEBUG, "AB barrier phase 1 complete");
1047  // I am root. send the barrier release downwards
1048  if (procid() == 0) {
1049  ab_barrier_release = ab_barrier_val;
1050  // build the downward data
1051  charstream strstrm(128);
1052  oarchive oarc2(strstrm);
1053  oarc2 << std::string(strm->c_str(), strm->size());
1054  for (procid_t i = 0;i < numchild; ++i) {
1055  strstrm.write(ab_children_data[i].c_str(), ab_children_data[i].length());
1056  }
1057  strstrm.flush();
1058  ab_alldata = std::string(strstrm->c_str(), strstrm->size());
1059  for (procid_t i = 0;i < numchild; ++i) {
1060  logger(LOG_DEBUG, "Sending AB release to %d", childbase + i);
1061  internal_control_call((procid_t)(childbase + i),
1063  ab_barrier_val,
1064  ab_alldata,
1065  (int)control);
1066 
1067  }
1068  }
1069  // wait for the downward message releasing the barrier
1070  logger(LOG_DEBUG, "AB barrier waiting for %d", ab_barrier_val);
1071  ab_barrier_mut.lock();
1072  while(1) {
1073  if (ab_barrier_release == ab_barrier_val) break;
1074  ab_barrier_cond.wait(ab_barrier_mut);
1075  }
1076  // read the collected data and release the lock
1077  std::string local_ab_alldata = ab_alldata;
1078  ab_barrier_mut.unlock();
1079 
1080  logger(LOG_DEBUG, "barrier phase 2 complete");
1081  // now the data is a DFS search of a heap
1082  // I need to unpack it
1083  size_t heappos = 0;
1084  std::stringstream istrm(local_ab_alldata);
1085  iarchive iarc(istrm);
1086 
1087  for (size_t i = 0;i < numprocs(); ++i) {
1088  std::string s;
1089  iarc >> s;
1090 
1091  std::stringstream strm2(s);
1092  iarchive iarc2(strm2);
1093  iarc2 >> data[heappos];
1094 
1095  if (i + 1 == numprocs()) break;
1096  // advance heappos
1097  // leftbranch
1098  bool lefttraverseblock = false;
1099  while (1) {
1100  // can we continue going deaper down the left?
1101  size_t leftbranch = heappos * BARRIER_BRANCH_FACTOR + 1;
1102  if (lefttraverseblock == false && leftbranch < numprocs()) {
1103  heappos = leftbranch;
1104  break;
1105  }
1106  // ok. can't go down the left
1107  bool this_is_a_right_branch = (((heappos - 1) % BARRIER_BRANCH_FACTOR) == BARRIER_BRANCH_FACTOR - 1);
1108  // if we are a left branch, go to sibling
1109  if (this_is_a_right_branch == false) {
1110  size_t sibling = heappos + 1;
1111  if (sibling < numprocs()) {
1112  heappos = sibling;
1113  break;
1114  }
1115  }
1116 
1117  // we have finished this subtree, go back up to parent
1118  // and block the depth traversal on the next round
1119  // unless heappos is 0
1120 
1121  heappos = (heappos - 1) / BARRIER_BRANCH_FACTOR;
1122  lefttraverseblock = true;
1123  continue;
1124  // go to sibling
1125  }
1126 
1127  }
1128  }
1129 
1130  /// \copydoc distributed_control::all_reduce2()
1131  template <typename U, typename PlusEqual>
1132  void all_reduce2(U& data, PlusEqual plusequal, bool control = false) {
1133  if (numprocs() == 1) return;
1134  // get the string representation of the data
1135  /* charstream strm(128);
1136  oarchive oarc(strm);
1137  oarc << data;
1138  strm.flush();*/
1139  // upward message
1140  int ab_barrier_val = ab_barrier_sense;
1141  ab_barrier_mut.lock();
1142  // wait for all children to be done
1143  while(1) {
1144  if ((ab_barrier_sense == -1 && ab_child_barrier_counter.value == 0) ||
1145  (ab_barrier_sense == 1 && ab_child_barrier_counter.value == (int)(numchild))) {
1146  // flip the barrier sense
1147  ab_barrier_sense = -ab_barrier_sense;
1148  // call child to parent in parent
1149  ab_barrier_mut.unlock();
1150  if (procid() != 0) {
1151  // accumulate my children data
1152  for (procid_t i = 0;i < numchild; ++i) {
1153  std::stringstream istrm(ab_children_data[i]);
1154  iarchive iarc(istrm);
1155  U tmp;
1156  iarc >> tmp;
1157  plusequal(data, tmp);
1158  }
1159  // upward message
1160  charstream ostrm(128);
1161  oarchive oarc(ostrm);
1162  oarc << data;
1163  ostrm.flush();
1164  if (control) {
1165  internal_control_call(parent,
1167  procid(),
1168  std::string(ostrm->c_str(), ostrm->size()));
1169  }
1170  else {
1171  internal_call(parent,
1173  procid(),
1174  std::string(ostrm->c_str(), ostrm->size()));
1175  }
1176  }
1177  break;
1178  }
1179  ab_barrier_cond.wait(ab_barrier_mut);
1180  }
1181 
1182 
1183  logger(LOG_DEBUG, "AB barrier phase 1 complete");
1184  // I am root. send the barrier release downwards
1185  if (procid() == 0) {
1186  ab_barrier_release = ab_barrier_val;
1187  for (procid_t i = 0;i < numchild; ++i) {
1188  std::stringstream istrm(ab_children_data[i]);
1189  iarchive iarc(istrm);
1190  U tmp;
1191  iarc >> tmp;
1192  plusequal(data, tmp);
1193  }
1194  // build the downward data
1195  charstream ostrm(128);
1196  oarchive oarc(ostrm);
1197  oarc << data;
1198  ostrm.flush();
1199  ab_alldata = std::string(ostrm->c_str(), ostrm->size());
1200  for (procid_t i = 0;i < numchild; ++i) {
1201  internal_control_call((procid_t)(childbase + i),
1203  ab_barrier_val,
1204  ab_alldata,
1205  (int)control);
1206 
1207  }
1208  }
1209  // wait for the downward message releasing the barrier
1210  logger(LOG_DEBUG, "AB barrier waiting for %d", ab_barrier_val);
1211  ab_barrier_mut.lock();
1212  while(1) {
1213  if (ab_barrier_release == ab_barrier_val) break;
1214  ab_barrier_cond.wait(ab_barrier_mut);
1215  }
1216 
1217  if (procid() != 0) {
1218  // read the collected data and release the lock
1219  std::string local_ab_alldata = ab_alldata;
1220  ab_barrier_mut.unlock();
1221 
1222  logger(LOG_DEBUG, "barrier phase 2 complete");
1223 
1224  std::stringstream istrm(local_ab_alldata);
1225  iarchive iarc(istrm);
1226  iarc >> data;
1227  }
1228  else {
1229  ab_barrier_mut.unlock();
1230  }
1231  }
1232 
1233 
1234  template <typename U>
1235  struct default_plus_equal {
1236  void operator()(U& u, const U& v) {
1237  u += v;
1238  }
1239  };
1240 
1241  /// \copydoc distributed_control::all_reduce()
1242  template <typename U>
1243  void all_reduce(U& data, bool control = false) {
1244  all_reduce2(data, default_plus_equal<U>(), control);
1245  }
1246 
1247 ////////////////////////////////////////////////////////////////////////////
1248 
1249 
1250 /*****************************************************************************
1251  Implementation of All Scatter
1252  *****************************************************************************/
1253 
1254  template <typename U>
1255  void all_to_all(std::vector<U>& data, bool control = false) {
1256  ASSERT_EQ(data.size(), numprocs());
1257  for (size_t i = 0;i < data.size(); ++i) {
1258  if (i != procid()) {
1259  std::stringstream strm( std::ios::out | std::ios::binary );
1260  oarchive oarc(strm);
1261  oarc << data[i];
1262  strm.flush();
1263  if (control == false) {
1264  internal_call(i,
1266  procid(),
1267  strm.str(),
1268  gatherid.value);
1269  }
1270  else {
1271  internal_control_call(i,
1272  &dc_dist_object<T>::set_gather_receive,
1273  procid(),
1274  strm.str(),
1275  gatherid.value);
1276  }
1277  }
1278  }
1279  full_barrier();
1280  for (size_t i = 0; i < data.size(); ++i) {
1281  if (i != procid()) {
1282  std::stringstream strm(gather_receive[i],
1283  std::ios::in | std::ios::binary);
1284  assert(strm.good());
1285  iarchive iarc(strm);
1286  iarc >> data[i];
1287  }
1288  }
1289  gatherid.inc();
1290  barrier();
1291  }
1292 
1293 
1294 /*****************************************************************************
1295  Implementation of Barrier
1296  *****************************************************************************/
1297 
1298 
1299 
  private:
  // ------- Sense reversing barrier data ----------
  /// The next value of the barrier: either +1 or -1.
  int barrier_sense;
  /// When this flag == the current barrier value, the barrier is complete.
  int barrier_release;
  /** when barrier sense is 1, barrier clears when
   * child_barrier_counter == numchild. When barrier sense is -1, barrier
   * clears when child_barrier_counter == 0;
   */
  atomic<int> child_barrier_counter;
  /// condition variable and mutex protecting the barrier variables
  conditional barrier_cond;
  mutex barrier_mut;
  procid_t parent; ///< procid of my parent in the barrier tree
  size_t childbase; ///< procid of my first child
  procid_t numchild; ///< number of direct children I have
1317 
1318 
1319 
1320 
  /**
  Upward pass of the plain barrier: a direct child calls this in the parent
  once the child (and its entire subtree) has entered the barrier.
  \param source procid of the calling child; must lie in
                [childbase, childbase + BARRIER_BRANCH_FACTOR)
  */
  void __child_to_parent_barrier_trigger(procid_t source) {
    barrier_mut.lock();
    // assert childbase <= source <= childbase + BARRIER_BRANCH_FACTOR
    ASSERT_GE(source, childbase);
    ASSERT_LT(source, childbase + BARRIER_BRANCH_FACTOR);
    // sense-reversing: counts up when sense == +1, down when sense == -1
    child_barrier_counter.inc(barrier_sense);
    barrier_cond.signal();
    barrier_mut.unlock();
  }
1333 
  /**
  This is on the downward pass of the barrier. The parent calls this function
  to release all the children's barriers.
  \param releaseval barrier generation (+1/-1) being released
  */
  void __parent_to_child_barrier_release(int releaseval) {
    // forward the release to my own children first
    logger(LOG_DEBUG, "Barrier Release %d", releaseval);
    for (procid_t i = 0;i < numchild; ++i) {
      internal_control_call((procid_t)(childbase + i),
                            &dc_dist_object<T>::__parent_to_child_barrier_release,
                            releaseval);

    }
    // then wake up my own blocked barrier() call
    barrier_mut.lock();
    barrier_release = releaseval;
    barrier_cond.signal();
    barrier_mut.unlock();
  }
1353 
1354 
1355  public:
1356 
1357  /// \copydoc distributed_control::barrier()
1358  void barrier() {
1359  // upward message
1360  int barrier_val = barrier_sense;
1361  barrier_mut.lock();
1362  // wait for all children to be done
1363  while(1) {
1364  if ((barrier_sense == -1 && child_barrier_counter.value == 0) ||
1365  (barrier_sense == 1 && child_barrier_counter.value == (int)(numchild))) {
1366  // flip the barrier sense
1367  barrier_sense = -barrier_sense;
1368  // call child to parent in parent
1369  barrier_mut.unlock();
1370  if (procid() != 0) {
1371  internal_control_call(parent,
1373  procid());
1374  }
1375  break;
1376  }
1377  barrier_cond.wait(barrier_mut);
1378  }
1379 
1380 
1381  logger(LOG_DEBUG, "barrier phase 1 complete");
1382  // I am root. send the barrier release downwards
1383  if (procid() == 0) {
1384  barrier_release = barrier_val;
1385 
1386  for (procid_t i = 0;i < numchild; ++i) {
1387  internal_control_call((procid_t)(childbase + i),
1389  barrier_val);
1390 
1391  }
1392  }
1393  // wait for the downward message releasing the barrier
1394  logger(LOG_DEBUG, "barrier waiting for %d", barrier_val);
1395  barrier_mut.lock();
1396  while(1) {
1397  if (barrier_release == barrier_val) break;
1398  barrier_cond.wait(barrier_mut);
1399  }
1400  barrier_mut.unlock();
1401 
1402  logger(LOG_DEBUG, "barrier phase 2 complete");
1403  }
1404 
1405 
1406  /*****************************************************************************
1407  Implementation of Full Barrier
1408 *****************************************************************************/
  private:
  /// Protect and signal completion of the full barrier.
  mutex full_barrier_lock;
  conditional full_barrier_cond;
  /// Number of calls this machine must receive from each source before
  /// the full barrier may release.
  std::vector<size_t> calls_to_receive;
  // used to inform the counter that the full barrier
  // is in effect and all modifications to the calls_recv
  // counter will need to lock and signal
  volatile bool full_barrier_in_effect;

  /** number of 'source' processor counts which have
  not achieved the right recv count */
  atomic<size_t> num_proc_recvs_incomplete;

  /// Marked as 1 if the proc is complete
  dense_bitset procs_complete;
1424 
1425  public:
1426 
  /// \copydoc distributed_control::full_barrier()
  ///
  /// Unlike barrier(), a full barrier only releases once every RPC call
  /// issued by any machine *before* the barrier has also been received:
  /// all machines exchange their per-target send counts, then each waits
  /// until its local receive counters match what every sender reported.
  void full_barrier() {
    // snapshot how many calls I have sent to each machine
    std::vector<size_t> calls_sent_to_target(numprocs(), 0);
    for (size_t i = 0;i < numprocs(); ++i) {
      calls_sent_to_target[i] = callssent[i].value;
    }

    // exchange send counts; all_calls_sent[i][j] = #calls i sent to j.
    // control=true so this collective does not perturb the counters.
    std::vector<std::vector<size_t> > all_calls_sent(numprocs());
    all_calls_sent[procid()] = calls_sent_to_target;
    all_gather(all_calls_sent, true);

    // get the number of calls I am supposed to receive from each machine
    calls_to_receive.clear(); calls_to_receive.resize(numprocs(), 0);
    for (size_t i = 0;i < numprocs(); ++i) {
      calls_to_receive[i] += all_calls_sent[i][procid()];
//      std::cout << "Expecting " << calls_to_receive[i] << " calls from " << i << std::endl;
    }
    // reset the completion tracking state
    num_proc_recvs_incomplete.value = numprocs();
    procs_complete.clear();
    // activate the full barrier: from here on, the receive path must lock
    // and signal when it advances a counter
    full_barrier_in_effect = true;
    // NOTE(review): gcc-style inline asm; presumably a fence so the
    // receive path observes full_barrier_in_effect before we scan the
    // counters below. Not portable to non-gcc-compatible compilers --
    // confirm before porting.
    __asm("mfence");
    // one pass to mark sources that have already delivered everything
    // (their counts advanced before the barrier was activated)
    for (size_t i = 0;i < numprocs(); ++i) {
      if (callsreceived[i].value >= calls_to_receive[i]) {
        // set_bit returns the previous value; only decrement on 0 -> 1
        if (procs_complete.set_bit(i) == false) {
          num_proc_recvs_incomplete.dec();
        }
      }
    }

    // wait for the remaining sources to catch up
    full_barrier_lock.lock();
    while (num_proc_recvs_incomplete.value > 0) full_barrier_cond.wait(full_barrier_lock);
    full_barrier_lock.unlock();
    full_barrier_in_effect = false;
//    for (size_t i = 0; i < numprocs(); ++i) {
//      std::cout << "Received " << global_calls_received[i].value << " from " << i << std::endl;
//    }
    barrier();
  }
1470 
1471  /* -------------------- Implementation of Gather Statistics -----------------*/
  private:
  /// Per-machine RPC counters shipped to proc 0 by gather_statistics().
  /// save()/load() must serialize the fields in the same order.
  struct collected_statistics {
    size_t callssent;   // total calls issued by this machine
    size_t bytessent;   // total bytes sent by this machine
    collected_statistics(): callssent(0), bytessent(0) { }
    void save(oarchive &oarc) const {
      oarc << callssent << bytessent;
    }
    void load(iarchive &iarc) {
      iarc >> callssent >> bytessent;
    }
  };
1484  public:
1485  /** Gather RPC statistics. All machines must call
1486  this function at the same time. However, only proc 0 will
1487  return values */
1488  std::map<std::string, size_t> gather_statistics() {
1489  std::map<std::string, size_t> ret;
1490 
1491  std::vector<collected_statistics> stats(numprocs());
1492  stats[procid()].callssent = calls_sent();
1493  stats[procid()].bytessent = bytes_sent();
1494  logstream(LOG_INFO) << procid() << ": calls_sent: ";
1495  for (size_t i = 0;i < numprocs(); ++i) {
1496  logstream(LOG_INFO) << callssent[i].value << ", ";
1497  }
1498  logstream(LOG_INFO) << std::endl;
1499  logstream(LOG_INFO) << procid() << ": calls_recv: ";
1500  for (size_t i = 0;i < numprocs(); ++i) {
1501  logstream(LOG_INFO) << callsreceived[i].value << ", ";
1502  }
1503  logstream(LOG_INFO) << std::endl;
1504 
1505 
1506  gather(stats, 0, true);
1507  if (procid() == 0) {
1508  collected_statistics cs;
1509  for (size_t i = 0;i < numprocs(); ++i) {
1510  cs.callssent += stats[i].callssent;
1511  cs.bytessent += stats[i].bytessent;
1512  }
1513  ret["total_calls_sent"] = cs.callssent;
1514  ret["total_bytes_sent"] = cs.bytessent;
1515  }
1516  return ret;
1517  }
1518 };
1519 
1520 #include <graphlab/macros_undef.hpp>
1521 #include <graphlab/rpc/mem_function_arg_types_undef.hpp>
1522 #undef BARRIER_BRANCH_FACTOR
1523 }// namespace graphlab
1524 #endif
1525