24 #include <graphlab/rpc/dc.hpp>
26 #ifndef GRAPHLAB_DC_DIST_OBJECT_HPP
27 #define GRAPHLAB_DC_DIST_OBJECT_HPP
31 #include <graphlab/parallel/atomic.hpp>
32 #include <graphlab/rpc/dc_internal_types.hpp>
33 #include <graphlab/rpc/dc_dist_object_base.hpp>
34 #include <graphlab/rpc/object_request_issue.hpp>
35 #include <graphlab/rpc/object_call_issue.hpp>
36 #include <graphlab/rpc/object_podcall_issue.hpp>
37 #include <graphlab/rpc/object_broadcast_issue.hpp>
38 #include <graphlab/rpc/object_podcall_broadcast_issue.hpp>
39 #include <graphlab/rpc/function_ret_type.hpp>
40 #include <graphlab/rpc/mem_function_arg_types_def.hpp>
41 #include <graphlab/util/charstream.hpp>
42 #include <boost/preprocessor.hpp>
43 #include <graphlab/util/tracepoint.hpp>
44 #include <graphlab/macros_def.hpp>
// Fan-out of the process tree used by the barrier and collective operations
// in this header: a process's first child is procid * BARRIER_BRANCH_FACTOR + 1,
// and ab_children_data reserves one slot per possible child.
// The macro is #undef'd again at the end of this header.
46 #define BARRIER_BRANCH_FACTOR 128
114 template <
typename T>
// Object id obtained by registering this dc_dist_object itself with the
// distributed control; the internal_* call/request generators address it.
119 size_t control_obj_id;
// Per-source-process count of calls received; bumped by inc_calls_received().
121 std::vector<atomic<size_t> > callsreceived;
// Per-target-process count of calls sent; read per process by the full barrier.
122 std::vector<atomic<size_t> > callssent;
// Per-target-process byte total; increased by inc_bytes_sent().
123 std::vector<atomic<size_t> > bytessent;
// Tracer wrapped around the generated remote_call/pod_call/control_call
// bodies through BEGIN_TRACEPOINT / END_TRACEPOINT.
129 DECLARE_TRACER(distobj_remote_call_time);
137 void inc_calls_received(
procid_t p) {
138 if (!full_barrier_in_effect) {
139 size_t t = callsreceived[p].inc();
140 if (full_barrier_in_effect) {
141 if (t == calls_to_receive[p]) {
143 if (procs_complete.
set_bit(p) ==
false) {
147 full_barrier_lock.
lock();
148 if (num_proc_recvs_incomplete.dec() == 0) {
149 full_barrier_cond.
signal();
151 full_barrier_lock.
unlock();
160 if (callsreceived[p].inc() == calls_to_receive[p]) {
162 if (procs_complete.
set_bit(p) ==
false) {
166 full_barrier_lock.
lock();
167 if (num_proc_recvs_incomplete.dec() == 0) {
168 full_barrier_cond.
signal();
170 full_barrier_lock.
unlock();
182 void inc_bytes_sent(
procid_t p,
size_t bytes) {
183 bytessent[p].inc(bytes);
200 dc_(dc_),owner(owner) {
202 callsreceived.resize(dc_.
numprocs());
207 gather_receive.resize(dc_.
numprocs());
211 child_barrier_counter.value = 0;
213 barrier_release = -1;
217 childbase = size_t(dc_.
procid()) * BARRIER_BRANCH_FACTOR + 1;
222 size_t maxchild = std::min<size_t>(dc_.
numprocs(),
223 childbase + BARRIER_BRANCH_FACTOR);
224 numchild = (
procid_t)(maxchild - childbase);
230 ab_child_barrier_counter.value = 0;
231 ab_barrier_sense = 1;
232 ab_barrier_release = -1;
237 full_barrier_in_effect =
false;
241 obj_id = dc_.register_object(owner,
this);
242 control_obj_id = dc_.register_object(
this,
this);
245 std::string name =
typeid(T).name();
246 INITIALIZE_TRACER(distobj_remote_call_time,
247 std::string(
"dc_dist_object ") + name +
": remote_call time");
253 for (
size_t i = 0;i <
numprocs(); ++i) {
254 ctr += callsreceived[i].value;
262 for (
size_t i = 0;i <
numprocs(); ++i) {
263 ctr += callssent[i].value;
273 for (
size_t i = 0;i <
numprocs(); ++i) {
274 ctr += bytessent[i].value;
// Helper macros for BOOST_PP_ENUM / BOOST_PP_REPEAT; N is the repetition index.
// GENARGS -> "T<N> i<N>": one entry of a generated parameter list.
342 #define GENARGS(Z,N,_) BOOST_PP_CAT(T, N) BOOST_PP_CAT(i, N)
// GENI -> "i<N>": one entry of a generated argument list.
343 #define GENI(Z,N,_) BOOST_PP_CAT(i, N)
// GENT -> "T<N>": one entry of a generated type list.
344 #define GENT(Z,N,_) BOOST_PP_CAT(T, N)
// GENARC -> "arc << i<N>;": streams the N-th argument into a variable named arc.
345 #define GENARC(Z,N,_) arc << BOOST_PP_CAT(i, N);
// Generates one member-function template for a fire-and-forget call that
// takes N extra arguments.
//
// FNAME_AND_CALL is a 3-tuple: (name of the generated function,
// prefix of the issuing class, packet flags). The generated function
//   - asserts that target is a valid index into dc_.senders,
//   - runs inside the distobj_remote_call_time tracepoint,
//   - calls inc_calls_sent(target) unless the flags contain CONTROL_PACKET,
//   - forwards to <issuing class>N<T, F, T0..>::exec with this object,
//     the sender for target, the flags, target, obj_id, the remote function
//     and the N arguments.
// Instantiated below via BOOST_PP_REPEAT for remote_call, pod_call and
// control_call.
347 #define RPC_INTERFACE_GENERATOR(Z,N,FNAME_AND_CALL) \
348 template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
349 void BOOST_PP_TUPLE_ELEM(3,0,FNAME_AND_CALL) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
350 ASSERT_LT(target, dc_.senders.size()); \
351 BEGIN_TRACEPOINT(distobj_remote_call_time); \
352 if ((BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL) & CONTROL_PACKET) == 0) inc_calls_sent(target); \
353 BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,FNAME_AND_CALL),N) \
354 <T, F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
355 ::exec(this, dc_.senders[target], BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL), target,obj_id, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
356 END_TRACEPOINT(distobj_remote_call_time); \
362 BOOST_PP_REPEAT(7, RPC_INTERFACE_GENERATOR, (
remote_call, dc_impl::object_call_issue, STANDARD_CALL) )
363 BOOST_PP_REPEAT(7, RPC_INTERFACE_GENERATOR, (pod_call, dc_impl::object_podcall_issue, STANDARD_CALL) )
364 BOOST_PP_REPEAT(7, RPC_INTERFACE_GENERATOR, (control_call,dc_impl::object_call_issue, (STANDARD_CALL | CONTROL_PACKET)) )
367 oarchive* split_call_begin(
void (T::*remote_function)(
size_t, wild_pointer)) {
368 return dc_impl::object_split_call<T, void(T::*)(size_t, wild_pointer)>::split_call_begin(
this, obj_id, remote_function);
371 void split_call_end(
procid_t target, oarchive* oarc) {
372 inc_calls_sent(target);
373 return dc_impl::object_split_call<T, void(T::*)(size_t, wild_pointer)>::split_call_end(
this, oarc, dc_.senders[target],
374 target, STANDARD_CALL);
377 void split_call_cancel(oarchive* oarc) {
378 return dc_impl::object_split_call<T, void(T::*)(size_t, wild_pointer)>::split_call_cancel(oarc);
// Generates one member-function template that issues the same call to every
// process in the iterator range [target_begin, target_end), with N extra
// arguments.
//
// FNAME_AND_CALL is a 3-tuple: (name of the generated function,
// prefix of the issuing class, packet flags). The generated function
//   - returns immediately for an empty range,
//   - runs inside the distobj_remote_call_time tracepoint,
//   - unless the flags contain CONTROL_PACKET, walks the range and calls
//     inc_calls_sent on each target,
//   - forwards to <issuing class>N<Iterator, T, F, T0..>::exec with the whole
//     dc_.senders container, the flags, the range, obj_id, the remote
//     function and the N arguments.
// NOTE(review): unlike the single-target generator there is no visible
// bounds check on the targets in the range - confirm the issuing class
// validates them.
383 #define BROADCAST_INTERFACE_GENERATOR(Z,N,FNAME_AND_CALL) \
384 template<typename Iterator, typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
385 void BOOST_PP_TUPLE_ELEM(3,0,FNAME_AND_CALL) (Iterator target_begin, Iterator target_end, \
386 F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
387 if (target_begin == target_end) return; \
388 BEGIN_TRACEPOINT(distobj_remote_call_time); \
389 if ((BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL) & CONTROL_PACKET) == 0) { \
390 Iterator iter = target_begin; \
391 while (iter != target_end){ \
392 inc_calls_sent(*iter); \
396 BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,FNAME_AND_CALL),N) \
397 <Iterator, T, F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
398 ::exec(this, dc_.senders, BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL), target_begin, target_end,obj_id, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
399 END_TRACEPOINT(distobj_remote_call_time); \
402 BOOST_PP_REPEAT(7, BROADCAST_INTERFACE_GENERATOR, (
remote_call, dc_impl::object_broadcast_issue, STANDARD_CALL) )
403 BOOST_PP_REPEAT(7, BROADCAST_INTERFACE_GENERATOR, (pod_call, dc_impl::object_podcall_broadcast_issue, STANDARD_CALL) )
// Two generators for request-style calls with N extra arguments. ARGS is a
// 3-tuple: (return type followed by the function name, prefix of the issuing
// class, packet flags).
//
// Both generated functions assert that target is a valid index into
// dc_.senders, call inc_calls_sent(target) unless the flags contain
// CONTROL_PACKET, and forward to <issuing class>N<T, F, T0..>::exec with this
// object, the sender for target, the flags, target, obj_id, the remote
// function and the N arguments.
//
// They differ only in what is returned:
//   REQUEST_INTERFACE_GENERATOR         invokes operator() on the object
//                                       returned by exec and returns that
//                                       result;
//   FUTURE_REQUEST_INTERFACE_GENERATOR  returns the object produced by exec
//                                       itself.
420 #define REQUEST_INTERFACE_GENERATOR(Z,N,ARGS) \
421 template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
422 BOOST_PP_TUPLE_ELEM(3,0,ARGS) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
423 ASSERT_LT(target, dc_.senders.size()); \
424 if ((BOOST_PP_TUPLE_ELEM(3,2,ARGS) & CONTROL_PACKET) == 0) inc_calls_sent(target); \
425 return BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,ARGS),N) \
426 <T, F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
427 ::exec(this, dc_.senders[target], BOOST_PP_TUPLE_ELEM(3,2,ARGS), target,obj_id, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) )(); \
430 #define FUTURE_REQUEST_INTERFACE_GENERATOR(Z,N,ARGS) \
431 template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
432 BOOST_PP_TUPLE_ELEM(3,0,ARGS) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
433 ASSERT_LT(target, dc_.senders.size()); \
434 if ((BOOST_PP_TUPLE_ELEM(3,2,ARGS) & CONTROL_PACKET) == 0) inc_calls_sent(target); \
435 return BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,ARGS),N) \
436 <T, F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
437 ::exec(this, dc_.senders[target], BOOST_PP_TUPLE_ELEM(3,2,ARGS), target,obj_id, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
445 BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (
typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type
remote_request, dc_impl::object_request_issue, (STANDARD_CALL | WAIT_FOR_REPLY) ) )
446 BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type control_request, dc_impl::object_request_issue, (STANDARD_CALL | WAIT_FOR_REPLY | CONTROL_PACKET)) )
447 BOOST_PP_REPEAT(6, FUTURE_REQUEST_INTERFACE_GENERATOR, (request_future<__GLRPC_FRESULT>
future_remote_request, dc_impl::object_request_issue, (STANDARD_CALL | WAIT_FOR_REPLY) ) )
451 #undef RPC_INTERFACE_GENERATOR
452 #undef BROADCAST_INTERFACE_GENERATOR
453 #undef REQUEST_INTERFACE_GENERATOR
454 #undef FUTURE_REQUEST_INTERFACE_GENERATOR
// Internal variant of the call generator: the generated function targets
// member functions of dc_dist_object<T> itself rather than of T, and
// addresses the remote side through control_obj_id instead of obj_id.
// It asserts that target is a valid index into dc_.senders and calls
// inc_calls_sent(target) unless the flags contain CONTROL_PACKET. No
// tracepoint is used here. Instantiated below for internal_call and
// internal_control_call.
461 #define RPC_INTERFACE_GENERATOR(Z,N,FNAME_AND_CALL) \
462 template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
463 void BOOST_PP_TUPLE_ELEM(3,0,FNAME_AND_CALL) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
464 ASSERT_LT(target, dc_.senders.size()); \
465 if ((BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL) & CONTROL_PACKET) == 0) inc_calls_sent(target); \
466 BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,FNAME_AND_CALL),N) \
467 <dc_dist_object<T>, F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
468 ::exec(this, dc_.senders[target], BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL), target,control_obj_id, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
471 BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (internal_call,dc_impl::object_call_issue, STANDARD_CALL) )
472 BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (internal_control_call,dc_impl::object_call_issue, (STANDARD_CALL | CONTROL_PACKET)) )
// Internal variant of the request generator: the generated function targets
// member functions of dc_dist_object<T> itself, addresses the remote side
// through control_obj_id, and returns the result of invoking operator() on
// the object produced by exec. Calls are counted with inc_calls_sent(target)
// unless the flags contain CONTROL_PACKET. Instantiated below for
// internal_request and internal_control_request.
475 #define REQUEST_INTERFACE_GENERATOR(Z,N,ARGS) \
476 template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
477 BOOST_PP_TUPLE_ELEM(3,0,ARGS) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
478 ASSERT_LT(target, dc_.senders.size()); \
479 if ((BOOST_PP_TUPLE_ELEM(3,2,ARGS) & CONTROL_PACKET) == 0) inc_calls_sent(target); \
480 return BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,ARGS),N) \
481 <dc_dist_object<T>, F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
482 ::exec(this, dc_.senders[target], BOOST_PP_TUPLE_ELEM(3,2,ARGS), target,control_obj_id, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) )(); \
488 BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (
typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type internal_request, dc_impl::object_request_issue, (STANDARD_CALL | WAIT_FOR_REPLY)) )
489 BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type internal_control_request, dc_impl::object_request_issue, (STANDARD_CALL | WAIT_FOR_REPLY | CONTROL_PACKET)) )
492 #undef RPC_INTERFACE_GENERATOR
493 #undef REQUEST_INTERFACE_GENERATOR
501 #if DOXYGEN_DOCUMENTATION
619 void remote_call(Iterator machine_begin, Iterator machine_end, Fn fn, ...);
725 std::vector<dc_impl::recv_from_struct> recv_froms;
// Deposits an incoming payload into the receive slot of sender src:
// under the slot's lock it stores the data and tag, marks the slot as
// holding data and signals the slot's condition.
// NOTE(review): tag and str are used below but their declarations are not
// on the visible lines - presumably the remaining parameters; confirm.
727 void block_and_wait_for_recv(
size_t src,
730 recv_froms[src].lock.lock();
731 recv_froms[src].data = str;
732 recv_froms[src].tag = tag;
733 recv_froms[src].hasdata =
true;
// Wake a thread waiting on this slot's condition.
734 recv_froms[src].cond.signal();
735 recv_froms[src].lock.unlock();
743 template <
typename U>
745 std::stringstream strm;
749 dc_impl::reply_ret_type rt(REQUEST_WAIT_METHOD);
753 size_t rtptr =
reinterpret_cast<size_t>(&rt);
754 if (control ==
false) {
756 procid(), strm.str(), rtptr);
760 procid(), strm.str(), rtptr);
765 if (control ==
false) inc_calls_sent(target);
772 template <
typename U>
775 dc_impl::recv_from_struct &recvstruct = recv_froms[source];
776 recvstruct.lock.lock();
777 while (recvstruct.hasdata ==
false) {
778 recvstruct.cond.wait(recvstruct.lock);
782 std::stringstream strm(recvstruct.data);
786 std::string(
"").swap(recvstruct.data);
788 size_t tag = recvstruct.tag;
790 recvstruct.hasdata =
false;
792 recvstruct.lock.unlock();
793 if (control ==
false) {
795 dc_.control_call(source, reply_increment_counter, tag, dc_impl::blob());
800 inc_calls_received(source);
803 dc_.control_call(source, reply_increment_counter, tag, dc_impl::blob());
815 std::string broadcast_receive;
817 void set_broadcast_receive(
const std::string &s) {
818 broadcast_receive = s;
// Broadcast of data across all processes, staged through the
// broadcast_receive string.
// Visible steps: a string stream's contents are copied into
// broadcast_receive, a loop over all processes follows (one loop when
// control is false, another that issues internal_control_request), and a
// string stream is later constructed over broadcast_receive.
// NOTE(review): the lines that test originator and that serialize or
// deserialize data are not visible here - presumably the originator
// serializes and sends while the other processes deserialize; confirm.
825 template <
typename U>
826 void broadcast(U& data,
bool originator,
bool control =
false) {
829 std::stringstream strm;
833 broadcast_receive = strm.str();
834 if (control ==
false) {
835 for (
size_t i = 0;i <
numprocs(); ++i) {
844 for (
size_t i = 0;i <
numprocs(); ++i) {
846 internal_control_request(i,
861 std::stringstream strm(broadcast_receive);
874 std::vector<std::string> gather_receive;
875 atomic<size_t> gatherid;
877 void set_gather_receive(
procid_t source,
const std::string &s,
size_t gid) {
878 while(gatherid.value != gid) sched_yield();
879 gather_receive[source] = s;
884 template <
typename U>
888 std::stringstream strm( std::ios::out | std::ios::binary );
892 if (control ==
false) {
893 internal_request(sendto,
900 internal_control_request(sendto,
913 std::stringstream strm(gather_receive[i],
914 std::ios::in | std::ios::binary);
// State for the data-carrying barrier used by the all-gather / all-reduce
// operations.
// Sign value (+1 / -1) that is negated each time the barrier is entered;
// children add it to ab_child_barrier_counter when they report in.
934 int ab_barrier_sense;
// Last release value received; waiters compare it against the sense value
// they sampled on entry.
936 int ab_barrier_release;
// Running sum of the sense values reported by children; the entry test
// accepts 0 (sense -1) or numchild (sense +1).
941 atomic<int> ab_child_barrier_counter;
// Held while the barrier state above and below is read or modified.
944 mutex ab_barrier_mut;
// One serialized payload per child, indexed by (child procid - childbase).
945 std::string ab_children_data[BARRIER_BRANCH_FACTOR];
// Combined payload passed down from the parent on release.
946 std::string ab_alldata;
// Remote entry point run on a parent when child `source` reaches the
// data-carrying barrier: stores the child's serialized payload and adds the
// current sense value to the child counter.
// NOTE(review): the remainder of the body (release of ab_barrier_mut and any
// wake-up of the waiter) is not visible in this excerpt.
951 void __ab_child_to_parent_barrier_trigger(
procid_t source, std::string collect) {
952 ab_barrier_mut.
lock();
// source must be one of this process's children:
// childbase <= source < childbase + BARRIER_BRANCH_FACTOR.
954 ASSERT_GE(source, childbase);
955 ASSERT_LT(source, childbase + BARRIER_BRANCH_FACTOR);
// Slot index is the child's offset from childbase.
956 ab_children_data[source - childbase] = collect;
957 ab_child_barrier_counter.inc(ab_barrier_sense);
// Remote entry point run on a child when its parent releases the
// data-carrying barrier. Records the combined payload, forwards the release
// to each of this process's own children, then publishes releaseval locally
// under ab_barrier_mut.
//   releaseval         value to store in ab_barrier_release
//   allstrings         combined payload to store in ab_alldata
//   use_control_calls  non-zero: forward with internal_control_call,
//                      otherwise with internal_call
// NOTE(review): the trailing arguments of the forwarded calls and the end of
// the body (unlock / wake-up) are not visible in this excerpt.
966 void __ab_parent_to_child_barrier_release(
int releaseval,
967 std::string allstrings,
968 int use_control_calls) {
972 ab_alldata = allstrings;
// Forward to children childbase .. childbase + numchild - 1.
973 for (
procid_t i = 0;i < numchild; ++i) {
974 if (use_control_calls) {
975 internal_control_call((
procid_t)(childbase + i),
976 &dc_dist_object<T>::__ab_parent_to_child_barrier_release,
982 internal_call((
procid_t)(childbase + i),
983 &dc_dist_object<T>::__ab_parent_to_child_barrier_release,
989 ab_barrier_mut.
lock();
990 ab_barrier_release = releaseval;
999 template <
typename U>
1008 int ab_barrier_val = ab_barrier_sense;
1009 ab_barrier_mut.
lock();
1012 if ((ab_barrier_sense == -1 && ab_child_barrier_counter.value == 0) ||
1013 (ab_barrier_sense == 1 && ab_child_barrier_counter.value == (
int)(numchild))) {
1015 ab_barrier_sense = -ab_barrier_sense;
1022 oarc2 << std::string(strm->c_str(), strm->size());
1023 for (
procid_t i = 0;i < numchild; ++i) {
1024 strstrm.
write(ab_children_data[i].c_str(), ab_children_data[i].length());
1028 internal_control_call(parent,
1031 std::string(strstrm->c_str(), strstrm->size()));
1034 internal_call(parent,
1037 std::string(strstrm->c_str(), strstrm->size()));
1042 ab_barrier_cond.
wait(ab_barrier_mut);
1049 ab_barrier_release = ab_barrier_val;
1053 oarc2 << std::string(strm->c_str(), strm->size());
1054 for (
procid_t i = 0;i < numchild; ++i) {
1055 strstrm.
write(ab_children_data[i].c_str(), ab_children_data[i].length());
1058 ab_alldata = std::string(strstrm->c_str(), strstrm->size());
1059 for (
procid_t i = 0;i < numchild; ++i) {
1061 internal_control_call((
procid_t)(childbase + i),
1071 ab_barrier_mut.
lock();
1073 if (ab_barrier_release == ab_barrier_val)
break;
1074 ab_barrier_cond.
wait(ab_barrier_mut);
1077 std::string local_ab_alldata = ab_alldata;
1084 std::stringstream istrm(local_ab_alldata);
1087 for (
size_t i = 0;i <
numprocs(); ++i) {
1091 std::stringstream strm2(s);
1093 iarc2 >> data[heappos];
1098 bool lefttraverseblock =
false;
1101 size_t leftbranch = heappos * BARRIER_BRANCH_FACTOR + 1;
1102 if (lefttraverseblock ==
false && leftbranch <
numprocs()) {
1103 heappos = leftbranch;
1107 bool this_is_a_right_branch = (((heappos - 1) % BARRIER_BRANCH_FACTOR) == BARRIER_BRANCH_FACTOR - 1);
1109 if (this_is_a_right_branch ==
false) {
1110 size_t sibling = heappos + 1;
1121 heappos = (heappos - 1) / BARRIER_BRANCH_FACTOR;
1122 lefttraverseblock =
true;
// Tree-based all-reduce: combines every process's `data` with the
// user-supplied `plusequal(accumulator, value)` functor and leaves the
// combined result readable from ab_alldata on each process.
//   data       in: this process's contribution; folded with children's values
//   plusequal  callable invoked as plusequal(data, tmp) for each child value
//   control    selects internal_control_call vs internal_call when reporting
//              to the parent
// NOTE(review): several lines (branch conditions, serialization into ostrm,
// loop headers around the waits, unlocks) are not visible in this excerpt;
// the comments below describe only the visible statements.
1131 template <
typename U,
typename PlusEqual>
1132 void all_reduce2(U& data, PlusEqual plusequal,
bool control =
false) {
// Sample the current sense; it is the release value this round waits for.
// NOTE(review): this read happens before ab_barrier_mut is taken - confirm
// that is intended.
1140 int ab_barrier_val = ab_barrier_sense;
1141 ab_barrier_mut.
lock();
// True once every child has reported for the current sense:
// the counter is 0 for sense -1 or numchild for sense +1.
1144 if ((ab_barrier_sense == -1 && ab_child_barrier_counter.value == 0) ||
1145 (ab_barrier_sense == 1 && ab_child_barrier_counter.value == (
int)(numchild))) {
// Flip the sense for the next round.
1147 ab_barrier_sense = -ab_barrier_sense;
// Fold each child's payload into data.
1152 for (
procid_t i = 0;i < numchild; ++i) {
1153 std::stringstream istrm(ab_children_data[i]);
1157 plusequal(data, tmp);
// Report the partial result held in ostrm to the parent.
1165 internal_control_call(parent,
1168 std::string(ostrm->c_str(), ostrm->size()));
1171 internal_call(parent,
1174 std::string(ostrm->c_str(), ostrm->size()));
1179 ab_barrier_cond.
wait(ab_barrier_mut);
// Second visible path: release value set locally, children folded, result
// stored in ab_alldata and pushed to the children (presumably the root of
// the tree - confirm).
1186 ab_barrier_release = ab_barrier_val;
1187 for (
procid_t i = 0;i < numchild; ++i) {
1188 std::stringstream istrm(ab_children_data[i]);
1192 plusequal(data, tmp);
1199 ab_alldata = std::string(ostrm->c_str(), ostrm->size());
1200 for (
procid_t i = 0;i < numchild; ++i) {
1201 internal_control_call((
procid_t)(childbase + i),
// Block until the release value for this round has been published.
1211 ab_barrier_mut.
lock();
1213 if (ab_barrier_release == ab_barrier_val)
break;
1214 ab_barrier_cond.
wait(ab_barrier_mut);
// Work on a private copy of the combined payload.
1219 std::string local_ab_alldata = ab_alldata;
1224 std::stringstream istrm(local_ab_alldata);
// Default combiner functor handed to all_reduce2 when the caller supplies
// none. It is invoked as op(u, v) with a mutable accumulator u and a
// read-only value v.
// NOTE(review): the operator body is not visible in this excerpt -
// presumably `u += v`, going by the type's name; confirm.
1234 template <
typename U>
1235 struct default_plus_equal {
1236 void operator()(U& u,
const U& v) {
1242 template <
typename U>
1244 all_reduce2(data, default_plus_equal<U>(), control);
// All-to-all exchange: data must hold exactly one element per process
// (asserted). Each element is serialized into a binary string stream and
// delivered to the corresponding process through set_gather_receive; the
// values received from the other processes are then read back out of
// gather_receive, one slot per process.
//   data     in/out vector with numprocs() entries
//   control  selects the control-call path (internal_control_call is used
//            when it is not false)
// NOTE(review): the serialization/deserialization statements, the
// non-control send path and any synchronization between the two loops are
// not visible in this excerpt.
1254 template <
typename U>
1255 void all_to_all(std::vector<U>& data,
bool control =
false) {
1256 ASSERT_EQ(data.size(),
numprocs());
// Send phase: one message per destination process i.
1257 for (
size_t i = 0;i < data.size(); ++i) {
1259 std::stringstream strm( std::ios::out | std::ios::binary );
1263 if (control ==
false) {
1271 internal_control_call(i,
1272 &dc_dist_object<T>::set_gather_receive,
// Receive phase: decode the payload stored for each source process i.
1280 for (
size_t i = 0; i < data.size(); ++i) {
1282 std::stringstream strm(gather_receive[i],
1283 std::ios::in | std::ios::binary);
1284 assert(strm.good());
1285 iarchive iarc(strm);
// State for the plain (data-less) barrier.
// Last release value received; waiters compare it with the sense value they
// sampled on entry.
1305 int barrier_release;
// Running sum of the sense values reported by children; the entry test
// accepts 0 (sense -1) or numchild (sense +1).
1310 atomic<int> child_barrier_counter;
// Condition the barrier waits on, paired with barrier_mut.
1312 conditional barrier_cond;
// Remote entry point run on a parent when child `source` reaches the plain
// barrier: validates the sender and adds the current sense value to the
// child counter.
// NOTE(review): locking and the wake-up of the waiter are not visible in
// this excerpt.
1324 void __child_to_parent_barrier_trigger(
procid_t source) {
// source must be one of this process's children:
// childbase <= source < childbase + BARRIER_BRANCH_FACTOR.
1327 ASSERT_GE(source, childbase);
1328 ASSERT_LT(source, childbase + BARRIER_BRANCH_FACTOR);
1329 child_barrier_counter.inc(barrier_sense);
// Remote entry point run on a child when its parent releases the plain
// barrier: forwards the release to each of this process's own children with
// internal_control_call, then records releaseval in barrier_release.
// NOTE(review): the trailing call arguments and the locking / wake-up around
// the assignment are not visible in this excerpt.
1338 void __parent_to_child_barrier_release(
int releaseval) {
// Forward to children childbase .. childbase + numchild - 1.
1342 for (
procid_t i = 0;i < numchild; ++i) {
1343 internal_control_call((
procid_t)(childbase + i),
1344 &dc_dist_object<T>::__parent_to_child_barrier_release,
1349 barrier_release = releaseval;
1360 int barrier_val = barrier_sense;
1364 if ((barrier_sense == -1 && child_barrier_counter.value == 0) ||
1365 (barrier_sense == 1 && child_barrier_counter.value == (
int)(numchild))) {
1367 barrier_sense = -barrier_sense;
1371 internal_control_call(parent,
1377 barrier_cond.
wait(barrier_mut);
1384 barrier_release = barrier_val;
1386 for (
procid_t i = 0;i < numchild; ++i) {
1387 internal_control_call((
procid_t)(childbase + i),
1397 if (barrier_release == barrier_val)
break;
1398 barrier_cond.
wait(barrier_mut);
// State for the full barrier, which waits until every call sent to this
// process has been received.
// Paired with full_barrier_cond for the wait / signal on completion.
1410 mutex full_barrier_lock;
// calls_to_receive[i]: number of calls process i reports having sent to this
// process; rebuilt each time the full barrier runs.
1412 std::vector<size_t> calls_to_receive;
// Set while a full barrier is waiting; read by inc_calls_received() to
// decide whether a newly received call may complete the barrier.
// NOTE(review): volatile alone gives no inter-thread ordering guarantees in
// standard C++ - confirm the surrounding atomics/locks provide them.
1416 volatile bool full_barrier_in_effect;
// Number of source processes whose expected call count has not yet been
// reached; the barrier waits until it drops to zero.
1420 atomic<size_t> num_proc_recvs_incomplete;
1430 std::vector<size_t> calls_sent_to_target(
numprocs(), 0);
1431 for (
size_t i = 0;i <
numprocs(); ++i) {
1432 calls_sent_to_target[i] = callssent[i].value;
1436 std::vector<std::vector<size_t> > all_calls_sent(
numprocs());
1437 all_calls_sent[
procid()] = calls_sent_to_target;
1441 calls_to_receive.clear(); calls_to_receive.resize(
numprocs(), 0);
1442 for (
size_t i = 0;i <
numprocs(); ++i) {
1443 calls_to_receive[i] += all_calls_sent[i][
procid()];
1447 num_proc_recvs_incomplete.value =
numprocs();
1448 procs_complete.
clear();
1450 full_barrier_in_effect =
true;
1453 for (
size_t i = 0;i <
numprocs(); ++i) {
1454 if (callsreceived[i].value >= calls_to_receive[i]) {
1455 if (procs_complete.
set_bit(i) ==
false) {
1456 num_proc_recvs_incomplete.dec();
1461 full_barrier_lock.
lock();
1462 while (num_proc_recvs_incomplete.value > 0) full_barrier_cond.
wait(full_barrier_lock);
1463 full_barrier_lock.
unlock();
1464 full_barrier_in_effect =
false;
// Serializable pair of totals (callssent, bytessent) exchanged between
// processes when statistics are gathered.
// NOTE(review): the two member declarations are not visible in this excerpt;
// they are summed and stored into a std::map<std::string, size_t>, so
// presumably both are size_t - confirm.
1473 struct collected_statistics {
// Both totals start at zero.
1476 collected_statistics(): callssent(0), bytessent(0) { }
// Writes callssent, then bytessent.
1477 void save(oarchive &oarc)
const {
1478 oarc << callssent << bytessent;
// Reads the fields back in the order save() wrote them.
1480 void load(iarchive &iarc) {
1481 iarc >> callssent >> bytessent;
1489 std::map<std::string, size_t> ret;
1491 std::vector<collected_statistics> stats(
numprocs());
1495 for (
size_t i = 0;i <
numprocs(); ++i) {
1496 logstream(
LOG_INFO) << callssent[i].value <<
", ";
1500 for (
size_t i = 0;i <
numprocs(); ++i) {
1501 logstream(
LOG_INFO) << callsreceived[i].value <<
", ";
1508 collected_statistics cs;
1509 for (
size_t i = 0;i <
numprocs(); ++i) {
1510 cs.callssent += stats[i].callssent;
1511 cs.bytessent += stats[i].bytessent;
1513 ret[
"total_calls_sent"] = cs.callssent;
1514 ret[
"total_bytes_sent"] = cs.bytessent;
1520 #include <graphlab/macros_undef.hpp>
1521 #include <graphlab/rpc/mem_function_arg_types_undef.hpp>
1522 #undef BARRIER_BRANCH_FACTOR