GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
dc.hpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_DC_HPP
25 #define GRAPHLAB_DC_HPP
26 #include <iostream>
27 #include <boost/iostreams/stream.hpp>
28 #include <boost/function.hpp>
29 #include <graphlab/parallel/pthread_tools.hpp>
30 #include <graphlab/parallel/thread_pool.hpp>
31 #include <graphlab/util/resizing_array_sink.hpp>
32 #include <graphlab/util/blocking_queue.hpp>
33 #include <graphlab/util/dense_bitset.hpp>
34 #include <graphlab/serialization/serialization_includes.hpp>
35 
36 #include <graphlab/rpc/dc_types.hpp>
37 #include <graphlab/rpc/dc_internal_types.hpp>
38 
39 #include <graphlab/rpc/dc_receive.hpp>
40 #include <graphlab/rpc/dc_send.hpp>
41 #include <graphlab/rpc/dc_comm_base.hpp>
42 #include <graphlab/rpc/dc_dist_object_base.hpp>
43 
44 #include <graphlab/rpc/is_rpc_call.hpp>
45 #include <graphlab/rpc/function_call_issue.hpp>
46 #include <graphlab/rpc/function_broadcast_issue.hpp>
47 #include <graphlab/rpc/request_issue.hpp>
48 #include <graphlab/rpc/reply_increment_counter.hpp>
49 #include <graphlab/rpc/function_ret_type.hpp>
50 #include <graphlab/rpc/dc_compile_parameters.hpp>
51 #include <graphlab/util/tracepoint.hpp>
52 #include <graphlab/rpc/distributed_event_log.hpp>
53 #include <boost/preprocessor.hpp>
54 #include <graphlab/rpc/function_arg_types_def.hpp>
55 
56 
57 namespace graphlab {
58 
59 
60 /**
61  * \ingroup rpc
62  * \brief Distributed control constructor parameters.
63  *
64  * Provides the communication layer with a list of ip addresses and
65  * port numbers which enumerate all the machines to establish connections
66  * with.
67  *
 68  * You should not need to construct this yourself. The default constructor in
69  * graphlab::distributed_control does it for you.
70  * See \ref RPC for usage details.
71  */
73  /** A vector containing a list of hostnames/ipaddresses and port numbers
74  * of all machines participating in this RPC program.
75  * for instance:
76  * \code
77  * machines.push_back("127.0.0.1:10000");
78  * machines.push_back("127.0.0.1:10001");
79  * \endcode
80  */
81  std::vector<std::string> machines;
82 
83  /** Additional construction options of the form
84  "key1=value1,key2=value2".
85 
86  There are no available options at this time.
87 
88  Internal options which should not be used
89  \li \b __socket__=NUMBER Forces TCP comm to use this socket number for its
90  listening socket instead of creating a new one.
91  The socket must already be bound to the listening
92  port.
93  */
94  std::string initstring;
95 
96  /** The index of this machine into the machines vector */
98  /** Number of background RPC handling threads to create */
100  /** The communication method. */
102 
103  /**
104  * Constructs a dc_init_param object.
105  * \param numhandlerthreads Optional Argument. The number of handler
106  * threads to create. Defaults to
107  * \ref RPC_DEFAULT_NUMHANDLERTHREADS
108  * \param commtype The Communication type. The only accepted value now is
109  * TCP_COMM
110  */
114  commtype(commtype) {
115  }
116 };
117 
118 
119 
120 // forward declarations
121 class dc_services;
122 
123 namespace dc_impl {
124  class dc_buffered_stream_send2;
125  class dc_stream_receive;
126 }
127 
128 /**
129  * \ingroup rpc
130  * \brief The distributed control object is primary means of communication
131  * between the distributed GraphLab processes.
132  *
133  * The distributed_control object provides asynchronous, multi-threaded
134  * Remote Procedure Call (RPC) services to allow distributed GraphLab
135  * processes to communicate with each other. Currently, the only
136  * communication method implemented is TCP/IP.
137  * There are several ways of setting up the communication layer, but the most
138  * reliable, and the preferred method, is to "bootstrap" using MPI. See your
139  * local MPI documentation for details on how to launch MPI jobs.
140  *
141  * To construct a distributed_control object, the simplest method is to just
142  * invoke the default constructor.
143  *
144  * \code
145  * // initialize MPI
146  * mpi_tools::init(argc, argv);
147  * // construct distributed control object
148  * graphlab::distributed_control dc;
149  * \endcode
150  *
151  * After which all distributed control services will operate correctly.
152  *
 153  * Each process is assigned a sequential process ID, starting at 0.
154  * i.e. The first process will have a process ID of 0, the second process
155  * will have an ID of 1, etc. distributed_control::procid() can be used to
156  * obtain the current machine's process ID, and distributed_control::numprocs()
157  * can be used to obtain the total number of processes.
158  *
159  * The primary functions used to communicate between processes are
160  * distributed_control::remote_call() and
161  * distributed_control::remote_request(). These functions are thread-safe and
162  * can be called very rapidly as they only write into a local buffer.
163  * Communication is handled by a background thread. On the remote side,
164  * RPC calls are handled in parallel by a thread pool, and thus may be
165  * parallelized arbitrarily. Operations such as
166  * distributed_control::full_barrier(), or the sequentialization key
167  * can be used to get finer grained control over order of execution on the
168  * remote machine.
169  *
170  * A few other additional helper functions are also provided to support
171  * "synchronous" modes of communication. These functions are not thread-safe
172  * and can only be called on one thread per machine. These functions block
173  * until all machines call the same function. For instance, if gather() is
174  * called on one machine, it will not return until all machines call gather().
175  *
176  * \li distributed_control::barrier()
177  * \li distributed_control::full_barrier()
178  * \li distributed_control::broadcast()
179  * \li distributed_control::all_reduce()
180  * \li distributed_control::all_reduce2()
181  * \li distributed_control::gather()
182  * \li distributed_control::all_gather()
183  *
184  * \note These synchronous operations are modeled after some MPI collective
185  * operations. However, these operations here are not particularly optimized
186  * and will generally be slower than their MPI counterparts. However, the
187  * implementations here are much easier to use, relying extensively on
188  * serialization to simplify communication.
189  *
190  * To support Object Oriented Programming like methodologies, we allow the
191  * creation of <b>Distributed Objects</b> through graphlab::dc_dist_object.
192  * dc_dist_object allows a class to construct its own local copy of
193  * a distributed_control object allowing instances of the class to communicate
194  * with each other across the network.
195  *
196  * See \ref RPC for usage examples.
197  */
199  public:
200  /** \internal
201  * Each element of the function call queue is a data/len pair */
202  struct function_call_block{
203  function_call_block() {}
204 
205  function_call_block(char* data, size_t len,
206  unsigned char packet_mask):
207  data(data), len(len), packet_mask(packet_mask){}
208 
209  char* data;
210  size_t len;
211  unsigned char packet_mask;
212  };
213  private:
214  /// initialize receiver threads. private form of the constructor
215  void init(const std::vector<std::string> &machines,
216  const std::string &initstring,
217  procid_t curmachineid,
218  size_t numhandlerthreads,
220 
221  /// a pointer to the communications subsystem
222  dc_impl::dc_comm_base* comm;
223 
224  /// senders and receivers to all machines
225  std::vector<dc_impl::dc_receive*> receivers;
226  std::vector<dc_impl::dc_send*> senders;
227 
228  /// A thread group of function call handlers
229  thread_pool fcallhandlers;
230  std::vector<atomic<size_t> > fcall_handler_active;
231  std::vector<mutex> fcall_handler_blockers;
232 
 233  struct fcallqueue_entry { // \internal one unit of work queued for the RPC handler threads
 234  std::vector<function_call_block> calls; // individual call blocks to execute
 235  char* chunk_src; // backing buffer the call data points into (shared across entries)
 236  size_t chunk_len; // length of chunk_src in bytes
 237  atomic<size_t>* chunk_ref_counter; // presumably counts outstanding users of chunk_src so it can be freed -- TODO confirm against process_fcall_block
 238  procid_t source; // procid of the machine that sent this data
 239  bool is_chunk; // NOTE(review): appears to mark a raw chunk still awaiting deserialization (see fcallqueue_length comment); verify
 240  };
241  /// a queue of functions to be executed
242  std::vector<blocking_queue<fcallqueue_entry*> > fcallqueue;
243  // number of blocks waiting to be deserialized + the number of
244  // incomplete function calls
245  atomic<size_t> fcallqueue_length;
246 
247  /// object registrations;
248  std::vector<void*> registered_objects;
249  std::vector<dc_impl::dc_dist_object_base*> registered_rmi_instance;
250 
251  /// For convenience, we provide a instance of dc_services
252  dc_services* distributed_services;
253 
254  /// ID of the local machine
255  procid_t localprocid;
256 
257 
258  /// Number of machines
259  procid_t localnumprocs;
260 
261  std::vector<atomic<size_t> > global_calls_sent;
262  std::vector<atomic<size_t> > global_calls_received;
263 
264  std::vector<atomic<size_t> > global_bytes_received;
265 
266  std::vector<boost::function<void(void)> > deletion_callbacks;
267 
268  template <typename T> friend class dc_dist_object;
269  friend class dc_impl::dc_stream_receive;
270  friend class dc_impl::dc_buffered_stream_send2;
271 
 272  /// disable the operator= by placing it in private
 // NOTE(review): the body is a deliberate no-op, so any accidental use from
 // inside the class silently does nothing instead of failing to compile.
 273  distributed_control& operator=(const distributed_control& dc) { return *this; }
274 
275 
276  std::map<std::string, std::string> parse_options(std::string initstring);
277 
278  volatile inline size_t num_registered_objects() {
279  return registered_objects.size();
280  }
281 
282 
283 
284  DECLARE_TRACER(dc_receive_queuing);
285  DECLARE_TRACER(dc_receive_multiplexing);
286  DECLARE_TRACER(dc_call_dispatch);
287 
288  DECLARE_EVENT(EVENT_NETWORK_BYTES);
289  DECLARE_EVENT(EVENT_RPC_CALLS);
290  public:
291 
292  /**
293  * Default constructor. Automatically tries to read the initialization
294  * from environment variables, or from MPI (if MPI is initialized).
295  */
297 
298  /**
299  * Passes custom constructed initialization parameters in
300  * \ref dc_init_param
301  *
302  * Though dc_init_param can be obtained from environment variables using
303  * dc_init_from_env() or from MPI using dc_init_from_mpi(),
 304  * using the default constructor is preferred.
305  */
306  explicit distributed_control(dc_init_param initparam);
307 
309 
310  /// returns the id of the current process
311  inline procid_t procid() const {
312  return localprocid;
313  }
314 
315  /// returns the number of processes in total.
316  inline procid_t numprocs() const {
317  return localnumprocs;
318  }
319 
320  /**
321  * \brief Registers a callback which will be called on deletion of the
322  * distributed_control object.
323  *
324  * This function is useful for distributed static variables which may
 325  * only be deleted after main().
326  */
327  void register_deletion_callback(boost::function<void(void)> deleter) {
328  deletion_callbacks.push_back(deleter);
329  }
330 
331  /**
332  \brief Sets the sequentialization key to a new value, returning the
333  previous value.
334 
335  All RPC calls made using the same key value (as long as the key is non-zero)
336  will sequentialize. RPC calls made while the key value is 0 can be
337  run in parallel in arbitrary order.
338 
339  \code
340  oldval = distributed_control::set_sequentialization_key(new_key);
341  // ...
342  // ... do stuff
343  // ...
344  set_sequentialization_key(oldval);
345  \endcode
346 
347  The key value is <b>thread-local</b> thus setting the key value in
348  one thread does not affect the key value in another thread.
349  */
350  static unsigned char set_sequentialization_key(unsigned char newkey);
351 
352  /**
353  \brief Creates a new sequentialization key, returning the old value.
354 
355  All RPC calls made using the same key value (as long as the key is non-zero)
356  will sequentialize. RPC calls made while the key value is 0 can be run in
357  parallel in arbitrary order. However, since new_sequentialization_key() uses
358  a very naive key selection system, we recommend the use of
359  set_sequentialization_key().
360 
361  User should
362  \code
363  oldval = distributed_control::new_sequentialization_key();
364  // ...
365  // ... do stuff
366  // ...
367  set_sequentialization_key(oldval);
368  \endcode
369 
370  The key value is <b>thread-local</b> thus setting the key value in
371  one thread does not affect the key value in another thread.
372  */
373  static unsigned char new_sequentialization_key();
374 
375  /** \brief gets the current sequentialization key. This function is not
376  * generally useful.
377  */
378  static unsigned char get_sequentialization_key();
379 
380  /*
381  * The key RPC communication functions are all macro generated
382  * and doxygen does not like them so much.
383  * Here, we will block all of them out
384  * and have another set of "fake" functions later on which are wrapped
385  * with a #if 0 so C++ will ignore them.
386  */
387 
388  /// \cond GRAPHLAB_INTERNAL
389 
390 
391 
392  /*
393  This generates the interface functions for the standard calls, basic calls
394  The generated code looks like this:
395 
396  template<typename F , typename T0> void remote_call (procid_t target, F remote_function , const T0 &i0 )
397  {
398  ASSERT_LT(target, senders.size());
399  dc_impl::remote_call_issue1 <F , T0> ::exec(senders[target],
400  STANDARD_CALL,
401  target,
402  remote_function ,
403  i0 );
404  }
405  The arguments passed to the RPC_INTERFACE_GENERATOR ARE: (interface name, issue processor name, flags)
406 
407  */
 // Boost.PP repetition helpers used by the generator macros below
 // (Z = repetition dimension, N = argument index, _ = unused data arg):
 408  #define GENARGS(Z,N,_) BOOST_PP_CAT(const T, N) BOOST_PP_CAT(&i, N) // expands to "const TN &iN" (formal parameter)
 409  #define GENI(Z,N,_) BOOST_PP_CAT(i, N) // expands to "iN" (argument forwarding)
 410  #define GENT(Z,N,_) BOOST_PP_CAT(T, N) // expands to "TN" (type name)
 411  #define GENARC(Z,N,_) arc << BOOST_PP_CAT(i, N); // streams iN into "arc" -- presumably a serialization archive; verify at use sites
412 
413  #define RPC_INTERFACE_GENERATOR(Z,N,FNAME_AND_CALL) \
414  template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
415  void BOOST_PP_TUPLE_ELEM(3,0,FNAME_AND_CALL) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
416  ASSERT_LT(target, senders.size()); \
417  BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,FNAME_AND_CALL),N) \
418  <F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
419  ::exec(senders[target], BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL), target, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
420  } \
421 
422  /*
423  Generates the interface functions. 3rd argument is a tuple (interface name, issue name, flags)
424  */
425  BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (remote_call, dc_impl::remote_call_issue, STANDARD_CALL) )
426  BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (reply_remote_call,dc_impl::remote_call_issue, STANDARD_CALL | REPLY_PACKET) )
427  BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (control_call, dc_impl::remote_call_issue, (STANDARD_CALL | CONTROL_PACKET)) )
428 
429 
430 #define BROADCAST_INTERFACE_GENERATOR(Z,N,FNAME_AND_CALL) \
431  template<typename Iterator, typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
432  void BOOST_PP_TUPLE_ELEM(3,0,FNAME_AND_CALL) (Iterator target_begin, Iterator target_end, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
433  if (target_begin == target_end) return; \
434  BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,FNAME_AND_CALL),N) \
435  <Iterator, F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
436  ::exec(senders, BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL), target_begin, target_end, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
437  } \
438 
439  BOOST_PP_REPEAT(6, BROADCAST_INTERFACE_GENERATOR, (remote_call, dc_impl::remote_broadcast_issue, STANDARD_CALL) )
440 
441  #define REQUEST_INTERFACE_GENERATOR(Z,N,ARGS) \
442  template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
443  BOOST_PP_TUPLE_ELEM(3,0,ARGS) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
444  ASSERT_LT(target, senders.size()); \
445  return BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,ARGS),N) \
446  <F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
447  ::exec(senders[target], BOOST_PP_TUPLE_ELEM(3,2,ARGS), target, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
448  } \
449 
450  /*
451  Generates the interface functions. 3rd argument is a tuple (interface name, issue name, flags)
452  */
453  BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type remote_request, dc_impl::remote_request_issue, STANDARD_CALL | WAIT_FOR_REPLY) )
454  BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type control_request, dc_impl::remote_request_issue, (STANDARD_CALL | WAIT_FOR_REPLY | CONTROL_PACKET)) )
455 
456 
457 
458  #undef RPC_INTERFACE_GENERATOR
459  #undef BROADCAST_INTERFACE_GENERATOR
460  #undef REQUEST_INTERFACE_GENERATOR
461  #undef GENARC
462  #undef GENT
463  #undef GENI
464  #undef GENARGS
465  /// \endcond
466 
467 /*************************************************************************
468  * Here begins the Doxygen fake functions block *
469  *************************************************************************/
470 
471 #if DOXYGEN_DOCUMENTATION
472 
473 /**
474  * \brief Performs a non-blocking RPC call to the target machine
475  * to run the provided function pointer.
476  *
477  * remote_call() calls the function "fn" on a target remote machine. Provided
478  * arguments are serialized and sent to the target.
479  * Therefore, all arguments are necessarily transmitted by value.
480  * If the target function has a return value, the return value is lost.
481  *
482  * remote_call() is non-blocking and does not wait for the target machine
483  * to complete execution of the function. Different remote_calls may be handled
484  * by different threads on the target machine and thus the target function
485  * should be made thread-safe.
486  * Alternatively, see set_sequentialization_key()
487  * to force sequentialization of groups of remote calls.
488  *
489  * If blocking operation is desired, remote_request() may be used.
490  * Alternatively, a full_barrier() may also be used to wait for completion of
491  * all incomplete RPC calls.
492  *
493  * Example:
494  * \code
495  * // A print function is defined
496  * void print(std::string s) {
497  * std::cout << s << "\n";
498  * }
499  *
500  * ... ...
501  * // call the print function on machine 1 to print "hello"
502  * dc.remote_call(1, print, "hello");
503  * \endcode
504  *
505  *
506  *
507  * \param targetmachine The ID of the machine to run the function on
508  * \param fn The function to run on the target machine
509  * \param ... The arguments to send to Fn. Arguments must be serializable.
510  * and must be castable to the target types.
511  */
512  void remote_call(procid_t targetmachine, Fn fn, ...);
513 
514 
515 
516 /**
517  * \brief Performs a non-blocking RPC call to a collection of machines
518  * to run the provided function pointer.
519  *
520  * This function calls the provided function pointer on a collection of
521  * machines contained in the iterator range [begin, end).
522  * Provided arguments are serialized and sent to the target.
523  * Therefore, all arguments are necessarily transmitted by value.
524  * If the target function has a return value, the return value is lost.
525  *
526  * This function is functionally equivalent to:
527  *
528  * \code
529  * while(machine_begin != machine_end) {
530  * remote_call(*machine_begin, fn, ...);
531  * ++machine_begin;
532  * }
533  * \endcode
534  *
535  * However, this function makes some optimizations to ensure all arguments
536  * are only serialized once instead of \#calls times.
537  *
538  * This function is non-blocking and does not wait for the target machines
539  * to complete execution of the function. Different remote_calls may be handled
540  * by different threads on the target machines and thus the target function
541  * should be made thread-safe. Alternatively, see set_sequentialization_key()
542  * to force sequentialization of groups of remote_calls. A full_barrier()
543  * may also be issued to wait for completion of all RPC calls issued prior
544  * to the full barrier.
545  *
546  * Example:
547  * \code
548  * // A print function is defined
549  * void print(std::string s) {
550  * std::cout << s << "\n";
551  * }
552  *
553  * ... ...
554  * // call the print function on machine 1, 3 and 5 to print "hello"
555  * std::vector<procid_t> procs;
556  * procs.push_back(1); procs.push_back(3); procs.push_back(5);
557  * dc.remote_call(procs.begin(), procs.end(), print, "hello");
558  * \endcode
559  *
560  *
561  * \param machine_begin The beginning of an iterator range containing a list
562  * machines to call. Iterator::value_type must be
563  * castable to procid_t.
564  * \param machine_end The end of an iterator range containing a list
565  * machines to call. Iterator::value_type must be
566  * castable to procid_t.
567  * \param fn The function to run on the target machine
568  * \param ... The arguments to send to Fn. Arguments must be serializable.
569  * and must be castable to the target types.
570  */
571  void remote_call(Iterator machine_begin, Iterator machine_end, Fn fn, ...);
572 
573 
574 /**
575  * \brief Performs a blocking RPC call to the target machine
576  * to run the provided function pointer.
577  *
578  * remote_request() calls the function "fn" on a target remote machine. Provided
579  * arguments are serialized and sent to the target.
580  * Therefore, all arguments are necessarily transmitted by value.
581  * If the target function has a return value, it is sent back to calling
582  * machine.
583  *
584  * Unlike remote_call(), remote_request() is blocking and waits for the target
585  * machine to complete execution of the function. However, different
586  * remote_requests may be still be handled by different threads on the target
587  * machine.
588  *
589  * Example:
590  * \code
591  * // A print function is defined
592  * int add_one(int i) {
593  * return i + 1;
594  * }
595  *
596  * ... ...
597  * // call the add_one function on machine 1
598  * int i = 10;
599  * i = dc.remote_request(1, add_one, i);
600  * // i will now be 11
601  * \endcode
602  *
603  * \param targetmachine The ID of the machine to run the function on
604  * \param fn The function to run on the target machine
605  * \param ... The arguments to send to Fn. Arguments must be serializable.
606  * and must be castable to the target types.
607  *
608  * \returns Returns the same return type as the function fn
609  */
610  RetVal remote_request(procid_t targetmachine, Fn fn, ...);
611 
612 
613 
614 #endif
615 /*************************************************************************
616  * Here end the Doxygen fake functions block *
617  *************************************************************************/
618 
619 
620  private:
 621  /**
 622  * \internal
 623  * Immediately calls the function described by the data
 624  * inside the buffer. This should not be called directly.
 625  */
627  void exec_function_call(procid_t source, unsigned char packet_type_mask, const char* data, const size_t len);
628 
629 
630 
631  /**
632  * \internal
633  * Called by handler threads to process the function call block
634  */
635  void process_fcall_block(fcallqueue_entry &fcallblock);
636 
637 
638  /**
639  * \internal
640  * Receive a collection of serialized function calls.
641  * This function will take ownership of the pointer
642  */
643  void deferred_function_call_chunk(char* buf, size_t len, procid_t src);
644 
645  /**
646  * \internal
647  This is called by the function handler threads
648  */
649  void fcallhandler_loop(size_t id);
650 
651  public:
652  /// \cond GRAPHLAB_INTERNAL
653  /**
654  * \internal
655  * Stops one group of handler threads and wait for them to complete.
656  * May be used to allow external threads to take over RPC processing.
657  *
658  * \param threadid Group number to stop
659  * \param total_threadid Number of groups
660  */
661  void stop_handler_threads(size_t threadid, size_t total_threadid);
662 
663  /**
664  * \internal
665  * Stops one group of handler threads and returns immediately without
666  * waiting for them to complete.
667  * May be used to allow external threads to take over RPC processing.
668  *
669  * \param threadid Group number to stop
670  * \param total_threadid Number of groups
671  */
672  void stop_handler_threads_no_wait(size_t threadid, size_t total_threadid);
673 
674  /**
675  * \internal
676  * Performs RPC processing for a group of threads in lieu of the built-in
677  * RPC threads. The group must be stopped before using stop_handler_threads
678  *
679  * \param threadid Group number to handle
680  * \param total_threadid Number of groups
681  */
682  void handle_incoming_calls(size_t threadid, size_t total_threadid);
683 
684 
685  /**
686  * \internal
687  * Restarts internal RPC threads for a group.
688  * The group must be stopped before using stop_handler_threads
689  *
690  * \param threadid Group number to restart
691  * \param total_threadid Number of groups
692  */
693  void start_handler_threads(size_t threadid, size_t total_threadid);
694 
695  /// \internal
696  size_t recv_queue_length() const {
697  return fcallqueue_length.value;
698  }
699  /// \internal
700  size_t send_queue_length() const {
701  return comm->send_queue_length();
702  }
703 
704  /// \endcond
705 
706  private:
 /// \internal Records that one RPC call was issued to machine `procid`.
 707  inline void inc_calls_sent(procid_t procid) {
 708  //PERMANENT_ACCUMULATE_DIST_EVENT(eventlog, CALLS_EVENT, 1);
 709  global_calls_sent[procid].inc();
 710  }
711 
 /// \internal Records that one RPC call was received from machine `procid`.
 /// While a full barrier is in progress, additionally checks whether this
 /// machine has now received every call expected from `procid`
 /// (calls_to_receive[procid]); when the last expected call from the last
 /// outstanding process arrives, the barrier condition is signalled.
 712  inline void inc_calls_received(procid_t procid) {
 713 
 714  if (!full_barrier_in_effect) {
 715  size_t t = global_calls_received[procid].inc();
 // NOTE(review): full_barrier_in_effect is re-checked after the increment --
 // presumably to catch a barrier that began concurrently on another thread;
 // verify against the full_barrier() implementation.
 716  if (full_barrier_in_effect) {
 717  if (t == calls_to_receive[procid]) {
 718  // if it was me who set the bit
 719  if (procs_complete.set_bit(procid) == false) {
 720  // then decrement the incomplete count.
 721  // if it was me who decreased it to 0,
 722  // lock and signal
 723  full_barrier_lock.lock();
 724  if (num_proc_recvs_incomplete.dec() == 0) {
 725  full_barrier_cond.signal();
 726  }
 727  full_barrier_lock.unlock();
 728  }
 729  }
 730  }
 731  }
 732  else {
 733  //check the proc I just incremented.
 734  // If I just exceeded the required size, I need
 735  // to decrement the full barrier counter
 736  if (global_calls_received[procid].inc() == calls_to_receive[procid]) {
 737  // if it was me who set the bit
 738  if (procs_complete.set_bit(procid) == false) {
 739  // then decrement the incomplete count.
 740  // if it was me who decreased it to 0,
 741  // lock and signal
 742  full_barrier_lock.lock();
 743  if (num_proc_recvs_incomplete.dec() == 0) {
 744  full_barrier_cond.signal();
 745  }
 746  full_barrier_lock.unlock();
 747  }
 748  }
 749  }
 750  }
751 
752  public:
753  /// \brief Returns the total number of RPC calls made
754  inline size_t calls_sent() const {
755  size_t ctr = 0;
756  for (size_t i = 0;i < numprocs(); ++i) {
757  ctr += global_calls_sent[i].value;
758  }
759  return ctr;
760  }
761 
 762  /// \brief Returns the total number of RPC calls made in millions
 // NOTE(review): despite "millions" above, the divisor is 1024*1024 (2^20),
 // i.e. mebi- rather than mega-. Left unchanged since callers may rely on
 // the exact value; confirm which unit is intended.
 763  inline double mega_calls_sent() const {
 764  size_t ctr = 0;
 765  for (size_t i = 0;i < numprocs(); ++i) {
 766  ctr += global_calls_sent[i].value;
 767  }
 768  return double(ctr)/(1024 * 1024);
 769  }
770 
771 
772 
773  /// \brief Returns the total number of RPC calls received
774  inline size_t calls_received() const {
775  size_t ctr = 0;
776  for (size_t i = 0;i < numprocs(); ++i) {
777  ctr += global_calls_received[i].value;
778  }
779  return ctr;
780  }
781 
782  /** \brief Returns the total number of bytes sent excluding headers and other
783  * control overhead. Also see network_bytes_sent()
784  */
785  inline size_t bytes_sent() const {
786  size_t ret = 0;
787  for (size_t i = 0;i < senders.size(); ++i) ret += senders[i]->bytes_sent();
788  return ret;
789  }
790 
791  /** \brief Returns the total number of bytes sent including all headers
792  * and other control overhead. Also see bytes_sent()
793  */
794  inline size_t network_bytes_sent() const {
795  return comm->network_bytes_sent();
796  }
797 
798  /** \brief Returns the total number of megabytes sent including all headers
799  * and other control overhead. Also see network_bytes_sent()
800  */
801  inline double network_megabytes_sent() const {
802  return double(comm->network_bytes_sent()) / (1024 * 1024);
803  }
804 
805 
806 
807  /** \brief Returns the total number of bytes received excluding all headers
808  * and other control overhead. Also see bytes_sent().
809  */
810  inline size_t bytes_received() const {
811  size_t ret = 0;
812  for (size_t i = 0;i < global_bytes_received.size(); ++i) {
813  ret += global_bytes_received[i].value;
814  }
815  return ret;
816  }
817 
818  /// \cond GRAPHLAB_INTERNAL
819 
820  /// \internal
821  inline size_t register_object(void* v, dc_impl::dc_dist_object_base *rmiinstance) {
822  ASSERT_NE(v, (void*)NULL);
823  registered_objects.push_back(v);
824  registered_rmi_instance.push_back(rmiinstance);
825  return registered_objects.size() - 1;
826  }
827 
 828  /// \internal
 // Blocking lookup of a registered object by slot id. Spins (yielding the
 // CPU each iteration) until the slot exists and is non-NULL -- presumably
 // because registration on one thread can race with incoming RPCs that
 // reference the object; verify against dc_dist_object construction.
 829  inline void* get_registered_object(size_t id) {
 // wait until enough objects have been registered for `id` to be a valid slot
 830  while(__builtin_expect((id >= num_registered_objects()), 0)) sched_yield();
 // wait until the slot has actually been filled in (non-NULL)
 831  while (__builtin_expect(registered_objects[id] == NULL, 0)) sched_yield();
 832  return registered_objects[id];
 833  }
834 
 835  /// \internal
 // Blocking lookup of the RMI instance for slot `id`. Unlike
 // get_registered_object(), a NULL entry here is treated as a fatal
 // assertion failure rather than being waited on.
 836  inline dc_impl::dc_dist_object_base* get_rmi_instance(size_t id) {
 837  while(id >= num_registered_objects()) sched_yield();
 838  ASSERT_NE(registered_rmi_instance[id], (void*)NULL);
 839  return registered_rmi_instance[id];
 840  }
841 
 842  /// \internal
 // Empties slot `id`; the pointed-to objects are not freed here. Note that
 // registered_objects[id] is NULLed first: get_registered_object() spins on
 // this entry being NULL, so do not reorder these assignments.
 843  inline void clear_registered_object(size_t id) {
 844  registered_objects[id] = (void*)NULL;
 845  registered_rmi_instance[id] = NULL;
 846  }
847 
848 
849  /// \endcond
850 
851  /**
852  * \brief Performs a local flush of all send buffers
853  */
854  void flush();
855 
856 
857  /**
858  * \brief Sends an object to a target machine and blocks until the
859  * target machine calls recv_from() to receive the object.
860  *
861  * This function sends a \ref sec_serializable object "t" to the target
862  * machine, but waits for the target machine to call recv_from()
 863  * to receive the object before returning.
864  *
865  * Example:
866  * \code
867  * int i;
868  * if (dc.procid() == 0) {
869  * i = 10;
870  * // if I am machine 0, I send the value i = 10 to machine 1
871  * dc.send_to(1, i);
872  * } else if (dc.procid() == 1) {
873  * // machine 1 receives the value of i from machine 0
874  * dc.recv_from(0, i);
875  * }
876  * // at this point machines 0 and 1 have the value i = 10
877  * \endcode
878  *
879  * \tparam U the type of object to send. This should be inferred by the
880  * compiler.
881  * \param target The target machine to send to. Target machine must call
882  * recv_from() before this call will return.
883  * \param t The object to send. It must be serializable. The type must
884  * match the target machine's call to recv_from()
885  * \param control Optional parameter. Defaults to false. If set to true,
886  * this will marked as control plane communication and will
887  * not register in bytes_received() or bytes_sent(). This must
888  * match the "control" parameter on the target machine's
889  * recv_from() call.
890  *
891  * \note Behavior is undefined if multiple threads on the same machine
892  * call send_to simultaneously
893  *
894  */
895  template <typename U>
896  inline void send_to(procid_t target, U& t, bool control = false);
897 
  /**
   * \brief Waits to receive an object a source machine sent via send_to()
   *
   * This function waits to receive a \ref sec_serializable object "t" from a
   * source machine. The source machine must send the object using
   * send_to(). The source machine will wait for the target machine's
   * recv_from() to complete before returning.
   *
   * Example:
   * \code
   * int i;
   * if (dc.procid() == 0) {
   *   i = 10;
   *   // if I am machine 0, I send the value i = 10 to machine 1
   *   dc.send_to(1, i);
   * } else if (dc.procid() == 1) {
   *   // machine 1 receives the value of i from machine 0
   *   dc.recv_from(0, i);
   * }
   * // at this point machines 0 and 1 have the value i = 10
   * \endcode
   *
   * \tparam U the type of object to receive. This should be inferred by the
   *           compiler.
   * \param source The source machine to receive from. This function will block
   *               until data is received.
   * \param t The object to receive. It must be serializable and the type
   *          must match the source machine's call to send_to()
   * \param control Optional parameter. Defaults to false. If set to true,
   *                this will be marked as control plane communication and will
   *                not register in bytes_received() or bytes_sent(). This must
   *                match the "control" parameter on the source machine's
   *                send_to() call.
   *
   * \note Behavior is undefined if multiple threads on the same machine
   * call recv_from simultaneously
   *
   */
  template <typename U>
  inline void recv_from(procid_t source, U& t, bool control = false);
938 
939 
  /**
   * \brief This function allows one machine to broadcast an object to all
   * machines.
   *
   * The originator calls broadcast with the data provided
   * in 'data' and originator set to true.
   * All other callers call with originator set to false.
   *
   * The originator will then return 'data'. All other machines
   * will receive the originator's transmission in the "data" parameter.
   *
   * This call is guaranteed to have barrier-like behavior. That is to say,
   * this call will block until all machines enter the broadcast function.
   *
   * Example:
   * \code
   * int i;
   * if (procid() == 0) {
   *   // if I am machine 0, I broadcast the value i = 10 to all machines
   *   i = 10;
   *   dc.broadcast(i, true);
   * } else {
   *   // all other machines receive the broadcast value
   *   dc.broadcast(i, false);
   * }
   * // at this point, all machines have i = 10
   * \endcode
   *
   * \note Behavior is undefined if more than one machine calls broadcast
   * with originator set to true.
   *
   * \note Behavior is undefined if multiple threads on the same machine
   * call broadcast simultaneously
   *
   * \param data If this is the originator, this will contain the object to
   *             broadcast. Otherwise, this will be a reference to the object
   *             receiving the broadcast.
   * \param originator Set to true if this is the source of the broadcast.
   *                   Set to false otherwise.
   * \param control Optional parameter. Defaults to false. If set to true,
   *                this will be marked as control plane communication and will
   *                not register in bytes_received() or bytes_sent(). This must
   *                be the same on all machines.
   */
  template <typename U>
  inline void broadcast(U& data, bool originator, bool control = false);
986 
  /**
   * \brief Collects information contributed by each machine onto
   * one machine.
   *
   * The goal is to collect some information from each machine onto a single
   * target machine (sendto). To accomplish this,
   * each machine constructs a vector of length numprocs(), and stores
   * the data to communicate in the procid()'th entry in the vector.
   * Then calling gather with the vector and the target machine will send
   * the contributed value to the target.
   * When the function returns, machine sendto will have the complete vector
   * where data[i] is the data contributed by machine i.
   *
   * Example:
   * \code
   * // construct the vector of values
   * std::vector<int> values;
   * values.resize(dc.numprocs());
   *
   * // set my contributed value
   * values[dc.procid()] = dc.procid();
   * dc.gather(values, 0);
   * // at this point machine 0 will have a vector with length equal to the
   * // number of processes, and containing values [0, 1, 2, ...]
   * // All other machines value vector will be unchanged.
   * \endcode
   *
   * \note Behavior is undefined if machines call gather with different values
   * for sendto
   *
   * \note Behavior is undefined if multiple threads on the same machine
   * call gather simultaneously
   *
   * \param data A vector of length equal to the number of processes. The
   *             information to communicate is in the entry data[procid()]
   * \param sendto Machine which will hold the complete vector at the end
   *               of the operation. All machines must have the same value
   *               for this parameter.
   * \param control Optional parameter. Defaults to false. If set to true,
   *                this will be marked as control plane communication and will
   *                not register in bytes_received() or bytes_sent(). This must
   *                be the same on all machines.
   */
  template <typename U>
  inline void gather(std::vector<U>& data, procid_t sendto, bool control = false);
1032 
  /**
   * \brief Sends some information contributed by each machine to all machines
   *
   * The goal is to have each machine broadcast a piece of information to all
   * machines. This is like gather(), but all machines have the complete vector
   * at the end. To accomplish this, each machine constructs a vector of
   * length numprocs(), and stores the data to communicate in the procid()'th
   * entry in the vector. Then calling all_gather with the vector will result
   * in all machines having a complete copy of the vector containing all
   * contributions (entry 0 from machine 0, entry 1 from machine 1, etc).
   *
   * Example:
   * \code
   * // construct the vector of values
   * std::vector<int> values;
   * values.resize(dc.numprocs());
   *
   * // set my contributed value
   * values[dc.procid()] = dc.procid();
   * dc.all_gather(values);
   * // at this point all machines will have a vector with length equal to the
   * // number of processes, and containing values [0, 1, 2, ...]
   * \endcode
   *
   * \note Behavior is undefined if multiple threads on the same machine
   * call all_gather simultaneously
   *
   * \param data A vector of length equal to the number of processes. The
   *             information to communicate is in the entry data[procid()]
   * \param control Optional parameter. Defaults to false. If set to true,
   *                this will be marked as control plane communication and will
   *                not register in bytes_received() or bytes_sent(). This must
   *                be the same on all machines.
   */
  template <typename U>
  inline void all_gather(std::vector<U>& data, bool control = false);
1069 
1070 
  /**
   * \brief Combines a value contributed by each machine, making the result
   * available to all machines.
   *
   * Each machine calls all_reduce() with an object which is serializable
   * and has operator+= implemented. When all_reduce() returns, the "data"
   * variable will contain a value corresponding to adding up the objects
   * contributed by each machine.
   *
   * Example:
   * \code
   * int i = 1;
   * dc.all_reduce(i);
   * // since each machine contributed the value "1",
   * // all machines will have i = numprocs() here.
   * \endcode
   *
   * \param data A piece of data to perform a reduction over.
   *             The type must implement operator+=.
   * \param control Optional parameter. Defaults to false. If set to true,
   *                this will be marked as control plane communication and will
   *                not register in bytes_received() or bytes_sent(). This must
   *                be the same on all machines.
   */
  template <typename U>
  inline void all_reduce(U& data, bool control = false);
1097 
  /**
   * \brief Combines a value contributed by each machine, making the result
   * available to all machines.
   *
   * This function is equivalent to all_reduce(), but with an externally
   * defined PlusEqual function.
   *
   * Each machine calls all_reduce2() with an object which is serializable
   * and a function "plusequal" which combines two instances of the object.
   * When all_reduce2() returns, the "data"
   * variable will contain a value corresponding to adding up the objects
   * contributed by each machine using the plusequal function.
   *
   * Where U is the type of the object, the plusequal function must be of
   * the form:
   * \code
   * void plusequal(U& left, const U& right);
   * \endcode
   * and must implement the equivalent of <code>left += right; </code>
   *
   * Example:
   * \code
   * void int_plus_equal(int& a, const int& b) {
   *   a+=b;
   * }
   *
   * int i = 1;
   * dc.all_reduce2(i, int_plus_equal);
   * // since each machine contributed the value "1",
   * // all machines will have i = numprocs() here.
   * \endcode
   *
   * \param data A piece of data to perform a reduction over.
   * \param plusequal A plusequal function on the data. Must have the prototype
   *                  void plusequal(U&, const U&)
   * \param control Optional parameter. Defaults to false. If set to true,
   *                this will be marked as control plane communication and will
   *                not register in bytes_received() or bytes_sent(). This must
   *                be the same on all machines.
   */
  template <typename U, typename PlusEqual>
  inline void all_reduce2(U& data, PlusEqual plusequal, bool control = false);
1140 
1141 
  /**
    \brief A distributed barrier which waits for all machines to call the
    barrier() function before proceeding.

    A machine calling the barrier() will wait until every machine
    reaches this barrier before continuing. Only one thread from each machine
    should call the barrier.

    \note Unlike full_barrier(), this does NOT wait for previously issued
    remote calls to complete.

    \see full_barrier
   */
  void barrier();
1153 
1154 
1155 
1156 
 /*****************************************************************************
                       Implementation of Full Barrier
*****************************************************************************/
  /**
   * \brief A distributed barrier which waits for all machines to call
   * the full_barrier() function before proceeding. Also waits for all
   * previously issued remote calls to complete.

   Similar to the barrier(), but provides additional guarantees that
   all calls issued prior to this barrier are completed before
   returning.

   \note This function could return prematurely if
   other threads are still issuing function calls since we
   cannot differentiate between calls issued before the barrier
   and calls issued while the barrier is being evaluated.
   Therefore, when used in a multithreaded scenario, the user must ensure
   that all other threads which may perform operations using this object
   are stopped before the full barrier is initiated.

   \see barrier
   */
  void full_barrier();
1180 
1181 
1182  /**
1183  * \brief A wrapper on cout, that outputs only on machine 0
1184  */
1185  std::ostream& cout() const {
1186  if (procid() == 0) return std::cout;
1187  else return nullstrm;
1188  }
1189 
1190  /**
1191  * \brief A wrapper on cerr, that outputs only on machine 0
1192  */
1193  std::ostream& cerr() const {
1194  if (procid() == 0) return std::cerr;
1195  else return nullstrm;
1196  }
1197 
 private:
  // Protects the full-barrier completion state below.
  mutex full_barrier_lock;
  // Signalled when full-barrier progress is made.
  conditional full_barrier_cond;
  // Per-source-machine count of calls still expected during a full barrier.
  std::vector<size_t> calls_to_receive;
  // used to inform the counter that the full barrier
  // is in effect and all modifications to the calls_recv
  // counter will need to lock and signal
  volatile bool full_barrier_in_effect;

  /** number of 'source' processor counts which have
      not achieved the right recv count */
  atomic<size_t> num_proc_recvs_incomplete;

  /// Marked as 1 if the proc is complete
  dense_bitset procs_complete;
  /// \internal Discarding stream returned by cout()/cerr() on non-root
  /// machines; mutable so the const accessors can return a non-const ref.
  mutable boost::iostreams::stream<boost::iostreams::null_sink> nullstrm;
1215 
1216  /*****************************************************************************
1217  Collection of Statistics
1218 *****************************************************************************/
1219 
1220  private:
  /// \internal
  /// Per-machine RPC counters shipped between machines by
  /// gather_statistics().
  struct collected_statistics {
    size_t callssent;          // number of RPC calls issued -- presumably; confirm against dc.cpp
    size_t bytessent;          // payload bytes sent -- presumably; confirm against dc.cpp
    size_t network_bytessent;  // bytes actually put on the wire -- presumably; confirm against dc.cpp
    collected_statistics(): callssent(0), bytessent(0), network_bytessent(0) { }
    /// Serializes the three counters; order must match load().
    void save(oarchive &oarc) const {
      oarc << callssent << bytessent << network_bytessent;
    }
    /// Deserializes the three counters in the same order as save().
    void load(iarchive &iarc) {
      iarc >> callssent >> bytessent >> network_bytessent;
    }
  };
 public:
  /** Gather RPC statistics. All machines must call
      this function at the same time. However, only proc 0 will
      return values; the returned map is empty on all other machines. */
  std::map<std::string, size_t> gather_statistics();
1238 };
1239 
1240 
1241 
1242 
1243 } // namespace graphlab
1244 
// Registers function f with distributed_control instance dc, keyed by the
// stringized function name -- presumably so the function can later be
// invoked remotely by name; see register_rpc in the class body.
#define REGISTER_RPC(dc, f) dc.register_rpc<typeof(f)*, f>(std::string(BOOST_PP_STRINGIZE(f)))
1246 
1247 #include <graphlab/rpc/function_arg_types_undef.hpp>
1248 #include <graphlab/rpc/function_call_dispatch.hpp>
1249 #include <graphlab/rpc/request_dispatch.hpp>
1250 #include <graphlab/rpc/dc_dist_object.hpp>
1251 #include <graphlab/rpc/dc_services.hpp>
1252 
1253 namespace graphlab {
1254 
template <typename U>
inline void distributed_control::send_to(procid_t target, U& t, bool control) {
  // Thin forwarder: the blocking pairwise send is implemented in dc_services.
  distributed_services->send_to(target, t, control);
}
1259 
template <typename U>
inline void distributed_control::recv_from(procid_t source, U& t, bool control) {
  // Thin forwarder: the blocking pairwise receive is implemented in dc_services.
  distributed_services->recv_from(source, t, control);
}
1264 
template <typename U>
inline void distributed_control::broadcast(U& data, bool originator, bool control) {
  // Thin forwarder: the broadcast collective is implemented in dc_services.
  distributed_services->broadcast(data, originator, control);
}
1269 
template <typename U>
inline void distributed_control::gather(std::vector<U>& data, procid_t sendto, bool control) {
  // Thin forwarder: the gather collective is implemented in dc_services.
  distributed_services->gather(data, sendto, control);
}
1274 
template <typename U>
inline void distributed_control::all_gather(std::vector<U>& data, bool control) {
  // Thin forwarder: the all_gather collective is implemented in dc_services.
  distributed_services->all_gather(data, control);
}
1279 
template <typename U>
inline void distributed_control::all_reduce(U& data, bool control) {
  // Thin forwarder: the all_reduce collective is implemented in dc_services.
  distributed_services->all_reduce(data, control);
}
1284 
1285 
template <typename U, typename PlusEqual>
inline void distributed_control::all_reduce2(U& data, PlusEqual plusequal, bool control) {
  // Thin forwarder: the all_reduce2 collective is implemented in dc_services.
  distributed_services->all_reduce2(data, plusequal, control);
}
1290 
1291 
1292 
namespace dc_impl {
  /// \internal Procid of the "last" distributed_control instance --
  /// presumably the most recently constructed one; confirm in dc.cpp.
  extern procid_t get_last_dc_procid();

  /// \internal Pointer to the "last" distributed_control instance --
  /// presumably the most recently constructed one; confirm in dc.cpp.
  extern distributed_control* get_last_dc();
}
1298 
1299 }
1300 
1301 #include <graphlab/util/mpi_tools.hpp>
1302 #endif
1303