GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
dc.cpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #include <unistd.h>
25 #include <sys/param.h>
26 #include <stdlib.h>
27 #include <sys/types.h>
28 #include <sys/socket.h>
29 #include <ifaddrs.h>
30 #include <netinet/in.h>
31 
32 #include <map>
33 #include <sstream>
34 
35 #include <boost/unordered_map.hpp>
36 #include <boost/bind.hpp>
37 //#include <graphlab/logger/assertions.hpp>
38 #include <graphlab/parallel/pthread_tools.hpp>
39 #include <graphlab/util/stl_util.hpp>
40 #include <graphlab/util/net_util.hpp>
41 #include <graphlab/util/mpi_tools.hpp>
42 
43 #include <graphlab/rpc/dc.hpp>
44 #include <graphlab/rpc/dc_tcp_comm.hpp>
45 //#include <graphlab/rpc/dc_sctp_comm.hpp>
46 #include <graphlab/rpc/dc_buffered_stream_send2.hpp>
47 #include <graphlab/rpc/dc_stream_receive.hpp>
48 #include <graphlab/rpc/reply_increment_counter.hpp>
49 #include <graphlab/rpc/dc_services.hpp>
50 
51 #include <graphlab/rpc/dc_init_from_env.hpp>
52 #include <graphlab/rpc/dc_init_from_mpi.hpp>
53 
54 
55 // If this option is turned on,
56 // all incoming calls from the same machine
57 // will always be executed in the same thread.
58 // This decreases latency and increases throughput
59 // dramatically, but at a cost of parallelism.
60 #define RPC_FAST_DISPATCH
61 
62 
63 namespace graphlab {
64 
65 namespace dc_impl {
66 
67 static procid_t last_dc_procid = 0;
68 static distributed_control* last_dc = NULL;
69 
70 procid_t get_last_dc_procid() {
71  return last_dc_procid;
72 }
73 
74 distributed_control* get_last_dc() {
75  if (last_dc == NULL) {
76  last_dc = new distributed_control();
77  }
78  return last_dc;
79 }
80 
81 
82 
83 
// Guards one-time creation of the key below: distributed_control::init()
// sets this to true just before it calls pthread_key_create().
84 bool thrlocal_sequentialization_key_initialized = false;
// pthread thread-specific-data key; the per-thread value stored under it is
// the thread's current sequentialization key, packed into the void* slot.
85 pthread_key_t thrlocal_sequentialization_key;
86 
87 } // namespace dc_impl
88 
89 unsigned char distributed_control::set_sequentialization_key(unsigned char newkey) {
90  size_t oldval = reinterpret_cast<size_t>(pthread_getspecific(dc_impl::thrlocal_sequentialization_key));
91  size_t newval = newkey;
92  pthread_setspecific(dc_impl::thrlocal_sequentialization_key, reinterpret_cast<void*>(newval));
93  assert(oldval < 256);
94  return (unsigned char)oldval;
95 }
96 
// NOTE(review): the signature line of this definition (listing line 97) is
// missing from this listing, so the function name is not visible here.
// Body: reads the calling thread's sequentialization key from thread-specific
// storage, stores (key + 1) % 256 in its place, and returns the previous key.
98  size_t oldval = reinterpret_cast<size_t>(pthread_getspecific(dc_impl::thrlocal_sequentialization_key));
99  size_t newval = (oldval + 1) % 256;
100  pthread_setspecific(dc_impl::thrlocal_sequentialization_key, reinterpret_cast<void*>(newval));
// The stored value is expected to always fit in an unsigned char.
101  assert(oldval < 256);
102  return (unsigned char)oldval;
103 }
104 
// NOTE(review): the signature line of this definition (listing line 105) is
// missing from this listing, so the function name is not visible here.
// Body: returns the calling thread's current sequentialization key without
// modifying it.
106  size_t oldval = reinterpret_cast<size_t>(pthread_getspecific(dc_impl::thrlocal_sequentialization_key));
// The stored value is expected to always fit in an unsigned char.
107  assert(oldval < 256);
108  return (unsigned char)oldval;
109 }
110 
111 
// NOTE(review): the signature line of this definition (listing line 112) is
// missing from this listing.
// Body: discovers the initialization parameters and then calls init().
// Sources are tried in order: environment, then MPI (only if MPI is already
// initialized), and finally a single-process "localhost" fallback.
113  dc_init_param initparam;
114  if (init_param_from_env(initparam)) {
115  logstream(LOG_INFO) << "Distributed Control Initialized from Environment" << std::endl;
116  } else if (mpi_tools::initialized() && init_param_from_mpi(initparam)) {
117  logstream(LOG_INFO) << "Distributed Control Initialized from MPI" << std::endl;
118  }
119  else {
// Fallback: run as a one-machine cluster on localhost.
120  logstream(LOG_INFO) << "Shared Memory Execution" << std::endl;
121  // get a port and socket
122  std::pair<size_t, int> port_and_sock = get_free_tcp_port();
123  size_t port = port_and_sock.first;
124  int sock = port_and_sock.second;
125 
126  initparam.machines.push_back(std::string("localhost:") + tostr(port));
127  initparam.curmachineid = 0;
// The socket obtained above is handed on through the "__sockhandle__" option
// of the init string.
128  initparam.initstring = std::string(" __sockhandle__=") + tostr(sock) + " ";
// NOTE(review): listing line 129 is missing here. initparam.numhandlerthreads
// is passed to init() below but no assignment to it is visible in this
// branch -- confirm against the original source.
130  initparam.commtype = RPC_DEFAULT_COMMTYPE;
131  }
132  init(initparam.machines,
133  initparam.initstring,
134  initparam.curmachineid,
135  initparam.numhandlerthreads,
136  initparam.commtype);
// Register the tracepoints used by the receive/dispatch paths in this file.
137  INITIALIZE_TRACER(dc_receive_queuing, "dc: time spent on enqueue");
138  INITIALIZE_TRACER(dc_receive_multiplexing, "dc: time spent exploding a chunk");
139  INITIALIZE_TRACER(dc_call_dispatch, "dc: time spent issuing RPC calls");
140 }
141 
// NOTE(review): the signature line of this definition (listing line 142) is
// missing from this listing; `initparam` is presumably its parameter.
// Body: forwards the supplied initialization parameters to init() and
// registers the tracepoints used by the receive/dispatch paths in this file.
143  init(initparam.machines,
144  initparam.initstring,
145  initparam.curmachineid,
146  initparam.numhandlerthreads,
147  initparam.commtype);
148  INITIALIZE_TRACER(dc_receive_queuing, "dc: time spent on enqueue");
149  INITIALIZE_TRACER(dc_receive_multiplexing, "dc: time spent exploding a chunk");
150  INITIALIZE_TRACER(dc_call_dispatch, "dc: time spent issuing RPC calls");
151 }
152 
153 
154 distributed_control::~distributed_control() {
155  distributed_services->full_barrier();
156  logstream(LOG_INFO) << "Shutting down distributed control " << std::endl;
157  FREE_CALLBACK_EVENT(EVENT_NETWORK_BYTES);
158  FREE_CALLBACK_EVENT(EVENT_RPC_CALLS);
159  // call all deletion callbacks
160  for (size_t i = 0; i < deletion_callbacks.size(); ++i) {
161  deletion_callbacks[i]();
162  }
163 
164  size_t bytessent = bytes_sent();
165  for (size_t i = 0;i < senders.size(); ++i) {
166  senders[i]->flush();
167  }
168 
169  comm->close();
170 
171  for (size_t i = 0;i < senders.size(); ++i) {
172  delete senders[i];
173  }
174  size_t bytesreceived = bytes_received();
175  for (size_t i = 0;i < receivers.size(); ++i) {
176  receivers[i]->shutdown();
177  delete receivers[i];
178  }
179  senders.clear();
180  receivers.clear();
181  // shutdown function call handlers
182  for (size_t i = 0;i < fcallqueue.size(); ++i) fcallqueue[i].stop_blocking();
183  fcallhandlers.join();
184  logstream(LOG_INFO) << "Bytes Sent: " << bytessent << std::endl;
185  logstream(LOG_INFO) << "Calls Sent: " << calls_sent() << std::endl;
186  logstream(LOG_INFO) << "Network Sent: " << network_bytes_sent() << std::endl;
187  logstream(LOG_INFO) << "Bytes Received: " << bytesreceived << std::endl;
188  logstream(LOG_INFO) << "Calls Received: " << calls_received() << std::endl;
189 
190  delete comm;
191 
192 }
193 
194 
195 void distributed_control::exec_function_call(procid_t source,
196  unsigned char packet_type_mask,
197  const char* data,
198  const size_t len) {
199  BEGIN_TRACEPOINT(dc_call_dispatch);
200  // not a POD call
201  if ((packet_type_mask & POD_CALL) == 0) {
202  // extract the dispatch function
203  iarchive arc(data, len);
204  size_t f;
205  arc >> f;
206  // a regular funcion call
207  dc_impl::dispatch_type dispatch = (dc_impl::dispatch_type)f;
208  dispatch(*this, source, packet_type_mask, data + arc.off, len - arc.off);
209  }
210  else {
211  dc_impl::dispatch_type2 dispatch2 = *reinterpret_cast<const dc_impl::dispatch_type2*>(data);
212  dispatch2(*this, source, packet_type_mask, data, len);
213  }
214  if ((packet_type_mask & CONTROL_PACKET) == 0) inc_calls_received(source);
215  END_TRACEPOINT(dc_call_dispatch);
216 }
217 
218 void distributed_control::deferred_function_call_chunk(char* buf, size_t len, procid_t src) {
219  BEGIN_TRACEPOINT(dc_receive_queuing);
220  fcallqueue_entry* fc = new fcallqueue_entry;
221  fc->chunk_src = buf;
222  fc->chunk_len = len;
223  fc->chunk_ref_counter = NULL;
224  fc->is_chunk = true;
225  fc->source = src;
226  fcallqueue_length.inc();
227  fcallqueue[src % fcallqueue.size()].enqueue(fc);
228  END_TRACEPOINT(dc_receive_queuing);
229 }
230 
231 
232 void distributed_control::process_fcall_block(fcallqueue_entry &fcallblock) {
233  if (fcallblock.is_chunk == false) {
234  for (size_t i = 0;i < fcallblock.calls.size(); ++i) {
235  fcallqueue_length.dec();
236  exec_function_call(fcallblock.source, fcallblock.calls[i].packet_mask,
237  fcallblock.calls[i].data, fcallblock.calls[i].len);
238  }
239  if (fcallblock.chunk_ref_counter != NULL) {
240  if (fcallblock.chunk_ref_counter->dec(fcallblock.calls.size()) == 0) {
241  delete fcallblock.chunk_ref_counter;
242  free(fcallblock.chunk_src);
243  }
244  }
245  }
246 #ifdef RPC_FAST_DISPATCH
247  else {
248  fcallqueue_length.dec();
249 
250  //parse the data in fcallblock.data
251  char* data = fcallblock.chunk_src;
252  size_t remaininglen = fcallblock.chunk_len;
253  //PERMANENT_ACCUMULATE_DIST_EVENT(eventlog, BYTES_EVENT, remaininglen);
254  while(remaininglen > 0) {
255  ASSERT_GE(remaininglen, sizeof(dc_impl::packet_hdr));
256  dc_impl::packet_hdr hdr = *reinterpret_cast<dc_impl::packet_hdr*>(data);
257  ASSERT_LE(hdr.len, remaininglen);
258 
259  exec_function_call(fcallblock.source, hdr.packet_type_mask,
260  data + sizeof(dc_impl::packet_hdr),
261  hdr.len);
262  data += sizeof(dc_impl::packet_hdr) + hdr.len;
263  remaininglen -= sizeof(dc_impl::packet_hdr) + hdr.len;
264  }
265  free(fcallblock.chunk_src);
266  }
267 #else
268  else {
269  fcallqueue_length.dec();
270  BEGIN_TRACEPOINT(dc_receive_multiplexing);
271  fcallqueue_entry* queuebufs[fcallqueue.size()];
272  atomic<size_t>* refctr = new atomic<size_t>(0);
273 
274  fcallqueue_entry immediate_queue;
275 
276  immediate_queue.chunk_src = fcallblock.chunk_src;
277  immediate_queue.chunk_ref_counter = refctr;
278  immediate_queue.chunk_len = 0;
279  immediate_queue.source = fcallblock.source;
280  immediate_queue.is_chunk = false;
281 
282  for (size_t i = 0;i < fcallqueue.size(); ++i) {
283  queuebufs[i] = new fcallqueue_entry;
284  queuebufs[i]->chunk_src = fcallblock.chunk_src;
285  queuebufs[i]->chunk_ref_counter = refctr;
286  queuebufs[i]->chunk_len = 0;
287  queuebufs[i]->source = fcallblock.source;
288  queuebufs[i]->is_chunk = false;
289  }
290 
291  //parse the data in fcallblock.data
292  char* data = fcallblock.chunk_src;
293  size_t remaininglen = fcallblock.chunk_len;
294  //PERMANENT_ACCUMULATE_DIST_EVENT(eventlog, BYTES_EVENT, remaininglen);
295  size_t stripe = 0;
296  while(remaininglen > 0) {
297  ASSERT_GE(remaininglen, sizeof(dc_impl::packet_hdr));
298  dc_impl::packet_hdr hdr = *reinterpret_cast<dc_impl::packet_hdr*>(data);
299  ASSERT_LE(hdr.len, remaininglen);
300 
301  refctr->value++;
302 
303 
304  if ((hdr.packet_type_mask & CONTROL_PACKET)) {
305  // control calls are handled immediately with priority.
306  immediate_queue.calls.push_back(function_call_block(
307  data + sizeof(dc_impl::packet_hdr),
308  hdr.len,
309  hdr.packet_type_mask));
310  } else {
311  global_bytes_received[hdr.src].inc(hdr.len);
312  if (hdr.sequentialization_key == 0) {
313  queuebufs[stripe]->calls.push_back(function_call_block(
314  data + sizeof(dc_impl::packet_hdr),
315  hdr.len,
316  hdr.packet_type_mask));
317  ++stripe;
318  if (stripe == (fcallblock.source % fcallqueue.size())) ++stripe;
319  if (stripe >= fcallqueue.size()) stripe -= fcallqueue.size();
320  }
321  else {
322  size_t idx = (hdr.sequentialization_key % (fcallqueue.size()));
323  queuebufs[idx]->calls.push_back(function_call_block(
324  data + sizeof(dc_impl::packet_hdr),
325  hdr.len,
326  hdr.packet_type_mask));
327  }
328  }
329  data += sizeof(dc_impl::packet_hdr) + hdr.len;
330  remaininglen -= sizeof(dc_impl::packet_hdr) + hdr.len;
331  }
332  END_TRACEPOINT(dc_receive_multiplexing);
333  BEGIN_TRACEPOINT(dc_receive_queuing);
334  for (size_t i = 0;i < fcallqueue.size(); ++i) {
335  if (queuebufs[i]->calls.size() > 0) {
336  fcallqueue_length.inc(queuebufs[i]->calls.size());
337  fcallqueue[i].enqueue(queuebufs[i]);
338  }
339  else {
340  delete queuebufs[i];
341  }
342  }
343  END_TRACEPOINT(dc_receive_queuing);
344  if (immediate_queue.calls.size() > 0) process_fcall_block(immediate_queue);
345  }
346 #endif
347 }
348 
349 void distributed_control::stop_handler_threads(size_t threadid,
350  size_t total_threadid) {
351  stop_handler_threads_no_wait(threadid, total_threadid);
352 }
353 
354 void distributed_control::stop_handler_threads_no_wait(size_t threadid,
355  size_t total_threadid) {
356  for (size_t i = threadid;i < fcallqueue.size(); i += total_threadid) {
357  fcall_handler_blockers[i].lock();
358  }
359 }
360 
361 
362 void distributed_control::start_handler_threads(size_t threadid,
363  size_t total_threadid) {
364  for (size_t i = threadid;i < fcallqueue.size(); i += total_threadid) {
365  fcall_handler_blockers[i].unlock();
366  }
367 }
368 
369 void distributed_control::handle_incoming_calls(size_t threadid,
370  size_t total_threadid) {
371  for (size_t i = threadid;i < fcallqueue.size(); i += total_threadid) {
372  if (fcallqueue[i].empty_unsafe() == false) {
373  std::deque<fcallqueue_entry*> q;
374  fcallqueue[i].swap(q);
375  while (!q.empty()) {
376  fcallqueue_entry* entry;
377  entry = q.front();
378  q.pop_front();
379 
380  process_fcall_block(*entry);
381  delete entry;
382  }
383  }
384  }
385 }
386 
387 void distributed_control::fcallhandler_loop(size_t id) {
388  // pop an element off the queue
389 // float t = timer::approx_time_seconds();
390  fcall_handler_active[id].inc();
391  while(1) {
392  fcallqueue[id].wait_for_data();
393  fcall_handler_blockers[id].lock();
394  if (fcallqueue[id].is_alive() == false) {
395  fcall_handler_blockers[id].unlock();
396  break;
397  }
398  std::deque<fcallqueue_entry*> q;
399  fcallqueue[id].swap(q);
400  while (!q.empty()) {
401  fcallqueue_entry* entry;
402  entry = q.front();
403  q.pop_front();
404 
405  process_fcall_block(*entry);
406  delete entry;
407  }
408  fcall_handler_blockers[id].unlock();
409  // std::cerr << "Handler " << id << " died." << std::endl;
410  }
411  fcall_handler_active[id].dec();
412 }
413 
414 
415 std::map<std::string, std::string>
416  distributed_control::parse_options(std::string initstring) {
417  std::map<std::string, std::string> options;
418  std::replace(initstring.begin(), initstring.end(), ',', ' ');
419  std::replace(initstring.begin(), initstring.end(), ';', ' ');
420  std::string opt, value;
421  // read till the equal
422  std::stringstream s(initstring);
423  while(s.good()) {
424  getline(s, opt, '=');
425  if (s.bad() || s.eof()) break;
426  getline(s, value, ' ');
427  if (s.bad()) break;
428  options[trim(opt)] = trim(value);
429  }
430  return options;
431 }
432 
433 void distributed_control::init(const std::vector<std::string> &machines,
434  const std::string &initstring,
435  procid_t curmachineid,
436  size_t numhandlerthreads,
437  dc_comm_type commtype) {
438 
439  if (numhandlerthreads == RPC_DEFAULT_NUMHANDLERTHREADS) {
440  // autoconfigure
441  if (thread::cpu_count() > 2) numhandlerthreads = thread::cpu_count() - 2;
442  else numhandlerthreads = 2;
443  }
444  dc_impl::last_dc = this;
445  ASSERT_MSG(machines.size() <= RPC_MAX_N_PROCS,
446  "Number of processes exceeded hard limit of %d", RPC_MAX_N_PROCS);
447 
448  // initialize thread local storage
449  if (dc_impl::thrlocal_sequentialization_key_initialized == false) {
450  dc_impl::thrlocal_sequentialization_key_initialized = true;
451  int err = pthread_key_create(&dc_impl::thrlocal_sequentialization_key, NULL);
452  ASSERT_EQ(err, 0);
453  }
454 
455  //-------- Initialize the full barrier ---------
456  full_barrier_in_effect = false;
457  procs_complete.resize(machines.size());
458  //-----------------------------------------------
459 
460  // initialize the counters
461 
462  global_calls_sent.resize(machines.size());
463  global_calls_received.resize(machines.size());
464  global_bytes_received.resize(machines.size());
465  fcallqueue.resize(numhandlerthreads);
466 
467 
468  // parse the initstring
469  std::map<std::string,std::string> options = parse_options(initstring);
470 
471  if (commtype == TCP_COMM) {
472  comm = new dc_impl::dc_tcp_comm();
473  }
474 /* else if (commtype == SCTP_COMM) {
475  #ifdef HAS_SCTP
476  comm = new dc_impl::dc_sctp_comm();
477  std::cerr << "SCTP Communication layer constructed." << std::endl;
478  #else
479  logger(LOG_FATAL, "SCTP support was not compiled");
480  #endif
481  }*/
482  else {
483  ASSERT_MSG(false, "Unexpected value for comm type");
484  }
485  // create the receiving objects
486  if (comm->capabilities() & dc_impl::COMM_STREAM) {
487  for (procid_t i = 0; i < machines.size(); ++i) {
488  receivers.push_back(new dc_impl::dc_stream_receive(this, i));
489  senders.push_back(new dc_impl::dc_buffered_stream_send2(this, comm, i));
490  }
491  }
492  // create the handler threads
493  // store the threads in the threadgroup
494  fcall_handler_active.resize(numhandlerthreads);
495  fcall_handler_blockers.resize(numhandlerthreads);
496  fcallhandlers.resize(numhandlerthreads);
497  for (size_t i = 0;i < numhandlerthreads; ++i) {
498  fcallhandlers.launch(boost::bind(&distributed_control::fcallhandler_loop,
499  this, i));
500  }
501 
502 
503  // set the local proc values
504  localprocid = curmachineid;
505  localnumprocs = machines.size();
506 
507  // set the static variable for the global function get_last_dc_procid()
508  dc_impl::last_dc_procid = localprocid;
509 
510  // construct the services
511  distributed_services = new dc_services(*this);
512  // start the machines
513 
514  // improves reliability of initialization
515 #ifdef HAS_MPI
516  if (mpi_tools::initialized()) MPI_Barrier(MPI_COMM_WORLD);
517 #endif
518 
519  comm->init(machines, options, curmachineid,
520  receivers, senders);
521  std::cerr << "TCP Communication layer constructed." << std::endl;
522 
523  // improves reliability of initialization
524 #ifdef HAS_MPI
525  if (mpi_tools::initialized()) MPI_Barrier(MPI_COMM_WORLD);
526 #endif
527  barrier();
528  // initialize the empty stream
529  nullstrm.open(boost::iostreams::null_sink());
530 
531  // initialize the event log
532 
533  INITIALIZE_EVENT_LOG(*this);
534  ADD_CUMULATIVE_CALLBACK_EVENT(EVENT_NETWORK_BYTES, "Network Utilization",
535  "MB", boost::bind(&distributed_control::network_megabytes_sent, this));
536  ADD_CUMULATIVE_CALLBACK_EVENT(EVENT_RPC_CALLS, "RPC Calls",
537  "Calls", boost::bind(&distributed_control::calls_sent, this));
538 }
539 
540 
541 
// NOTE(review): the signature line of this definition (listing line 542) is
// missing from this listing.
// Body: delegates to the barrier provided by the services object that init()
// creates.
543  distributed_services->barrier();
544 }
545 
// NOTE(review): the signature line of this definition (listing line 546) is
// missing from this listing.
// Body: calls flush() on every sender, i.e. once per target machine.
547  for (procid_t i = 0;i < senders.size(); ++i) {
548  senders[i]->flush();
549  }
550 }
551 
552 
553  /*****************************************************************************
554  Implementation of Full Barrier
555 *****************************************************************************/
556 /* It is unfortunate but this is copy paste code from dc_dist_object.hpp
557  I thought for a long time how to implement this without copy pasting and
558  I can't think of a simple enough solution.
559 
560  Part of the issue is that the "context" concept was not built into the
561  RPC system to begin with and is currently folded in through the dc_dist_object system.
562  As a result, the global context becomes very hard to define properly.
563  Including a dc_dist_object as a member only resolves the high level contexts
564  such as barrier, broadcast, etc which do not require intrusive access into
565  deeper information about the context. The full barrier however, requires deep
566  information about the context which cannot be resolved easily.
567 */
568 
569 /**
570 This barrier ensures globally across all machines that
571 all calls issued prior to this barrier are completed before
572 returning. This function could return prematurely if
573 other threads are still issuing function calls since we
574 cannot differentiate between calls issued before the barrier
575 and calls issued while the barrier is being evaluated.
576 */
// NOTE(review): the signature line of this definition (listing line 577) is
// missing from this listing; the comment block directly above describes the
// intended contract.
// Outline: (1) snapshot how many calls this process sent to each target,
// (2) exchange those counts, (3) derive how many calls must arrive from each
// source, (4) mark the sources that are already complete, (5) wait on the
// condition variable until none are incomplete, (6) finish with barrier().
578  // snapshot the number of calls this process has sent to each target machine
579  std::vector<size_t> calls_sent_to_target(numprocs(), 0);
580  for (size_t i = 0;i < numprocs(); ++i) {
581  calls_sent_to_target[i] = global_calls_sent[i].value;
582  }
583 
584  // exchange the per-target counts via all_gather; row i then holds what process i sent
585  std::vector<std::vector<size_t> > all_calls_sent(numprocs());
586  all_calls_sent[procid()] = calls_sent_to_target;
587  all_gather(all_calls_sent, true);
588 
589  // get the number of calls I am supposed to receive from each machine
590  calls_to_receive.clear(); calls_to_receive.resize(numprocs(), 0);
591  for (size_t i = 0;i < numprocs(); ++i) {
592  calls_to_receive[i] += all_calls_sent[i][procid()];
593 // std::cout << "Expecting " << calls_to_receive[i] << " calls from " << i << std::endl;
594  }
595  // clear the counters
596  num_proc_recvs_incomplete.value = numprocs();
597  procs_complete.clear();
598  // activate the full barrier
599  full_barrier_in_effect = true;
// Full memory fence after raising the flag, presumably so other threads see
// it before the scan below. NOTE(review): "mfence" is an x86 instruction, so
// this line is not portable to other architectures.
600  __asm("mfence");
601  // begin one pass to set all which are already completed
602  for (procid_t i = 0;i < numprocs(); ++i) {
603  if (global_calls_received[i].value >= calls_to_receive[i]) {
// Only decrement when this call is the one that sets the bit (set_bit
// returned false), so each source is counted at most once here.
604  if (procs_complete.set_bit(i) == false) {
605  num_proc_recvs_incomplete.dec();
606  }
607  }
608  }
609 
// Wait until no source is incomplete. NOTE(review): the code that marks the
// remaining sources complete and signals full_barrier_cond is not part of
// this block -- presumably the receive-counting path; confirm.
610  full_barrier_lock.lock();
611  while (num_proc_recvs_incomplete.value > 0) full_barrier_cond.wait(full_barrier_lock);
612  full_barrier_lock.unlock();
613  full_barrier_in_effect = false;
614  barrier();
615 // for (size_t i = 0; i < numprocs(); ++i) {
616 // std::cout << "Received " << global_calls_received[i].value << " from " << i << std::endl;
617 // }
618 }
619 
620 
621 } //namespace graphlab
622