26 #ifndef GRAPHLAB_ASYNC_CONSISTENT_ENGINE
27 #define GRAPHLAB_ASYNC_CONSISTENT_ENGINE
30 #include <boost/bind.hpp>
32 #include <graphlab/scheduler/ischeduler.hpp>
33 #include <graphlab/scheduler/scheduler_factory.hpp>
34 #include <graphlab/vertex_program/ivertex_program.hpp>
35 #include <graphlab/vertex_program/icontext.hpp>
36 #include <graphlab/vertex_program/context.hpp>
37 #include <graphlab/engine/iengine.hpp>
38 #include <graphlab/engine/execution_status.hpp>
39 #include <graphlab/options/graphlab_options.hpp>
40 #include <graphlab/rpc/dc_dist_object.hpp>
41 #include <graphlab/engine/distributed_chandy_misra.hpp>
43 #include <graphlab/util/tracepoint.hpp>
44 #include <graphlab/util/memory_info.hpp>
45 #include <graphlab/rpc/distributed_event_log.hpp>
46 #include <graphlab/rpc/async_consensus.hpp>
47 #include <graphlab/engine/fake_chandy_misra.hpp>
48 #include <graphlab/aggregation/distributed_aggregator.hpp>
50 #include <graphlab/macros_def.hpp>
53 #define ASYNC_ENGINE_FACTORIZED_AVOID_SCHEDULER_GATHER
207 template<
typename VertexProgram>
308 typedef conditional_addition_wrapper<gather_type> conditional_gather_type;
323 typedef typename graph_type::local_vertex_type local_vertex_type;
325 typedef typename graph_type::local_edge_type local_edge_type;
327 typedef typename graph_type::lvid_type lvid_type;
396 enum vertex_execution_state {
404 MIRROR_SCATTERING_AND_NEXT_LOCKING,
405 MIRROR_SCATTERING_AND_NEXT_GATHERING,
// Per-local-vertex bookkeeping for the asynchronous engine.
// NOTE(review): this view of the file is elided; the fields and methods
// shown are only the subset visible in this chunk.
414 struct vertex_state {
// Accumulates gather contributions merged from local edges and mirrors.
433 conditional_gather_type combined_gather;
// Outstanding gather contributions (mirrors + master); when it reaches
// zero the vertex moves to APPLYING (see decrement_gather_counter).
441 uint32_t apply_count_down;
// Current position of this vertex in the engine's execution state machine.
457 vertex_execution_state state;
// d_lock/d_trylock/d_unlock wrap factorized_lock, the per-vertex lock
// guarding vertex data under factorized consistency.
475 inline void d_lock() {
476 factorized_lock.lock();
// Non-blocking variant; returns true iff the lock was acquired.
480 inline bool d_trylock() {
481 return factorized_lock.try_lock();
485 inline void d_unlock() {
486 factorized_lock.unlock();
// Per-worker-thread queue of "internal tasks" (lvids whose state machine
// needs advancing), sharded 4 ways to reduce lock contention.
// NOTE(review): the lock acquisition/release lines inside add_task,
// add_task_priority and get_task are elided from this view.
502 struct thread_local_data {
// One mutex per shard of pending_vertices (4 shards, see constructor).
503 std::vector<mutex> lock;
// Total number of tasks queued across all shards.
504 atomic<size_t> npending;
// The 4 shards of queued local vertex ids.
505 std::vector<std::vector<lvid_type> > pending_vertices;
506 thread_local_data() : lock(4), npending(0),
507 pending_vertices(4) { }
// Queue v as a priority task (shard selection elided from this view).
508 void add_task_priority(lvid_type v) {
511 pending_vertices[lid].push_back(v);
// Queue v; shard is (v / 32) % 4 so runs of 32 consecutive lvids land
// on the same shard.
515 void add_task(lvid_type v) {
516 size_t lid = (v / 32) % 4;
518 pending_vertices[lid].push_back(v);
// Swap all queued tasks out into v (one vector per shard) and decrement
// npending accordingly. Returns whether any task was retrieved
// (return statement elided from this view).
522 bool get_task(std::vector<std::vector<lvid_type> > &v) {
526 for (
size_t i = 0;i < 4; ++i) v[i].clear();
527 for (
int i = 0; i < 4; ++i) {
529 if (!pending_vertices[i].empty()) {
530 v[i].swap(pending_vertices[i]);
533 npending -= v[i].size();
541 dc_dist_object<async_consistent_engine<VertexProgram> > rmi;
547 chandy_misra_interface<graph_type>* cmlocks;
550 thread_group thrgroup;
553 ischeduler_type* scheduler_ptr;
557 std::vector<vertex_state> vstate;
560 std::vector<conditional_gather_type> cache;
562 typedef typename iengine<VertexProgram>::aggregator_type aggregator_type;
563 aggregator_type aggregator;
570 async_consensus* consensus;
573 std::vector<thread_local_data> thrlocal;
576 atomic<uint64_t> joined_messages;
577 atomic<uint64_t> blocked_issues;
583 atomic<uint64_t> issued_messages;
584 atomic<uint64_t> pending_updates;
585 atomic<uint64_t> programs_executed;
586 atomic<double> total_update_time;
591 float max_clean_fraction;
594 size_t max_clean_forks;
600 size_t timed_termination;
604 bool factorized_consistency;
606 bool handler_intercept;
613 bool track_task_retire_time;
615 std::vector<double> task_start_time;
618 float engine_start_time;
621 graphlab_options opts_copy;
626 DECLARE_TRACER(disteng_eval_sched_task);
627 DECLARE_TRACER(disteng_chandy_misra);
628 DECLARE_TRACER(disteng_init_gathering);
629 DECLARE_TRACER(disteng_init_scattering);
630 DECLARE_TRACER(disteng_evalfac);
631 DECLARE_TRACER(disteng_internal_task_queue);
633 DECLARE_EVENT(EVENT_APPLIES);
634 DECLARE_EVENT(EVENT_GATHERS);
635 DECLARE_EVENT(EVENT_SCATTERS);
636 DECLARE_EVENT(EVENT_ACTIVE_CPUS);
637 DECLARE_EVENT(EVENT_ACTIVE_TASKS);
640 inline void ASSERT_I_AM_OWNER(
const lvid_type lvid)
const {
641 ASSERT_EQ(graph.l_get_vertex_record(lvid).owner, rmi.
procid());
643 inline void ASSERT_I_AM_NOT_OWNER(
const lvid_type lvid)
const {
644 ASSERT_NE(graph.l_get_vertex_record(lvid).owner, rmi.
procid());
672 rmi(dc, this), graph(graph), scheduler_ptr(NULL),
673 aggregator(dc, graph, new
context_type(*this, graph)), started(false),
674 engine_start_time(
timer::approx_time_seconds()), force_stop(false),
675 vdata_exchange(dc),thread_barrier(opts.get_ncpus()) {
679 max_clean_fraction = 0.2;
680 max_clean_forks = (size_t)(-1);
681 max_pending = (size_t)(-1);
682 timed_termination = (size_t)(-1);
684 factorized_consistency =
true;
685 handler_intercept = rmi.
numprocs() > 1;
686 track_task_retire_time =
false;
687 disable_locks =
false;
688 termination_reason = execution_status::UNSET;
691 INITIALIZE_TRACER(disteng_eval_sched_task,
692 "distributed_engine: Evaluate Scheduled Task");
693 INITIALIZE_TRACER(disteng_init_gathering,
694 "distributed_engine: Initialize Gather");
695 INITIALIZE_TRACER(disteng_init_scattering,
696 "distributed_engine: Initialize Scattering");
697 INITIALIZE_TRACER(disteng_evalfac,
698 "distributed_engine: Time in Factorized Update user code");
699 INITIALIZE_TRACER(disteng_internal_task_queue,
700 "distributed_engine: Time in Internal Task Queue");
701 INITIALIZE_TRACER(disteng_chandy_misra,
702 "distributed_engine: Time in Chandy Misra");
704 INITIALIZE_EVENT_LOG(dc);
705 ADD_CUMULATIVE_EVENT(EVENT_APPLIES,
"Applies",
"Calls");
706 ADD_CUMULATIVE_EVENT(EVENT_GATHERS ,
"Gathers",
"Calls");
707 ADD_CUMULATIVE_EVENT(EVENT_SCATTERS ,
"Scatters",
"Calls");
708 ADD_INSTANTANEOUS_EVENT(EVENT_ACTIVE_CPUS,
"Active Threads",
"Threads");
709 ADD_INSTANTANEOUS_EVENT(EVENT_ACTIVE_TASKS,
"Active Tasks",
"Tasks");
730 thread_barrier.resize_unsafe(opts.
get_ncpus());
732 foreach(std::string opt, keys) {
733 if (opt ==
"max_clean_fraction") {
736 logstream(
LOG_EMPH) <<
"Engine Option: max_clean_fraction = "
737 << max_clean_fraction << std::endl;
738 max_clean_forks = graph.num_local_edges() * max_clean_fraction;
739 }
else if (opt ==
"handler_intercept") {
741 }
else if (opt ==
"disable_locks") {
744 logstream(
LOG_EMPH) <<
"Engine Option: disable_locks = "
745 << disable_locks << std::endl;
746 }
else if (opt ==
"max_pending") {
749 logstream(
LOG_EMPH) <<
"Engine Option: max_pending = "
750 << max_pending << std::endl;
751 }
else if (opt ==
"timeout") {
754 logstream(
LOG_EMPH) <<
"Engine Option: timeout = "
755 << max_clean_fraction << std::endl;
756 }
else if (opt ==
"use_cache") {
759 logstream(
LOG_EMPH) <<
"Engine Option: use_cache = "
760 << use_cache << std::endl;
761 }
else if (opt ==
"factorized") {
764 logstream(
LOG_EMPH) <<
"Engine Option: factorized = "
765 << factorized_consistency << std::endl;
766 }
else if (opt ==
"track_task_time") {
769 logstream(
LOG_EMPH) <<
"Engine Option: track_task_time = "
770 << track_task_retire_time << std::endl;
772 logstream(
LOG_FATAL) <<
"Unexpected Engine Option: " << opt << std::endl;
778 opts_copy.set_scheduler_type(
"queued_fifo");
794 if (rmi.
procid() == 0) memory_info::print_usage(
"Before Engine Initialization");
796 << rmi.
procid() <<
": Initializing..." << std::endl;
799 scheduler_ptr = scheduler_factory<message_type>::
800 new_scheduler(graph.num_local_vertices(),
804 if (factorized_consistency ==
false) {
805 cmlocks =
new distributed_chandy_misra<graph_type>(rmi.
dc(), graph,
806 boost::bind(&engine_type::lock_ready,
this, _1),
807 boost::bind(&engine_type::forward_cached_schedule,
this, _1));
810 cmlocks =
new fake_chandy_misra<graph_type>(rmi.
dc(), graph,
811 boost::bind(&engine_type::lock_ready,
this, _1),
812 boost::bind(&engine_type::forward_cached_schedule,
this, _1));
815 vstate.resize(graph.num_local_vertices());
818 consensus =
new async_consensus(rmi.
dc(), ncpus);
821 if (use_cache) cache.resize(graph.num_local_vertices());
824 thrlocal.resize(ncpus);
825 if (rmi.
procid() == 0) memory_info::print_usage(
"After Engine Initialization");
835 delete scheduler_ptr;
843 return programs_executed.value;
// Accumulate a gather delta into the per-vertex gather cache. Under
// non-factorized consistency the per-vertex data lock guards the cache
// entry; under factorized consistency no lock is taken here.
868 void internal_post_delta(
const vertex_type& vertex,
872 if (!factorized_consistency) vstate[vertex.local_id()].d_lock();
873 if (cache[vertex.local_id()].not_empty()) {
874 cache[vertex.local_id()] += delta;
876 if (!factorized_consistency) vstate[vertex.local_id()].d_unlock();
// Invalidate the cached gather for one vertex (no-op unless use_cache).
880 void internal_clear_gather_cache(
const vertex_type& vertex) {
883 if (!factorized_consistency) vstate[vertex.local_id()].d_lock();
884 if(use_cache) cache[vertex.local_id()].clear();
885 if (!factorized_consistency) vstate[vertex.local_id()].d_unlock();
897 if (force_stop)
return;
898 const lvid_type local_vid = graph.local_vid(vid);
899 BEGIN_TRACEPOINT(disteng_scheduler_task_queue);
900 bool direct_injection =
false;
904 if (vstate[local_vid].state == LOCKING) {
905 vstate[local_vid].lock();
906 if (vstate[local_vid].state == LOCKING) {
907 vstate[local_vid].current_message += message;
908 direct_injection =
true;
909 joined_messages.inc();
911 vstate[local_vid].unlock();
915 if (direct_injection ==
false) {
916 scheduler_ptr->
schedule(local_vid, message);
918 END_TRACEPOINT(disteng_scheduler_task_queue);
935 if (force_stop)
return;
950 if (force_stop)
return;
953 BEGIN_TRACEPOINT(disteng_scheduler_task_queue);
954 if (factorized_consistency) {
956 const typename graph_type::vertex_record& rec = graph.l_get_vertex_record(vtx.local_id());
958 if (owner != rmi.
procid()) {
960 rmi.
remote_call(owner, &engine_type::rpc_signal, vid, message);
964 vtx.local_id(), message);
970 vtx.local_id(), message);
973 END_TRACEPOINT(disteng_scheduler_task_queue);
976 scheduler_ptr->
schedule(vtx.local_id(), message);
991 if (force_stop)
return;
992 if (graph.is_master(gvid)) {
993 internal_signal(graph.vertex(gvid), message);
1000 for (
size_t i = 0;i < rmi.
numprocs(); ++i) {
1001 rmi.
remote_call(i, &async_consistent_engine::internal_signal_gvid,
1007 void rpc_internal_stop() {
1018 void internal_stop() {
1020 rmi.
remote_call(i, &async_consistent_engine::rpc_internal_stop);
1032 internal_signal_gvid(gvid, message);
1038 const std::string& order =
"shuffle") {
1039 logstream(
LOG_DEBUG) << rmi.
procid() <<
": Schedule All" << std::endl;
1042 std::vector<vertex_id_type> vtxs;
1043 vtxs.reserve(graph.num_local_own_vertices());
1044 for(lvid_type lvid = 0;
1045 lvid < graph.get_local_graph().num_vertices();
1047 if (graph.l_vertex(lvid).owner() == rmi.
procid()) {
1048 vtxs.push_back(lvid);
1052 if(order ==
"shuffle") {
1055 foreach(lvid_type lvid, vtxs) {
1056 scheduler_ptr->
schedule(lvid, message);
1061 void signal_vset(
const vertex_set& vset,
1063 const std::string& order =
"shuffle") {
1064 logstream(
LOG_DEBUG) << rmi.
procid() <<
": Schedule All" << std::endl;
1067 std::vector<vertex_id_type> vtxs;
1068 vtxs.reserve(graph.num_local_own_vertices());
1069 for(lvid_type lvid = 0;
1070 lvid < graph.get_local_graph().num_vertices();
1072 if (graph.l_vertex(lvid).owner() == rmi.
procid() &&
1073 vset.l_contains(lvid)) {
1074 vtxs.push_back(lvid);
1078 if(order ==
"shuffle") {
1081 foreach(lvid_type lvid, vtxs) {
1082 scheduler_ptr->
schedule(lvid, message);
1103 void master_broadcast_locking(lvid_type sched_lvid) {
1104 local_vertex_type lvertex(graph.l_vertex(sched_lvid));
1107 << lvertex.global_id() << std::endl;
1110 const unsigned char prevkey =
1112 rmi.
remote_call(lvertex.mirrors().begin(), lvertex.mirrors().end(),
1113 &engine_type::rpc_begin_locking, lvertex.global_id());
1129 << sched_vid << std::endl;
1133 vstate[sched_lvid].lock();
1134 if (vstate[sched_lvid].state == NONE) {
1135 vstate[sched_lvid].state = LOCKING;
1136 cmlocks->make_philosopher_hungry_per_replica(sched_lvid);
1139 else if (vstate[sched_lvid].state == MIRROR_SCATTERING) {
1140 vstate[sched_lvid].state = MIRROR_SCATTERING_AND_NEXT_LOCKING;
1142 vstate[sched_lvid].unlock();
// Chandy-Misra callback: all forks for lvid have been acquired, so the
// vertex may enter the gather phase. NOTE(review): surrounding
// preprocessor branches are elided in this view; the #ifdef below is
// closed outside the visible span.
1153 void lock_ready(lvid_type lvid) {
1154 #ifdef ASYNC_ENGINE_FACTORIZED_AVOID_SCHEDULER_GATHER
1155 if (!factorized_consistency) {
1159 logstream(
LOG_DEBUG) <<
"Lock ready on " <<
"L" << lvid << std::endl;
1160 vstate[lvid].lock();
1161 vstate[lvid].state = GATHERING;
1162 do_init_gather(lvid);
1163 vstate[lvid].unlock();
// NOTE(review): vertex_program is read after unlock here — appears to
// rely on no concurrent writer once the vertex is GATHERING; confirm.
1164 master_broadcast_gathering(lvid, vstate[lvid].vertex_program);
1165 #ifdef ASYNC_ENGINE_FACTORIZED_AVOID_SCHEDULER_GATHER
1176 void master_broadcast_gathering(lvid_type sched_lvid,
1178 local_vertex_type lvertex(graph.l_vertex(sched_lvid));
1179 BEGIN_TRACEPOINT(disteng_init_gathering);
1181 << lvertex.global_id() << std::endl;
1185 const unsigned char prevkey =
1187 rmi.
remote_call(lvertex.mirrors().begin(), lvertex.mirrors().end(),
1188 &engine_type::rpc_begin_gathering, lvertex.global_id() , prog);
1190 END_TRACEPOINT(disteng_init_gathering);
1191 add_internal_task(sched_lvid);
1204 << sched_vid << std::endl;
1209 vstate[sched_lvid].lock();
1211 if (vstate[sched_lvid].state == MIRROR_SCATTERING) {
1212 vstate[sched_lvid].state = MIRROR_SCATTERING_AND_NEXT_GATHERING;
1213 vstate[sched_lvid].factorized_next = prog;
1217 vstate[sched_lvid].state = MIRROR_GATHERING;
1218 vstate[sched_lvid].vertex_program = prog;
1219 vstate[sched_lvid].combined_gather.clear();
1220 add_internal_task(sched_lvid);
1222 vstate[sched_lvid].unlock();
1233 const conditional_gather_type& uf) {
1234 logstream(
LOG_DEBUG) << rmi.
procid() <<
": Receiving Gather Complete of "
1235 << vid << std::endl;
1236 lvid_type lvid = graph.local_vid(vid);
1237 vstate[lvid].lock();
1238 vstate[lvid].combined_gather += uf;
1239 decrement_gather_counter(lvid,
true );
1240 vstate[lvid].unlock();
// Decrement the vertex's outstanding-gather counter. When it hits zero,
// every replica has contributed, so the vertex transitions to APPLYING
// and an internal task is queued to run the apply phase.
// 'from_rpc' distinguishes mirror (RPC) completions from local ones;
// its use beyond the signature is elided from this view.
1255 bool decrement_gather_counter(
const lvid_type lvid,
bool from_rpc =
false) {
1256 vstate[lvid].apply_count_down--;
1258 << graph.global_vid(lvid) <<
"(" << vstate[lvid].apply_count_down <<
")" << std::endl;
1259 if (vstate[lvid].apply_count_down == 0) {
1261 << graph.global_vid(lvid) << std::endl;
1262 vstate[lvid].state = APPLYING;
1266 add_internal_task(lvid);
1279 void do_init_gather(lvid_type lvid) {
1280 context_type context(*
this, graph);
1281 vstate[lvid].vertex_program.init(context,
1283 vstate[lvid].current_message);
1285 vstate[lvid].combined_gather.clear();
// --- Factorized-consistency edge-locking helpers ------------------------
// These acquire the per-vertex data locks on both endpoints of an edge.
// After a failed trylock the code orders acquisitions by lvid (min
// first) — presumably the deadlock-avoidance protocol; the reacquire
// lines themselves are elided from this view.
1288 void factorized_lock_edge2_begin(lvid_type hold) {
1289 vstate[hold].d_lock();
// Lock 'advance' while already holding 'hold'. Fast path: trylock.
// On failure, release 'hold' and (elided) reacquire both in lvid order.
1291 void factorized_lock_edge2(lvid_type hold, lvid_type advance) {
1292 if(vstate[advance].d_trylock())
return;
1293 else vstate[hold].d_unlock();
1294 lvid_type a = std::min(hold, advance);
1295 lvid_type b = std::max(hold, advance);
// Release only the 'advance' side; 'hold' stays locked across edges.
1300 void factorized_unlock_edge2(lvid_type hold,
1301 lvid_type advance) {
1302 vstate[advance].d_unlock();
1305 void factorized_unlock_edge2_end(lvid_type hold) {
1306 vstate[hold].d_unlock();
// Lock both endpoints of an edge in ascending-lvid order
// (the acquisition lines are elided from this view).
1310 void factorized_lock_edge(local_edge_type edge) {
1311 lvid_type src = edge.source().id();
1312 lvid_type target = edge.target().id();
1313 lvid_type a = std::min(src, target);
1314 lvid_type b = std::max(src, target);
// Release both endpoint locks of an edge.
1319 void factorized_unlock_edge(local_edge_type edge) {
1320 vstate[edge.source().id()].d_unlock();
1321 vstate[edge.target().id()].d_unlock();
1327 void do_gather(lvid_type lvid) {
1328 BEGIN_TRACEPOINT(disteng_evalfac);
1329 local_vertex_type lvertex(graph.l_vertex(lvid));
1332 conditional_gather_type* gather_target = NULL;
1333 bool gather_target_is_cache =
false;
1335 vstate[lvid].d_lock();
1336 if (cache[lvid].not_empty()) {
1338 vstate[lvid].combined_gather += cache[lvid];
1339 vstate[lvid].d_unlock();
1345 gather_target = &(cache[lvid]);
1346 gather_target_is_cache =
true;
1347 vstate[lvid].d_unlock();
1352 gather_target = &(vstate[lvid].combined_gather);
1355 context_type context(*
this, graph);
1356 vstate[lvid].vertex_program.pre_local_gather(gather_target->value);
1357 edge_dir_type gatherdir = vstate[lvid].vertex_program.gather_edges(context, vertex);
1361 if (!disable_locks) factorized_lock_edge2_begin(lvid);
1362 foreach(local_edge_type edge, lvertex.in_edges()) {
1363 if (!disable_locks && factorized_consistency) {
1364 factorized_lock_edge2(lvid, edge.source().id());
1368 vstate[lvid].vertex_program.gather(context, vertex, e);
1369 if (factorized_consistency && !disable_locks) {
1370 factorized_unlock_edge2(lvid, edge.source().id());
1373 if (!disable_locks) factorized_unlock_edge2_end(lvid);
1374 INCREMENT_EVENT(EVENT_GATHERS, lvertex.num_in_edges());
1378 if (!disable_locks) factorized_lock_edge2_begin(lvid);
1379 foreach(local_edge_type edge, lvertex.out_edges()) {
1380 if (!disable_locks && factorized_consistency) {
1381 factorized_lock_edge2(lvid, edge.target().id());
1385 vstate[lvid].vertex_program.gather(context, vertex, e);
1386 if (factorized_consistency && !disable_locks) factorized_unlock_edge2(lvid, edge.target().id());
1388 if (!disable_locks) factorized_unlock_edge2_end(lvid);
1389 INCREMENT_EVENT(EVENT_GATHERS, lvertex.num_out_edges());
1392 vstate[lvid].vertex_program.post_local_gather(gather_target->value);
1393 if (use_cache && gather_target_is_cache) {
1394 vstate[lvid].d_lock();
1397 vstate[lvid].combined_gather += cache[lvid];
1398 vstate[lvid].d_unlock();
1401 END_TRACEPOINT(disteng_evalfac);
1413 bool process_gather(lvid_type lvid) {
1420 const procid_t vowner = graph.l_get_vertex_record(lvid).owner;
1421 if (vowner == rmi.
procid()) {
1422 return decrement_gather_counter(lvid);
1424 vstate[lvid].state = MIRROR_SCATTERING;
1425 logstream(
LOG_DEBUG) << rmi.
procid() <<
": Send Gather Complete of " << vid
1426 <<
" to " << vowner << std::endl;
1429 &engine_type::rpc_gather_complete,
1430 graph.global_vid(lvid),
1431 vstate[lvid].combined_gather);
1433 vstate[lvid].combined_gather.clear();
// Run the user vertex program's apply() on lvid using the fully combined
// gather value, then clear the accumulator. The per-vertex data lock is
// held only around apply() itself.
1445 void do_apply(lvid_type lvid) {
1446 BEGIN_TRACEPOINT(disteng_evalfac);
1447 context_type context(*
this, graph);
1451 logstream(
LOG_DEBUG) << rmi.
procid() <<
": Apply On " << vertex.id() << std::endl;
1452 vstate[lvid].d_lock();
1453 vstate[lvid].vertex_program.apply(context,
1455 vstate[lvid].combined_gather.value);
1456 vstate[lvid].d_unlock();
// Gather accumulator is consumed; reset it for the next update.
1457 vstate[lvid].combined_gather.clear();
1459 DECREMENT_EVENT(EVENT_ACTIVE_TASKS, 1);
1460 INCREMENT_EVENT(EVENT_APPLIES, 1);
1461 END_TRACEPOINT(disteng_evalfac);
1470 void master_broadcast_scattering(lvid_type sched_lvid,
1472 const vertex_data_type ¢ral_vdata) {
1473 BEGIN_TRACEPOINT(disteng_init_scattering);
1474 local_vertex_type lvertex(graph.l_vertex(sched_lvid));
1476 << lvertex.global_id() << std::endl;
1479 const unsigned char prevkey =
1481 rmi.
remote_call(lvertex.mirrors().begin(), lvertex.mirrors().end(),
1482 &engine_type::rpc_begin_scattering,
1483 lvertex.global_id(), prog, central_vdata);
1485 END_TRACEPOINT(disteng_init_scattering);
1497 const vertex_data_type ¢ral_vdata) {
1499 vstate[lvid].lock();
1503 ASSERT_MSG(vstate[lvid].state == MIRROR_SCATTERING ||
1504 vstate[lvid].state == MIRROR_SCATTERING_AND_NEXT_GATHERING,
1505 "Unexpected state: %d", (
int)(vstate[lvid].state));
1506 graph.get_local_graph().vertex_data(lvid) = central_vdata;
1507 vstate[lvid].vertex_program = prog;
1508 add_internal_task(lvid);
1509 vstate[lvid].unlock();
1517 void do_scatter(lvid_type lvid) {
1518 BEGIN_TRACEPOINT(disteng_evalfac);
1519 local_vertex_type lvertex(graph.l_vertex(lvid));
1522 context_type context(*
this, graph);
1524 edge_dir_type scatterdir = vstate[lvid].vertex_program.scatter_edges(context, vertex);
1528 if (!disable_locks) factorized_lock_edge2_begin(lvid);
1529 foreach(local_edge_type edge, lvertex.in_edges()) {
1530 if (!disable_locks && factorized_consistency) {
1531 factorized_lock_edge2(lvid, edge.source().id());
1534 vstate[lvid].vertex_program.scatter(context, vertex, e);
1535 if (!disable_locks && factorized_consistency) {
1536 factorized_unlock_edge2(lvid, edge.source().id());
1539 if (!disable_locks) factorized_unlock_edge2_end(lvid);
1540 INCREMENT_EVENT(EVENT_SCATTERS, lvertex.num_in_edges());
1544 if (!disable_locks) factorized_lock_edge2_begin(lvid);
1545 foreach(local_edge_type edge, lvertex.out_edges()) {
1546 if (!disable_locks && factorized_consistency) {
1547 factorized_lock_edge2(lvid, edge.target().id());
1550 vstate[lvid].vertex_program.scatter(context, vertex, e);
1551 if (!disable_locks && factorized_consistency) {
1552 factorized_unlock_edge2(lvid, edge.target().id());
1555 if (!disable_locks) factorized_unlock_edge2_end(lvid);
1556 INCREMENT_EVENT(EVENT_SCATTERS, lvertex.num_out_edges());
1558 END_TRACEPOINT(disteng_evalfac);
1574 void eval_internal_task(
size_t threadid, lvid_type lvid) {
1576 if (lvid >= vstate.size()) {
1577 aggregator.tick_asynchronous_compute(threadid,
1578 aggregate_id_to_key[-lvid]);
1581 bool gather_fast_path =
false;
1582 vstate[lvid].lock();
1583 EVAL_INTERNAL_TASK_RE_EVAL_STATE:
1584 switch(vstate[lvid].state) {
1588 BEGIN_TRACEPOINT(disteng_chandy_misra);
1589 cmlocks->make_philosopher_hungry_per_replica(lvid);
1590 END_TRACEPOINT(disteng_chandy_misra);
1595 << graph.global_vid(lvid) <<
": GATHERING(" << vstate[lvid].apply_count_down <<
")" << std::endl;
1597 gather_fast_path = process_gather(lvid);
1598 if (gather_fast_path) {
1600 goto EVAL_INTERNAL_TASK_RE_EVAL_STATE;
1605 case MIRROR_GATHERING: {
1607 << graph.global_vid(lvid) <<
": MIRROR_GATHERING" << std::endl;
1608 process_gather(lvid);
1613 << graph.global_vid(lvid) <<
": APPLYING" << std::endl;
1616 vstate[lvid].state = SCATTERING;
1617 master_broadcast_scattering(lvid,
1618 vstate[lvid].vertex_program,
1619 graph.get_local_graph().vertex_data(lvid));
1624 << graph.global_vid(lvid) <<
": SCATTERING" << std::endl;
1627 programs_executed.inc();
1628 pending_updates.dec();
1629 if (track_task_retire_time) {
1630 total_update_time.inc(launch_timer.
current_time() - task_start_time[lvid]);
1634 BEGIN_TRACEPOINT(disteng_chandy_misra);
1635 cmlocks->philosopher_stops_eating_per_replica(lvid);
1636 END_TRACEPOINT(disteng_chandy_misra);
1638 if (vstate[lvid].hasnext) {
1640 signal_local_next(lvid);
1641 vstate[lvid].hasnext =
false;
1643 vstate[lvid].state = NONE;
1646 case MIRROR_SCATTERING: {
1648 << graph.global_vid(lvid) <<
": MIRROR_SCATTERING" << std::endl;
1651 vstate[lvid].state = NONE;
1652 cmlocks->philosopher_stops_eating_per_replica(lvid);
1656 case MIRROR_SCATTERING_AND_NEXT_LOCKING: {
1658 << graph.global_vid(lvid) <<
": MIRROR_SCATTERING_AND_NEXT_LOCKING" << std::endl;
1661 vstate[lvid].state = LOCKING;
1663 cmlocks->philosopher_stops_eating_per_replica(lvid);
1664 cmlocks->make_philosopher_hungry_per_replica(lvid);
1667 case MIRROR_SCATTERING_AND_NEXT_GATHERING: {
1669 << graph.global_vid(lvid) <<
": MIRROR_SCATTERING_AND_NEXT_GATHERING" << std::endl;
1671 vstate[lvid].vertex_program = vstate[lvid].factorized_next;
1673 vstate[lvid].state = MIRROR_GATHERING;
1674 vstate[lvid].combined_gather.clear();
1675 goto EVAL_INTERNAL_TASK_RE_EVAL_STATE;
1678 vstate[lvid].unlock();
1692 void get_a_task(
size_t threadid,
1693 bool& has_internal_task,
1694 std::vector<std::vector<lvid_type> >& internal_lvid,
1695 bool& has_sched_msg,
1696 lvid_type& sched_lvid,
1698 has_internal_task =
false;
1699 has_sched_msg =
false;
1703 BEGIN_TRACEPOINT(disteng_internal_task_queue);
1704 if (thrlocal[threadid].get_task(internal_lvid)) {
1705 has_internal_task =
true;
1706 END_TRACEPOINT(disteng_internal_task_queue);
1709 END_TRACEPOINT(disteng_internal_task_queue);
1711 if (cmlocks->num_clean_forks() >= max_clean_forks) {
1714 if (pending_updates.value > max_pending) {
1718 scheduler_ptr->
get_next(threadid, sched_lvid, msg);
1729 bool try_to_quit(
size_t threadid,
1730 bool& has_internal_task,
1731 std::vector<std::vector<lvid_type> >& internal_lvid,
1732 bool& has_sched_msg,
1733 lvid_type& sched_lvid,
1736 if (handler_intercept) rmi.
dc().handle_incoming_calls(threadid, ncpus);
1737 static size_t ctr = 0;
1743 issued_messages.value != programs_executed.value + blocked_issues.value) {
1773 logstream(
LOG_DEBUG) << rmi.
procid() <<
"-" << threadid <<
": " <<
"Termination Attempt "
1774 << programs_executed.value <<
"/" << issued_messages.value << std::endl;
1775 has_internal_task =
false;
1776 has_sched_msg =
false;
1778 if (handler_intercept) rmi.
dc().start_handler_threads(threadid, ncpus);
1781 BEGIN_TRACEPOINT(disteng_internal_task_queue);
1782 if (thrlocal[threadid].get_task(internal_lvid)) {
1784 <<
"\tCancelled by Internal Task" << std::endl;
1785 has_internal_task =
true;
1787 END_TRACEPOINT(disteng_internal_task_queue);
1789 if (handler_intercept) rmi.
dc().stop_handler_threads(threadid, ncpus);
1792 END_TRACEPOINT(disteng_internal_task_queue);
1795 scheduler_ptr->
get_next(threadid, sched_lvid, msg);
1798 <<
"\tTermination Double Checked" << std::endl;
1800 DECREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
1801 if (!endgame_mode) logstream(
LOG_EMPH) <<
"Endgame mode\n";
1802 endgame_mode =
true;
1824 INCREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
1826 if (handler_intercept) rmi.
dc().stop_handler_threads(threadid, ncpus);
1828 <<
"\tCancelled" << std::endl;
1833 <<
"\tCancelled by Scheduler Task" << std::endl;
1835 has_sched_msg =
true;
1836 if (handler_intercept) rmi.
dc().stop_handler_threads(threadid, ncpus);
// Queue lvid on a per-thread internal task queue; the target thread is
// chosen as lvid modulo the number of worker queues. Skipped entirely
// once a forced stop has been requested.
1846 void add_internal_task(lvid_type lvid) {
1847 if (force_stop)
return;
1848 BEGIN_TRACEPOINT(disteng_internal_task_queue);
1850 a = lvid % thrlocal.size();
1851 thrlocal[a].add_task(lvid);
1853 END_TRACEPOINT(disteng_internal_task_queue);
1856 void add_internal_aggregation_task(
const std::string& key) {
1857 ASSERT_GT(aggregate_key_to_id.count(key), 0);
1858 lvid_type
id = -aggregate_key_to_id[key];
1860 for (
size_t i = 0;i < ncpus; ++i) {
1861 thrlocal[i].add_task_priority(
id);
1876 void forward_cached_schedule(lvid_type lvid) {
1877 if (!factorized_consistency) {
1879 const typename graph_type::vertex_record& rec = graph.l_get_vertex_record(lvid);
1880 if (rec.owner != rmi.
procid()) {
1882 rmi.
remote_call(rec.owner, &engine_type::rpc_signal, rec.gvid, msg);
1893 template <
bool prelocked>
1894 void eval_sched_task(
const lvid_type sched_lvid,
1896 BEGIN_TRACEPOINT(disteng_eval_sched_task);
1898 << graph.global_vid(sched_lvid) << std::endl;
1901 const typename graph_type::vertex_record& rec = graph.l_get_vertex_record(sched_lvid);
1903 bool acquirelock =
false;
1904 if (owner != rmi.
procid()) {
1906 rmi.
remote_call(owner, &engine_type::rpc_signal, vid, msg);
1911 issued_messages.inc();
1912 pending_updates.inc();
1913 if (prelocked ==
false) {
1914 vstate[sched_lvid].lock();
1916 if (track_task_retire_time) {
1917 task_start_time[sched_lvid] = launch_timer.
current_time();
1919 #ifdef ASYNC_ENGINE_FACTORIZED_AVOID_SCHEDULER_GATHER
1920 if (factorized_consistency) {
1922 if (vstate[sched_lvid].state == NONE) {
1924 vstate[sched_lvid].state = GATHERING;
1925 vstate[sched_lvid].hasnext =
false;
1926 vstate[sched_lvid].current_message = msg;
1927 vstate[sched_lvid].apply_count_down = graph.l_vertex(sched_lvid).num_mirrors() + 1;
1928 do_init_gather(sched_lvid);
1929 master_broadcast_gathering(sched_lvid, vstate[sched_lvid].vertex_program);
1932 blocked_issues.inc();
1933 pending_updates.dec();
1934 if (vstate[sched_lvid].hasnext) {
1935 scheduler_ptr->
place(sched_lvid, msg);
1936 joined_messages.inc();
1938 vstate[sched_lvid].hasnext =
true;
1939 scheduler_ptr->
place(sched_lvid, msg);
1942 if (prelocked ==
false) vstate[sched_lvid].unlock();
1943 END_TRACEPOINT(disteng_eval_sched_task);
1947 if (vstate[sched_lvid].state == NONE) {
1949 INCREMENT_EVENT(EVENT_ACTIVE_TASKS, 1);
1952 vstate[sched_lvid].state = LOCKING;
1953 vstate[sched_lvid].hasnext =
false;
1954 vstate[sched_lvid].current_message = msg;
1955 vstate[sched_lvid].apply_count_down = graph.l_vertex(sched_lvid).num_mirrors() + 1;
1958 }
else if (vstate[sched_lvid].state == LOCKING) {
1959 blocked_issues.inc();
1960 pending_updates.dec();
1961 vstate[sched_lvid].current_message += msg;
1962 joined_messages.inc();
1964 blocked_issues.inc();
1965 pending_updates.dec();
1966 if (vstate[sched_lvid].hasnext) {
1967 scheduler_ptr->
place(sched_lvid, msg);
1968 joined_messages.inc();
1970 vstate[sched_lvid].hasnext =
true;
1971 scheduler_ptr->
place(sched_lvid, msg);
1974 if (prelocked ==
false) vstate[sched_lvid].unlock();
1975 END_TRACEPOINT(disteng_eval_sched_task);
1977 BEGIN_TRACEPOINT(disteng_chandy_misra);
1978 cmlocks->make_philosopher_hungry_per_replica(sched_lvid);
1979 END_TRACEPOINT(disteng_chandy_misra);
1980 master_broadcast_locking(sched_lvid);
1982 #ifdef ASYNC_ENGINE_FACTORIZED_AVOID_SCHEDULER_GATHER
1988 atomic<size_t> pingid;
1993 void thread_start(
size_t threadid) {
1994 if (handler_intercept) rmi.
dc().stop_handler_threads(threadid, ncpus);
1995 bool has_internal_task =
false;
1996 bool has_sched_msg =
false;
1997 std::vector<std::vector<lvid_type> > internal_lvid;
1998 lvid_type sched_lvid;
2000 INCREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
2005 float next_processing_time = 0.05;
2006 timer ti; ti.start();
2010 std::string key = aggregator.tick_asynchronous();
2011 if (key !=
"") add_internal_aggregation_task(key);
2018 if (handler_intercept) rmi.
dc().handle_incoming_calls(threadid, ncpus);
2020 if (ti.current_time() >= next_processing_time && rmi.
numprocs() > 1) {
2024 if (handler_intercept) rmi.
dc().start_handler_threads(threadid, ncpus);
2025 size_t p = pingid.inc() % rmi.
numprocs();
2031 if (handler_intercept) {
2032 rmi.
dc().stop_handler_threads(threadid, ncpus);
2037 get_a_task(threadid,
2038 has_internal_task, internal_lvid,
2039 has_sched_msg, sched_lvid, msg);
2041 if (has_internal_task) {
2042 for (
size_t i = 0;i < internal_lvid.size(); ++i) {
2043 for (
size_t j = 0;j < internal_lvid[i].size(); ++j) {
2044 eval_internal_task(threadid, internal_lvid[i][j]);
2047 if (endgame_mode) rmi.
dc().
flush();
2048 }
else if (has_sched_msg) {
2049 eval_sched_task<false>(sched_lvid, msg);
2050 if (endgame_mode) rmi.
dc().
flush();
2056 else if (!try_to_quit(threadid,
2057 has_internal_task, internal_lvid,
2058 has_sched_msg, sched_lvid, msg)) {
2061 if (has_internal_task) {
2062 for (
size_t i = 0;i < internal_lvid.size(); ++i) {
2063 for (
size_t j = 0;j < internal_lvid[i].size(); ++j) {
2064 eval_internal_task(threadid, internal_lvid[i][j]);
2067 }
else if (has_sched_msg) {
2068 eval_sched_task<false>(sched_lvid, msg);
2072 if (endgame_mode) next_processing_time = 0.01;
2073 DECREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
2085 typedef std::pair<vertex_id_type, vertex_data_type> vid_vdata_pair_type;
2087 typedef buffered_exchange<vid_vdata_pair_type> vdata_exchange_type;
2089 vdata_exchange_type vdata_exchange;
// Push the master's vertex data for lvid to every mirror through the
// buffered exchange, keyed by global vid.
2098 void sync_vertex_data(lvid_type lvid) {
2099 ASSERT_TRUE(graph.l_is_master(lvid));
2101 local_vertex_type vertex = graph.l_vertex(lvid);
2102 foreach(
const procid_t& mirror, vertex.mirrors()) {
2103 vdata_exchange.send(mirror, std::make_pair(vid, vertex.data()));
// Drain the exchange on the receiving side: overwrite each local mirror
// copy with the (vid, data) pairs the masters sent.
2111 void recv_vertex_data() {
2113 typename vdata_exchange_type::buffer_type buffer;
2114 while(vdata_exchange.recv(procid, buffer)) {
2115 foreach(
const vid_vdata_pair_type& pair, buffer) {
2116 const lvid_type lvid = graph.local_vid(pair.first);
2117 ASSERT_FALSE(graph.l_is_master(lvid));
2118 graph.l_vertex(lvid).data() = pair.second;
2164 logstream(
LOG_INFO) <<
"Spawning " << ncpus <<
" threads" << std::endl;
2165 ASSERT_TRUE(scheduler_ptr != NULL);
2168 scheduler_ptr->
start();
2172 aggregator.start(ncpus);
2173 aggregator.aggregate_all_periodic();
2174 generate_aggregate_keys();
2179 lvid_type lv = (lvid_type)graph.num_local_vertices();
2180 ASSERT_MSG(lv + aggregate_id_to_key.size() >= lv,
2181 "Internal Queue IDs numeric overflow");
2187 size_t allocatedmem = memory_info::allocated_bytes();
2192 total_update_time.value = 0.0;
2193 endgame_mode =
false;
2194 issued_messages = 0;
2195 pending_updates = 0;
2197 programs_executed = 0;
2198 if (track_task_retire_time) {
2199 task_start_time.resize(graph.num_local_vertices());
2201 launch_timer.
start();
2215 logstream(
LOG_INFO) <<
"Total Allocated Bytes: " << allocatedmem << std::endl;
2217 for (
size_t i = 0; i < ncpus; ++i) {
2218 thrgroup.
launch(boost::bind(&engine_type::thread_start,
this, i), i);
2229 size_t ctasks = programs_executed.value;
2231 programs_executed.value = ctasks;
2233 ctasks = issued_messages.value;
2235 issued_messages.value = ctasks;
2237 ctasks = blocked_issues.value;
2239 blocked_issues.value = ctasks;
2241 ctasks = joined_messages.value;
2244 joined_messages.value = ctasks;
2246 double total_upd_time = total_update_time.value;
2248 total_update_time.value = total_upd_time;
2250 rmi.
cout() <<
"Completed Tasks: " << programs_executed.value << std::endl;
2251 rmi.
cout() <<
"Issued Tasks: " << issued_messages.value << std::endl;
2252 rmi.
cout() <<
"Blocked Issues: " << blocked_issues.value << std::endl;
2253 if (track_task_retire_time) {
2254 rmi.
cout() <<
"Average Task Retire Time: "
2255 << total_update_time.value / programs_executed.value
2258 rmi.
cout() <<
"Joined Tasks: " << joined_messages.value << std::endl;
2276 return termination_reason;
2283 std::map<std::string, lvid_type> aggregate_key_to_id;
2284 std::vector<std::string> aggregate_id_to_key;
// Rebuild the mapping between periodic-aggregation keys and the small
// integer ids used elsewhere as negated lvids for internal tasks
// (see add_internal_aggregation_task / eval_internal_task).
// Index 0 is an empty sentinel key so real ids start at 1 — presumably
// because id 0 would be indistinguishable from lvid 0 when negated.
2291 void generate_aggregate_keys() {
2292 aggregate_key_to_id.clear();
2293 aggregate_id_to_key.clear();
2295 aggregate_id_to_key.push_back(
"");
2296 std::set<std::string> keys = aggregator.get_all_periodic_keys();
2298 foreach(std::string key, keys) {
2299 aggregate_id_to_key.push_back(key);
2300 aggregate_key_to_id[key] = (
lvid_type)(aggregate_id_to_key.size() - 1);
2357 #include <graphlab/macros_undef.hpp>
2359 #undef ASYNC_ENGINE_FACTORIZED_AVOID_SCHEDULER_GATHER
2361 #endif // GRAPHLAB_ASYNC_CONSISTENT_ENGINE