25 #include <sys/param.h>
27 #include <sys/types.h>
28 #include <sys/socket.h>
30 #include <netinet/in.h>
35 #include <boost/unordered_map.hpp>
36 #include <boost/bind.hpp>
38 #include <graphlab/parallel/pthread_tools.hpp>
39 #include <graphlab/util/stl_util.hpp>
40 #include <graphlab/util/net_util.hpp>
41 #include <graphlab/util/mpi_tools.hpp>
43 #include <graphlab/rpc/dc.hpp>
44 #include <graphlab/rpc/dc_tcp_comm.hpp>
46 #include <graphlab/rpc/dc_buffered_stream_send2.hpp>
47 #include <graphlab/rpc/dc_stream_receive.hpp>
48 #include <graphlab/rpc/reply_increment_counter.hpp>
49 #include <graphlab/rpc/dc_services.hpp>
51 #include <graphlab/rpc/dc_init_from_env.hpp>
52 #include <graphlab/rpc/dc_init_from_mpi.hpp>
// Compile-time switch: when defined, process_fcall_block() compiles the
// code guarded by `#ifdef RPC_FAST_DISPATCH`.
60 #define RPC_FAST_DISPATCH
// File-scope pointer to a distributed_control instance. init() stores `this`
// here, and get_last_dc() allocates an instance when it is still NULL.
68 static distributed_control* last_dc = NULL;
// NOTE(review): the header of the function this return belongs to is not
// visible in this excerpt.
71 return last_dc_procid;
// Accessor for the file-scope `last_dc` pointer.
// When no instance has been recorded yet (last_dc is NULL) a
// default-constructed distributed_control is allocated with `new` and stored.
// NOTE(review): the remaining lines of this function (including its return
// statement) are not visible in this excerpt; presumably it returns last_dc.
74 distributed_control* get_last_dc() {
75 if (last_dc == NULL) {
76 last_dc =
new distributed_control();
// Guard flag for the pthread key below: init() tests it and sets it to true
// immediately before calling pthread_key_create on the key.
84 bool thrlocal_sequentialization_key_initialized =
false;
// pthread thread-specific-data key. The accessors below store a size_t
// directly in the void* slot (via reinterpret_cast) instead of pointing at
// separately allocated storage.
85 pthread_key_t thrlocal_sequentialization_key;
// NOTE(review): the headers of the three accessor functions that follow are
// not visible in this excerpt; the comments describe only the visible bodies.
//
// Fragment 1: reads the calling thread's current value, overwrites it with
// `newkey`, and returns the previous value narrowed to unsigned char.
90 size_t oldval =
reinterpret_cast<size_t>(pthread_getspecific(dc_impl::thrlocal_sequentialization_key));
91 size_t newval = newkey;
92 pthread_setspecific(dc_impl::thrlocal_sequentialization_key, reinterpret_cast<void*>(newval));
94 return (
unsigned char)oldval;
// Fragment 2: advances the calling thread's value to (old + 1) % 256 and
// returns the previous value. The assert documents that the stored value is
// expected to fit in an unsigned char.
98 size_t oldval =
reinterpret_cast<size_t>(pthread_getspecific(dc_impl::thrlocal_sequentialization_key));
99 size_t newval = (oldval + 1) % 256;
100 pthread_setspecific(dc_impl::thrlocal_sequentialization_key, reinterpret_cast<void*>(newval));
101 assert(oldval < 256);
102 return (
unsigned char)oldval;
// Fragment 3: read-only accessor; returns the calling thread's current value
// without modifying it.
106 size_t oldval =
reinterpret_cast<size_t>(pthread_getspecific(dc_impl::thrlocal_sequentialization_key));
107 assert(oldval < 256);
108 return (
unsigned char)oldval;
// NOTE(review): the enclosing function headers and the branch conditions
// around these statements are not visible in this excerpt; presumably they
// belong to the distributed_control constructors.
//
// Log lines announcing which initialization path was taken.
115 logstream(
LOG_INFO) <<
"Distributed Control Initialized from Environment" << std::endl;
117 logstream(
LOG_INFO) <<
"Distributed Control Initialized from MPI" << std::endl;
120 logstream(
LOG_INFO) <<
"Shared Memory Execution" << std::endl;
// Unpack the (port, socket) pair; where `port_and_sock` comes from is not
// visible here.
123 size_t port = port_and_sock.first;
124 int sock = port_and_sock.second;
// Append "localhost:<port>" to the machine list.
126 initparam.
machines.push_back(std::string(
"localhost:") +
tostr(port));
// Hand the socket value over through the init string as a
// " __sockhandle__=<sock> " option.
128 initparam.
initstring = std::string(
" __sockhandle__=") +
tostr(sock) +
" ";
// Register the three tracers used by the BEGIN_/END_TRACEPOINT pairs in this
// file (dc_receive_queuing, dc_receive_multiplexing, dc_call_dispatch).
137 INITIALIZE_TRACER(dc_receive_queuing,
"dc: time spent on enqueue");
138 INITIALIZE_TRACER(dc_receive_multiplexing,
"dc: time spent exploding a chunk");
139 INITIALIZE_TRACER(dc_call_dispatch,
"dc: time spent issuing RPC calls");
// Same three registrations repeated; presumably this is a second constructor
// overload (header not visible).
148 INITIALIZE_TRACER(dc_receive_queuing,
"dc: time spent on enqueue");
149 INITIALIZE_TRACER(dc_receive_multiplexing,
"dc: time spent exploding a chunk");
150 INITIALIZE_TRACER(dc_call_dispatch,
"dc: time spent issuing RPC calls");
// Destructor. Visible teardown order:
//   1. full barrier through distributed_services,
//   2. free the two callback events,
//   3. run every registered deletion callback in index order,
//   4. two passes over `senders` (loop bodies not visible in this excerpt),
//   5. shut down every receiver,
//   6. stop_blocking() on every call queue, then join the handler threads,
//   7. log the byte counters.
154 distributed_control::~distributed_control() {
// Issue a full barrier before anything is torn down.
155 distributed_services->full_barrier();
156 logstream(
LOG_INFO) <<
"Shutting down distributed control " << std::endl;
157 FREE_CALLBACK_EVENT(EVENT_NETWORK_BYTES);
158 FREE_CALLBACK_EVENT(EVENT_RPC_CALLS);
// Invoke user-registered deletion callbacks.
160 for (
size_t i = 0; i < deletion_callbacks.size(); ++i) {
161 deletion_callbacks[i]();
// NOTE(review): the bodies of the next two sender loops are not visible here.
165 for (
size_t i = 0;i < senders.size(); ++i) {
171 for (
size_t i = 0;i < senders.size(); ++i) {
175 for (
size_t i = 0;i < receivers.size(); ++i) {
176 receivers[i]->shutdown();
// Presumably this releases handler threads parked in wait_for_data() so that
// the join() below can complete.
182 for (
size_t i = 0;i < fcallqueue.size(); ++i) fcallqueue[i].stop_blocking();
183 fcallhandlers.
join();
184 logstream(
LOG_INFO) <<
"Bytes Sent: " << bytessent << std::endl;
187 logstream(
LOG_INFO) <<
"Bytes Received: " << bytesreceived << std::endl;
// Executes a single received call. The whole body is wrapped in the
// dc_call_dispatch tracepoint.
// NOTE(review): part of the parameter list (the `data` / `len` parameters
// used below) and some body lines are not visible in this excerpt.
195 void distributed_control::exec_function_call(
procid_t source,
196 unsigned char packet_type_mask,
199 BEGIN_TRACEPOINT(dc_call_dispatch);
// POD_CALL bit clear: the payload is read through an iarchive.
201 if ((packet_type_mask & POD_CALL) == 0) {
203 iarchive arc(data, len);
// `f` is obtained on a line not shown here (presumably deserialized from
// `arc`). The dispatcher receives only the bytes after the archive's current
// offset.
207 dc_impl::dispatch_type dispatch = (dc_impl::dispatch_type)f;
208 dispatch(*
this, source, packet_type_mask, data + arc.off, len - arc.off);
// Other path (presumably the else branch for POD calls): the leading bytes of
// `data` are reinterpreted as a dispatch_type2 function pointer, which is
// called with the complete, unshifted buffer.
211 dc_impl::dispatch_type2 dispatch2 = *
reinterpret_cast<const dc_impl::dispatch_type2*
>(data);
212 dispatch2(*
this, source, packet_type_mask, data, len);
// Control packets are not counted as received calls.
214 if ((packet_type_mask & CONTROL_PACKET) == 0) inc_calls_received(source);
215 END_TRACEPOINT(dc_call_dispatch);
// Queues a received chunk for later processing by a handler thread.
//   buf, len - the chunk (the assignments that store them in the entry are
//              not visible in this excerpt)
//   src      - originating process; also selects the target queue
218 void distributed_control::deferred_function_call_chunk(
char* buf,
size_t len,
procid_t src) {
219 BEGIN_TRACEPOINT(dc_receive_queuing);
// Heap-allocated entry; process_fcall_block() later works on the queued entry.
220 fcallqueue_entry* fc =
new fcallqueue_entry;
// No shared reference counter for a freshly received chunk.
223 fc->chunk_ref_counter = NULL;
226 fcallqueue_length.inc();
// Queue index is src modulo the number of queues, so all chunks from one
// source go to the same queue.
227 fcallqueue[src % fcallqueue.size()].enqueue(fc);
228 END_TRACEPOINT(dc_receive_queuing);
// Processes one queue entry.
//  - Non-chunk entry: executes every call listed in `calls`, then drops this
//    entry's share of the chunk reference counter (if any).
//  - Chunk entry with RPC_FAST_DISPATCH defined: walks the packet headers in
//    the chunk buffer and executes each packet in place, then frees the buffer.
//  - Chunk entry otherwise: splits the chunk's packets into one entry per call
//    queue (all sharing a single reference counter) and enqueues them; control
//    packets are collected separately and executed by this same call.
// NOTE(review): several lines (else branches, closing braces, some arguments)
// are not visible in this excerpt.
232 void distributed_control::process_fcall_block(fcallqueue_entry &fcallblock) {
233 if (fcallblock.is_chunk ==
false) {
234 for (
size_t i = 0;i < fcallblock.calls.size(); ++i) {
// The queue-length counter is decremented once per call executed.
235 fcallqueue_length.dec();
236 exec_function_call(fcallblock.source, fcallblock.calls[i].packet_mask,
237 fcallblock.calls[i].data, fcallblock.calls[i].len);
// Entries carved from a shared chunk carry a reference counter; it is reduced
// by this entry's call count, and whoever brings it to zero deletes the
// counter and frees the chunk buffer.
239 if (fcallblock.chunk_ref_counter != NULL) {
240 if (fcallblock.chunk_ref_counter->dec(fcallblock.calls.size()) == 0) {
241 delete fcallblock.chunk_ref_counter;
242 free(fcallblock.chunk_src);
// Fast path: execute the chunk's packets directly.
246 #ifdef RPC_FAST_DISPATCH
248 fcallqueue_length.dec();
251 char* data = fcallblock.chunk_src;
252 size_t remaininglen = fcallblock.chunk_len;
254 while(remaininglen > 0) {
255 ASSERT_GE(remaininglen,
sizeof(dc_impl::packet_hdr));
// The header is copied out of the buffer by value.
256 dc_impl::packet_hdr hdr = *
reinterpret_cast<dc_impl::packet_hdr*
>(data);
// NOTE(review): this check compares hdr.len against remaininglen, but the
// bookkeeping below subtracts sizeof(packet_hdr) + hdr.len. In the visible
// lines nothing prevents remaininglen (a size_t) from wrapping around when
// hdr.len > remaininglen - sizeof(packet_hdr) - confirm against the full file.
257 ASSERT_LE(hdr.len, remaininglen);
259 exec_function_call(fcallblock.source, hdr.packet_type_mask,
260 data +
sizeof(dc_impl::packet_hdr),
// Advance past this packet's header and payload.
262 data +=
sizeof(dc_impl::packet_hdr) + hdr.len;
263 remaininglen -=
sizeof(dc_impl::packet_hdr) + hdr.len;
265 free(fcallblock.chunk_src);
// Multiplexing path (presumably the #else side of RPC_FAST_DISPATCH).
269 fcallqueue_length.dec();
270 BEGIN_TRACEPOINT(dc_receive_multiplexing);
// NOTE(review): array sized by a runtime value - a variable-length array,
// which is a compiler extension rather than standard C++.
271 fcallqueue_entry* queuebufs[fcallqueue.size()];
// One reference counter shared by every entry carved out of this chunk.
272 atomic<size_t>* refctr =
new atomic<size_t>(0);
// Stack-allocated entry for control packets; it is passed back into
// process_fcall_block() at the end of this function instead of being enqueued.
274 fcallqueue_entry immediate_queue;
276 immediate_queue.chunk_src = fcallblock.chunk_src;
277 immediate_queue.chunk_ref_counter = refctr;
278 immediate_queue.chunk_len = 0;
279 immediate_queue.source = fcallblock.source;
280 immediate_queue.is_chunk =
false;
// One heap-allocated non-chunk entry per call queue, all referring to the
// same chunk buffer and counter.
282 for (
size_t i = 0;i < fcallqueue.size(); ++i) {
283 queuebufs[i] =
new fcallqueue_entry;
284 queuebufs[i]->chunk_src = fcallblock.chunk_src;
285 queuebufs[i]->chunk_ref_counter = refctr;
286 queuebufs[i]->chunk_len = 0;
287 queuebufs[i]->source = fcallblock.source;
288 queuebufs[i]->is_chunk =
false;
292 char* data = fcallblock.chunk_src;
293 size_t remaininglen = fcallblock.chunk_len;
296 while(remaininglen > 0) {
297 ASSERT_GE(remaininglen,
sizeof(dc_impl::packet_hdr));
298 dc_impl::packet_hdr hdr = *
reinterpret_cast<dc_impl::packet_hdr*
>(data);
// NOTE(review): same length-check concern as in the fast path above.
299 ASSERT_LE(hdr.len, remaininglen);
// Control packets go to the immediate entry.
304 if ((hdr.packet_type_mask & CONTROL_PACKET)) {
306 immediate_queue.calls.push_back(function_call_block(
307 data +
sizeof(dc_impl::packet_hdr),
309 hdr.packet_type_mask));
// Other packets (presumably the else branch): add the payload size to the
// per-source byte counter, indexed by hdr.src.
311 global_bytes_received[hdr.src].inc(hdr.len);
// Key 0: the packet goes to the queue selected by `stripe` (its declaration
// is not visible in this excerpt).
312 if (hdr.sequentialization_key == 0) {
313 queuebufs[stripe]->calls.push_back(function_call_block(
314 data +
sizeof(dc_impl::packet_hdr),
316 hdr.packet_type_mask));
// `stripe` is bumped when it equals source % queue-count and wrapped back
// into range; any other update to it is on lines not shown here.
318 if (stripe == (fcallblock.source % fcallqueue.size())) ++stripe;
319 if (stripe >= fcallqueue.size()) stripe -= fcallqueue.size();
// Non-zero key: the queue is key % queue-count, so packets with the same key
// always land in the same queue.
322 size_t idx = (hdr.sequentialization_key % (fcallqueue.size()));
323 queuebufs[idx]->calls.push_back(function_call_block(
324 data +
sizeof(dc_impl::packet_hdr),
326 hdr.packet_type_mask));
329 data +=
sizeof(dc_impl::packet_hdr) + hdr.len;
330 remaininglen -=
sizeof(dc_impl::packet_hdr) + hdr.len;
332 END_TRACEPOINT(dc_receive_multiplexing);
333 BEGIN_TRACEPOINT(dc_receive_queuing);
// Entries holding at least one call are enqueued on their queue; the queue
// length grows by the number of calls in the entry.
// NOTE(review): what happens to entries with no calls is on lines not visible
// here.
334 for (
size_t i = 0;i < fcallqueue.size(); ++i) {
335 if (queuebufs[i]->calls.size() > 0) {
336 fcallqueue_length.inc(queuebufs[i]->calls.size());
337 fcallqueue[i].enqueue(queuebufs[i]);
343 END_TRACEPOINT(dc_receive_queuing);
// Control packets are executed by this call via the non-chunk branch above.
344 if (immediate_queue.calls.size() > 0) process_fcall_block(immediate_queue);
// In the visible lines this simply forwards both arguments to
// stop_handler_threads_no_wait().
349 void distributed_control::stop_handler_threads(
size_t threadid,
350 size_t total_threadid) {
351 stop_handler_threads_no_wait(threadid, total_threadid);
// Locks fcall_handler_blockers[i] for i = threadid, threadid + total_threadid,
// ... below fcallqueue.size(). fcallhandler_loop() locks the same blocker
// before it drains its queue, so a handler whose blocker is held here stalls
// at that lock() call.
354 void distributed_control::stop_handler_threads_no_wait(
size_t threadid,
355 size_t total_threadid) {
356 for (
size_t i = threadid;i < fcallqueue.size(); i += total_threadid) {
357 fcall_handler_blockers[i].lock();
// Counterpart of stop_handler_threads_no_wait(): unlocks
// fcall_handler_blockers[i] over the same strided index set
// (threadid, threadid + total_threadid, ...).
362 void distributed_control::start_handler_threads(
size_t threadid,
363 size_t total_threadid) {
364 for (
size_t i = threadid;i < fcallqueue.size(); i += total_threadid) {
365 fcall_handler_blockers[i].unlock();
// Drains call queues on the calling thread. Visits queues threadid,
// threadid + total_threadid, ... and processes whatever each one holds.
369 void distributed_control::handle_incoming_calls(
size_t threadid,
370 size_t total_threadid) {
371 for (
size_t i = threadid;i < fcallqueue.size(); i += total_threadid) {
// Emptiness test via empty_unsafe(); presumably performed without taking the
// queue's lock - confirm against the queue implementation.
372 if (fcallqueue[i].empty_unsafe() ==
false) {
// Swap the queue's contents into a local deque so the entries can be
// processed from the local copy.
373 std::deque<fcallqueue_entry*> q;
374 fcallqueue[i].swap(q);
// NOTE(review): the loop that takes `entry` out of `q` is not visible here.
376 fcallqueue_entry* entry;
380 process_fcall_block(*entry);
// Body run by handler thread `id` (launched from init()). Serves
// fcallqueue[id].
// NOTE(review): the loop header and the lines that take entries out of `q`
// are not visible in this excerpt.
387 void distributed_control::fcallhandler_loop(
size_t id) {
// Mark this handler as active for the duration of the loop.
390 fcall_handler_active[id].inc();
392 fcallqueue[id].wait_for_data();
// The blocker is the lock taken by stop_handler_threads_no_wait(); while it
// is held there, this thread waits here.
393 fcall_handler_blockers[id].lock();
// Queue no longer alive: release the blocker (presumably followed by leaving
// the loop).
394 if (fcallqueue[
id].is_alive() ==
false) {
395 fcall_handler_blockers[id].unlock();
// Take everything currently queued in one swap.
398 std::deque<fcallqueue_entry*> q;
399 fcallqueue[id].swap(q);
401 fcallqueue_entry* entry;
405 process_fcall_block(*entry);
408 fcall_handler_blockers[id].unlock();
411 fcall_handler_active[id].dec();
// Parses an option string of `key=value` items into a map.
// `initstring` is taken by value because it is modified locally: ',' and ';'
// are rewritten to spaces so that all three act as item separators.
// NOTE(review): the loop header, the insertion into `options` and the return
// statement are not visible in this excerpt.
415 std::map<std::string, std::string>
416 distributed_control::parse_options(std::string initstring) {
417 std::map<std::string, std::string> options;
418 std::replace(initstring.begin(), initstring.end(),
',',
' ');
419 std::replace(initstring.begin(), initstring.end(),
';',
' ');
420 std::string opt, value;
422 std::stringstream s(initstring);
// Key: everything up to the next '='.
424 getline(s, opt,
'=');
// Stop when the stream failed hard or ran out while looking for '='.
425 if (s.bad() || s.eof())
break;
// Value: everything up to the next space.
426 getline(s, value,
' ');
// Sets up the controller: per-machine counters, call queues, the comm layer,
// stream senders/receivers, handler threads and distributed services, and
// finally synchronises with a barrier.
// NOTE(review): parts of the parameter list (e.g. `curmachineid`, used below)
// and many body lines are not visible in this excerpt.
433 void distributed_control::init(
const std::vector<std::string> &machines,
434 const std::string &initstring,
436 size_t numhandlerthreads,
// Fallback of 2 handler threads; the condition guarding it is not shown here.
442 else numhandlerthreads = 2;
// Record this instance as the one get_last_dc() refers to.
444 dc_impl::last_dc =
this;
// One-time creation of the thread-local sequentialization key.
// NOTE(review): the guard is a plain bool tested and then set without any
// visible synchronisation - confirm init() cannot run concurrently.
449 if (dc_impl::thrlocal_sequentialization_key_initialized ==
false) {
450 dc_impl::thrlocal_sequentialization_key_initialized =
true;
// NULL destructor argument: no per-thread cleanup function is registered.
451 int err = pthread_key_create(&dc_impl::thrlocal_sequentialization_key, NULL);
456 full_barrier_in_effect =
false;
// Per-machine state, one slot per entry in `machines`.
457 procs_complete.
resize(machines.size());
462 global_calls_sent.resize(machines.size());
463 global_calls_received.resize(machines.size());
464 global_bytes_received.resize(machines.size());
// One call queue per handler thread.
465 fcallqueue.resize(numhandlerthreads);
469 std::map<std::string,std::string> options = parse_options(initstring);
// TCP comm layer; the selection logic around this line is not visible here.
472 comm =
new dc_impl::dc_tcp_comm();
483 ASSERT_MSG(
false,
"Unexpected value for comm type");
// Stream-capable comm: create one receiver and one buffered sender per
// machine.
486 if (comm->capabilities() & dc_impl::COMM_STREAM) {
487 for (
procid_t i = 0; i < machines.size(); ++i) {
488 receivers.push_back(
new dc_impl::dc_stream_receive(
this, i));
489 senders.push_back(
new dc_impl::dc_buffered_stream_send2(
this, comm, i));
// Handler-thread bookkeeping, then launch one fcallhandler_loop per thread.
494 fcall_handler_active.resize(numhandlerthreads);
495 fcall_handler_blockers.resize(numhandlerthreads);
496 fcallhandlers.
resize(numhandlerthreads);
497 for (
size_t i = 0;i < numhandlerthreads; ++i) {
498 fcallhandlers.
launch(boost::bind(&distributed_control::fcallhandler_loop,
504 localprocid = curmachineid;
505 localnumprocs = machines.size();
508 dc_impl::last_dc_procid = localprocid;
511 distributed_services =
new dc_services(*
this);
// When MPI is initialised, comm->init() is bracketed by MPI barriers.
516 if (mpi_tools::initialized()) MPI_Barrier(MPI_COMM_WORLD);
519 comm->init(machines, options, curmachineid,
521 std::cerr <<
"TCP Communication layer constructed." << std::endl;
525 if (mpi_tools::initialized()) MPI_Barrier(MPI_COMM_WORLD);
// Stream opened on a null sink.
529 nullstrm.open(boost::iostreams::null_sink());
// Event logging for network bytes and RPC call counts; these two events are
// freed again in the destructor.
533 INITIALIZE_EVENT_LOG(*
this);
534 ADD_CUMULATIVE_CALLBACK_EVENT(EVENT_NETWORK_BYTES,
"Network Utilization",
536 ADD_CUMULATIVE_CALLBACK_EVENT(EVENT_RPC_CALLS,
"RPC Calls",
543 distributed_services->barrier();
// NOTE(review): the body of this sender loop is not visible in this excerpt.
547 for (
procid_t i = 0;i < senders.size(); ++i) {
// NOTE(review): the header of the enclosing function is not visible in this
// excerpt; judging by the members it touches (full_barrier_in_effect,
// full_barrier_lock, full_barrier_cond) it presumably implements the full
// barrier. Comments describe only the visible statements.
//
// Snapshot how many calls this process has sent to every target.
579 std::vector<size_t> calls_sent_to_target(
numprocs(), 0);
580 for (
size_t i = 0;i <
numprocs(); ++i) {
581 calls_sent_to_target[i] = global_calls_sent[i].value;
// Table indexed [sender][target]; only this process's own row is filled in
// here. The step that fills the other rows is on lines not shown (presumably
// a gather across processes).
585 std::vector<std::vector<size_t> > all_calls_sent(
numprocs());
586 all_calls_sent[
procid()] = calls_sent_to_target;
// Column procid() of the table: how many calls each sender i has sent to
// this process.
590 calls_to_receive.clear(); calls_to_receive.resize(
numprocs(), 0);
591 for (
size_t i = 0;i <
numprocs(); ++i) {
592 calls_to_receive[i] += all_calls_sent[i][
procid()];
// Start with every source counted as incomplete and no completion bits set.
596 num_proc_recvs_incomplete.value =
numprocs();
597 procs_complete.
clear();
599 full_barrier_in_effect =
true;
// Sources whose expected calls have already arrived are marked complete now.
// The `== false` test on set_bit() guards the decrement (presumably set_bit
// returns the bit's previous value, so each source is counted once).
603 if (global_calls_received[i].value >= calls_to_receive[i]) {
604 if (procs_complete.
set_bit(i) ==
false) {
605 num_proc_recvs_incomplete.dec();
// Block on the condition variable until no source remains incomplete.
610 full_barrier_lock.
lock();
611 while (num_proc_recvs_incomplete.value > 0) full_barrier_cond.
wait(full_barrier_lock);
612 full_barrier_lock.
unlock();
613 full_barrier_in_effect =
false;