25 #ifndef GRAPHLAB_SEMI_SYNCHRONOUS_ENGINE_HPP
26 #define GRAPHLAB_SEMI_SYNCHRONOUS_ENGINE_HPP
30 #include <boost/bind.hpp>
32 #include <graphlab/engine/iengine.hpp>
34 #include <graphlab/vertex_program/ivertex_program.hpp>
35 #include <graphlab/vertex_program/icontext.hpp>
36 #include <graphlab/vertex_program/context.hpp>
38 #include <graphlab/engine/execution_status.hpp>
39 #include <graphlab/options/graphlab_options.hpp>
44 #include <graphlab/parallel/pthread_tools.hpp>
45 #include <graphlab/parallel/atomic_add_vector.hpp>
46 #include <graphlab/parallel/lockfree_push_back.hpp>
47 #include <graphlab/util/tracepoint.hpp>
48 #include <graphlab/util/memory_info.hpp>
50 #include <graphlab/rpc/dc_dist_object.hpp>
51 #include <graphlab/rpc/distributed_event_log.hpp>
52 #include <graphlab/rpc/buffered_exchange.hpp>
59 #include <graphlab/macros_def.hpp>
214 template<
typename VertexProgram>
216 public iengine<VertexProgram> {
314 typedef typename graph_type::local_vertex_type local_vertex_type;
319 typedef typename graph_type::local_edge_type local_edge_type;
324 typedef typename graph_type::lvid_type lvid_type;
326 std::vector<double> per_thread_compute_time;
367 size_t max_iterations;
374 int snapshot_interval;
377 std::string snapshot_path;
383 size_t iteration_counter;
409 size_t max_active_vertices;
417 std::vector<simple_spinlock> vlocks;
427 std::vector<simple_spinlock> elocks;
435 std::vector<vertex_program_type> vertex_programs;
447 atomic<int> num_to_activate;
458 std::vector<gather_type> gather_accum;
480 std::vector<gather_type> gather_cache;
488 std::vector<lvid_type> active_superstep;
495 atomic<size_t> num_active_vertices;
502 std::vector<lvid_type> active_minorstep;
508 atomic<size_t> completed_applys;
515 atomic<size_t> shared_lvid_counter;
521 typedef std::pair<vertex_id_type, vertex_program_type> vid_prog_pair_type;
526 typedef buffered_exchange<vid_prog_pair_type> vprog_exchange_type;
532 vprog_exchange_type vprog_exchange;
537 typedef std::pair<vertex_id_type, vertex_data_type> vid_vdata_pair_type;
542 typedef buffered_exchange<vid_vdata_pair_type> vdata_exchange_type;
548 vdata_exchange_type vdata_exchange;
553 typedef std::pair<vertex_id_type, gather_type> vid_gather_pair_type;
559 typedef buffered_exchange<vid_gather_pair_type> gather_exchange_type;
565 gather_exchange_type gather_exchange;
570 typedef std::pair<vertex_id_type, message_type> vid_message_pair_type;
575 typedef buffered_exchange<vid_message_pair_type> message_exchange_type;
580 message_exchange_type message_exchange;
589 DECLARE_EVENT(EVENT_APPLIES);
590 DECLARE_EVENT(EVENT_GATHERS);
591 DECLARE_EVENT(EVENT_SCATTERS);
592 DECLARE_EVENT(EVENT_ACTIVE_CPUS);
629 delete scheduler_ptr;
656 const std::string& order =
"shuffle");
660 const std::string& order =
"shuffle");
699 void internal_stop();
744 void internal_post_delta(
const vertex_type& vertex,
755 void internal_clear_gather_cache(
const vertex_type& vertex);
761 void thread_launch_wrapped_event_counter(
size_t thread_id,
762 boost::function<
void(
void)> fn) {
763 INCREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
764 rmi.
dc().stop_handler_threads(thread_id, threads.
size());
766 rmi.
dc().start_handler_threads(thread_id, threads.
size());
767 DECREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
788 template<
typename MemberFunction>
789 void run_synchronous(MemberFunction member_fun) {
790 shared_lvid_counter = 0;
792 for(
size_t i = 0; i < threads.
size(); ++i) {
793 boost::function<void(void)> invoke = boost::bind(member_fun,
this, i);
794 threads.
launch(boost::bind(
795 &semi_synchronous_engine::thread_launch_wrapped_event_counter,
820 void exchange_messages(
size_t thread_id);
829 void transfer_scheduler_to_active(
size_t thread_id);
840 void execute_gathers(
size_t thread_id);
852 void execute_applys(
size_t thread_id);
862 void execute_scatters(
size_t thread_id);
872 void sync_vertex_program(lvid_type lvid,
size_t thread_id);
882 void recv_vertex_programs(
size_t threadid,
const bool try_to_recv =
false);
891 void sync_vertex_data(lvid_type lvid,
size_t thread_id);
901 void recv_vertex_data(
size_t threadid,
const bool try_to_recv =
false);
909 void sync_gather(lvid_type lvid,
const gather_type& accum,
920 void recv_gathers(
size_t threadid,
const bool try_to_recv =
false);
928 void sync_message(lvid_type lvid,
929 const size_t thread_id,
939 void recv_messages(
size_t threadid,
const bool try_to_recv =
false);
987 template<
typename VertexProgram>
992 rmi(dc, this), graph(graph),
993 threads(opts.get_ncpus()),
994 thread_barrier(opts.get_ncpus()),
995 max_iterations(-1), snapshot_interval(-1), iteration_counter(0),
996 started(false), timeout(0),
997 max_active_vertices(1000),
999 active_superstep(128),
1000 active_superstep_pushback(active_superstep, 0),
1001 active_minorstep(128),
1002 active_minorstep_pushback(active_minorstep, 0),
1003 vprog_exchange(dc, opts.get_ncpus(), 65536),
1004 vdata_exchange(dc, opts.get_ncpus(), 65536),
1005 gather_exchange(dc, opts.get_ncpus(), 65536),
1006 message_exchange(dc, opts.get_ncpus(), 65536),
1007 aggregator(dc, graph, new
context_type(*this, graph)) {
1010 per_thread_compute_time.resize(opts.
get_ncpus());
1011 bool use_cache =
false;
1015 max_active_vertices = graph.num_local_vertices() * 0.1;
1016 max_active_vertices = std::max<size_t>(max_active_vertices, 1000);
1018 foreach(std::string opt, keys) {
1019 if (opt ==
"max_iterations") {
1022 logstream(
LOG_EMPH) <<
"Engine Option: max_iterations = "
1023 << max_iterations << std::endl;
1024 }
else if (opt ==
"timeout") {
1027 logstream(
LOG_EMPH) <<
"Engine Option: timeout = "
1028 << timeout << std::endl;
1029 }
else if (opt ==
"use_cache") {
1032 logstream(
LOG_EMPH) <<
"Engine Option: use_cache = "
1033 << use_cache << std::endl;
1034 }
else if (opt ==
"snapshot_interval") {
1037 logstream(
LOG_EMPH) <<
"Engine Option: snapshot_interval = "
1038 << snapshot_interval << std::endl;
1039 }
else if (opt ==
"snapshot_path") {
1042 logstream(
LOG_EMPH) <<
"Engine Option: snapshot_path = "
1043 << snapshot_path << std::endl;
1044 }
else if (opt ==
"max_active_vertices") {
1047 logstream(
LOG_EMPH) <<
"Engine Option: max_active_vertices = "
1048 << max_active_vertices << std::endl;
1049 }
else if (opt ==
"max_active_fraction") {
1050 float max_active_fraction = 0.1;
1052 if (max_active_fraction > 0) {
1053 max_active_vertices = graph.num_local_vertices() * max_active_fraction;
1054 max_active_vertices += (max_active_vertices == 0);
1057 max_active_vertices = (size_t)(-1);
1060 logstream(
LOG_EMPH) <<
"Engine Option: max_active_fraction = "
1061 << max_active_fraction << std::endl;
1064 logstream(
LOG_FATAL) <<
"Unexpected Engine Option: " << opt << std::endl;
1068 if (snapshot_interval >= 0 && snapshot_path.length() == 0) {
1070 <<
"Snapshot interval specified, but no snapshot path" << std::endl;
1072 INITIALIZE_EVENT_LOG(dc);
1073 ADD_CUMULATIVE_EVENT(EVENT_APPLIES,
"Applies",
"Calls");
1074 ADD_CUMULATIVE_EVENT(EVENT_GATHERS ,
"Gathers",
"Calls");
1075 ADD_CUMULATIVE_EVENT(EVENT_SCATTERS ,
"Scatters",
"Calls");
1076 ADD_INSTANTANEOUS_EVENT(EVENT_ACTIVE_CPUS,
"Active Threads",
"Threads");
1079 memory_info::log_usage(
"Before Engine Initialization");
1081 active_superstep.resize(2 * max_active_vertices);
1082 active_minorstep.resize(2 * max_active_vertices);
1083 vlocks.resize(graph.num_local_vertices());
1084 vertex_programs.resize(graph.num_local_vertices());
1085 has_remote_message.
resize(graph.num_local_vertices());
1086 has_remote_message.
clear();
1090 gather_accum.resize(graph.num_local_vertices(),
gather_type());
1091 has_gather_accum.
resize(graph.num_local_vertices());
1092 has_gather_accum.
clear();
1095 gather_cache.resize(graph.num_local_vertices(),
gather_type());
1096 has_cache.
resize(graph.num_local_vertices());
1100 active_superstep.resize(opts.
get_ncpus());
1102 memory_info::log_usage(
"After Engine Initialization");
1108 opts_copy.set_scheduler_type(
"queued_fifo");
1125 template<
typename VertexProgram>
1126 typename semi_synchronous_engine<VertexProgram>::aggregator_type*
1133 template<
typename VertexProgram>
1135 for (
size_t i = 0; i < rmi.numprocs(); ++i)
1139 template<
typename VertexProgram>
1140 void semi_synchronous_engine<VertexProgram>::rpc_stop() {
1145 template<
typename VertexProgram>
1149 internal_signal_rpc(gvid, message);
1155 template<
typename VertexProgram>
1158 for(lvid_type lvid = 0; lvid < graph.num_local_vertices(); ++lvid) {
1159 if(graph.l_is_master(lvid)) {
1160 internal_signal(
vertex_type(graph.l_vertex(lvid)), message);
1166 template<
typename VertexProgram>
1169 const message_type& message,
const std::string& order) {
1170 for(lvid_type lvid = 0; lvid < graph.num_local_vertices(); ++lvid) {
1171 if(graph.l_is_master(lvid) && vset.l_contains(lvid)) {
1172 internal_signal(
vertex_type(graph.l_vertex(lvid)), message);
1178 template<
typename VertexProgram>
1181 const message_type& message) {
1182 const lvid_type lvid = vertex.local_id();
1183 if (!graph.l_is_master(lvid)) {
1184 scheduler_ptr->place(lvid, message);
1185 has_remote_message.set_bit(lvid);
1192 scheduler_ptr->schedule(lvid, message);
1198 template<
typename VertexProgram>
1199 void semi_synchronous_engine<VertexProgram>::
1200 internal_signal_broadcast(
vertex_id_type gvid,
const message_type& message) {
1201 for (
size_t i = 0; i < rmi.numprocs(); ++i) {
1202 if(i == rmi.procid()) internal_signal_rpc(gvid, message);
1203 else rmi.remote_call(i, &semi_synchronous_engine<VertexProgram>::internal_signal_rpc,
1208 template<
typename VertexProgram>
1209 void semi_synchronous_engine<VertexProgram>::
1211 const message_type& message) {
1212 if (graph.is_master(gvid)) {
1213 internal_signal(graph.vertex(gvid), message);
1221 template<
typename VertexProgram>
1222 void semi_synchronous_engine<VertexProgram>::
1223 internal_post_delta(
const vertex_type& vertex,
const gather_type& delta) {
1224 const bool caching_enabled = !gather_cache.empty();
1225 if(caching_enabled) {
1226 const lvid_type lvid = vertex.local_id();
1227 vlocks[lvid].lock();
1228 if( has_cache.get(lvid) ) {
1229 gather_cache[lvid] += delta;
1236 vlocks[lvid].unlock();
1241 template<
typename VertexProgram>
1242 void semi_synchronous_engine<VertexProgram>::
1243 internal_clear_gather_cache(
const vertex_type& vertex) {
1244 const bool caching_enabled = !gather_cache.empty();
1245 const lvid_type lvid = vertex.local_id();
1246 if(caching_enabled && has_cache.get(lvid)) {
1247 vlocks[lvid].lock();
1248 gather_cache[lvid] = gather_type();
1249 has_cache.clear_bit(lvid);
1250 vlocks[lvid].unlock();
1257 template<
typename VertexProgram>
1261 template<
typename VertexProgram>
1265 template<
typename VertexProgram>
1271 template<
typename VertexProgram>
1273 size_t allocated_memory = memory_info::allocated_bytes();
1274 rmi.all_reduce(allocated_memory);
1275 return allocated_memory;
1288 iteration_counter = 0;
1289 force_abort =
false;
1292 scheduler_ptr->start();
1296 if (snapshot_interval == 0) {
1297 graph.save_binary(snapshot_path);
1301 float last_print = -5;
1302 if (rmi.procid() == 0) {
1303 logstream(
LOG_EMPH) <<
"Iteration counter will only output every 5 seconds."
1309 while(iteration_counter < max_iterations && !force_abort ) {
1312 if(timeout != 0 && timeout < elapsed_seconds()) {
1317 bool print_this_round = (elapsed_seconds() - last_print) >= 5;
1319 if(rmi.procid() == 0 && print_this_round) {
1321 << rmi.procid() <<
": Starting iteration: " << iteration_counter
1323 last_print = elapsed_seconds();
1326 run_synchronous( &semi_synchronous_engine::exchange_messages);
1327 has_remote_message.clear();
1332 active_superstep_pushback.set_size(0);
1333 active_minorstep_pushback.set_size(0);
1334 has_gather_accum.clear();
1338 num_to_activate = max_active_vertices;
1339 num_active_vertices = 0;
1345 run_synchronous( &semi_synchronous_engine::transfer_scheduler_to_active);
1366 size_t total_active_vertices = num_active_vertices;
1367 rmi.all_reduce(total_active_vertices);
1368 if (rmi.procid() == 0 && print_this_round)
1370 <<
"\tActive vertices: " << total_active_vertices << std::endl;
1371 if(total_active_vertices == 0) {
1380 run_synchronous( &semi_synchronous_engine::execute_gathers );
1384 active_minorstep_pushback.set_size(0);
1398 run_synchronous( &semi_synchronous_engine::execute_applys );
1416 run_synchronous( &semi_synchronous_engine::execute_scatters );
1422 if(rmi.procid() == 0 && print_this_round)
1423 logstream(
LOG_EMPH) <<
"\t Running Aggregators" << std::endl;
1425 aggregator.tick_synchronous();
1427 ++iteration_counter;
1429 if (snapshot_interval > 0 && iteration_counter % snapshot_interval == 0) {
1430 graph.save_binary(snapshot_path);
1434 if (rmi.procid() == 0) {
1435 logstream(
LOG_EMPH) << iteration_counter
1436 <<
" iterations completed." << std::endl;
1439 double total_compute_time = 0;
1440 for (
size_t i = 0;i < per_thread_compute_time.size(); ++i) {
1441 total_compute_time += per_thread_compute_time[i];
1443 std::vector<double> all_compute_time_vec(rmi.numprocs());
1444 all_compute_time_vec[rmi.procid()] = total_compute_time;
1445 rmi.all_gather(all_compute_time_vec);
1447 size_t global_completed = completed_applys;
1448 rmi.all_reduce(global_completed);
1449 completed_applys = global_completed;
1450 rmi.cout() <<
"Updates: " << completed_applys.value <<
"\n";
1451 if (rmi.procid() == 0) {
1452 logstream(
LOG_INFO) <<
"Compute Balance: ";
1453 for (
size_t i = 0;i < all_compute_time_vec.size(); ++i) {
1454 logstream(
LOG_INFO) << all_compute_time_vec[i] <<
" ";
1463 return termination_reason;
1468 template<
typename VertexProgram>
1471 context_type
context(*
this, graph);
1472 const bool TRY_TO_RECV =
true;
1473 const size_t TRY_RECV_MOD = 100;
1481 shared_lvid_counter.inc_ret_last(8 *
sizeof(
size_t));
1482 if (lvid_block_start >= graph.num_local_vertices())
break;
1484 size_t lvid_bit_block = has_remote_message.
containing_word(lvid_block_start);
1485 if (lvid_bit_block == 0)
continue;
1487 local_bitset.
clear();
1489 foreach(
size_t lvid_block_offset, local_bitset) {
1490 lvid_type lvid = lvid_block_start + lvid_block_offset;
1491 if (lvid >= graph.num_local_vertices())
break;
1495 ASSERT_FALSE(graph.l_is_master(lvid));
1497 sync_message(lvid, thread_id, msg);
1499 if(++vcount % TRY_RECV_MOD == 0) recv_messages(thread_id, TRY_TO_RECV);
1502 message_exchange.partial_flush(thread_id);
1504 rmi.dc().start_handler_threads(thread_id, threads.size());
1505 thread_barrier.wait();
1506 if(thread_id == 0) message_exchange.flush();
1507 thread_barrier.wait();
1508 rmi.dc().stop_handler_threads(thread_id, threads.size());
1509 recv_messages(thread_id);
1515 template<
typename VertexProgram>
1516 void semi_synchronous_engine<VertexProgram>::
1517 transfer_scheduler_to_active(
const size_t thread_id) {
1518 context_type context(*
this, graph);
1519 const bool TRY_TO_RECV =
true;
1520 const size_t TRY_RECV_MOD = 100;
1522 size_t curthread_num_to_activate = num_to_activate / threads.size();
1523 curthread_num_to_activate += (curthread_num_to_activate == 0);
1524 size_t nactive_inc = 0;
1525 while (nactive_inc < curthread_num_to_activate) {
1529 scheduler_ptr->get_next(thread_id, lvid, msg);
1531 if (has_sched_msg) {
1534 ASSERT_TRUE(graph.l_is_master(lvid));
1538 active_superstep_pushback.push_back(lvid);
1539 vertex_type vertex = vertex_type(graph.l_vertex(lvid));
1540 vertex_programs[lvid].init(context, vertex, msg);
1542 const vertex_program_type& const_vprog = vertex_programs[lvid];
1543 const vertex_type const_vertex = vertex;
1544 if(const_vprog.gather_edges(context, const_vertex) !=
1546 active_minorstep_pushback.push_back(lvid);
1547 sync_vertex_program(lvid, thread_id);
1549 if (++vcount % TRY_RECV_MOD == 0) {
1554 recv_vertex_programs(thread_id, TRY_TO_RECV);
1560 vprog_exchange.partial_flush(thread_id);
1561 num_active_vertices.inc(nactive_inc);
1563 rmi.dc().start_handler_threads(thread_id, threads.size());
1564 thread_barrier.wait();
1565 if(thread_id == 0) {
1566 vprog_exchange.flush();
1568 thread_barrier.wait();
1569 rmi.dc().stop_handler_threads(thread_id, threads.size());
1570 recv_vertex_programs(thread_id);
1574 template<
typename VertexProgram>
1575 void semi_synchronous_engine<VertexProgram>::
1576 execute_gathers(
const size_t thread_id) {
1577 context_type context(*
this, graph);
1578 const bool TRY_TO_RECV =
true;
1579 const size_t TRY_RECV_MOD = 1000;
1581 const bool caching_enabled = !gather_cache.empty();
1584 size_t numminorstep = active_minorstep_pushback.size();
1586 size_t i = shared_lvid_counter.inc_ret_last();
1587 if (i >= numminorstep)
break;
1590 bool accum_is_set =
false;
1591 gather_type accum = gather_type();
1594 if( caching_enabled && has_cache.get(lvid) ) {
1595 accum = gather_cache[lvid];
1596 accum_is_set =
true;
1599 const vertex_program_type& vprog = vertex_programs[lvid];
1600 local_vertex_type local_vertex = graph.l_vertex(lvid);
1601 const vertex_type vertex(local_vertex);
1602 const edge_dir_type gather_dir = vprog.gather_edges(context, vertex);
1604 size_t edges_touched = 0;
1605 vprog.pre_local_gather(accum);
1607 foreach(local_edge_type local_edge, local_vertex.in_edges()) {
1608 edge_type edge(local_edge);
1611 accum += vprog.gather(context, vertex, edge);
1613 accum = vprog.gather(context, vertex, edge);
1614 accum_is_set =
true;
1622 foreach(local_edge_type local_edge, local_vertex.out_edges()) {
1623 edge_type edge(local_edge);
1626 accum += vprog.gather(context, vertex, edge);
1628 accum = vprog.gather(context, vertex, edge);
1629 accum_is_set =
true;
1634 INCREMENT_EVENT(EVENT_GATHERS, edges_touched);
1636 vprog.post_local_gather(accum);
1641 if(caching_enabled && accum_is_set) {
1642 gather_cache[lvid] = accum; has_cache.set_bit(lvid);
1647 if(accum_is_set) sync_gather(lvid, accum, thread_id);
1648 if(!graph.l_is_master(lvid)) {
1650 vertex_programs[lvid] = vertex_program_type();
1654 if(++vcount % TRY_RECV_MOD == 0) recv_gathers(thread_id, TRY_TO_RECV);
1656 per_thread_compute_time[thread_id] += ti.current_time();
1657 gather_exchange.partial_flush(thread_id);
1659 rmi.dc().start_handler_threads(thread_id, threads.size());
1660 thread_barrier.wait();
1661 if(thread_id == 0) gather_exchange.flush();
1662 thread_barrier.wait();
1663 rmi.dc().stop_handler_threads(thread_id, threads.size());
1664 recv_gathers(thread_id);
1668 template<
typename VertexProgram>
1669 void semi_synchronous_engine<VertexProgram>::
1670 execute_applys(
const size_t thread_id) {
1671 context_type context(*
this, graph);
1672 const bool TRY_TO_RECV =
true;
1673 const size_t TRY_RECV_MOD = 1000;
1677 size_t numactive = active_superstep_pushback.size();
1679 size_t i = shared_lvid_counter.inc_ret_last();
1680 if (i >= numactive)
break;
1683 ASSERT_TRUE(graph.l_is_master(lvid));
1684 vertex_type vertex(graph.l_vertex(lvid));
1687 const gather_type& accum = gather_accum[lvid];
1688 INCREMENT_EVENT(EVENT_APPLIES, 1);
1689 vertex_programs[lvid].apply(context, vertex, accum);
1693 gather_accum[lvid] = gather_type();
1695 sync_vertex_data(lvid, thread_id);
1697 const vertex_program_type& const_vprog = vertex_programs[lvid];
1698 const vertex_type const_vertex = vertex;
1699 if(const_vprog.scatter_edges(context, const_vertex) !=
1701 active_minorstep_pushback.push_back(lvid);
1702 sync_vertex_program(lvid, thread_id);
1704 vertex_programs[lvid] = vertex_program_type();
1707 if(++vcount % TRY_RECV_MOD == 0) {
1708 recv_vertex_programs(thread_id, TRY_TO_RECV);
1709 recv_vertex_data(thread_id, TRY_TO_RECV);
1713 per_thread_compute_time[thread_id] += ti.current_time();
1714 vprog_exchange.partial_flush(thread_id);
1715 vdata_exchange.partial_flush(thread_id);
1717 rmi.dc().start_handler_threads(thread_id, threads.size());
1718 thread_barrier.wait();
1719 if(thread_id == 0) {
1720 vprog_exchange.flush(); vdata_exchange.flush();
1722 thread_barrier.wait();
1723 rmi.dc().stop_handler_threads(thread_id, threads.size());
1724 recv_vertex_programs(thread_id);
1725 recv_vertex_data(thread_id);
1732 template<
typename VertexProgram>
1733 void semi_synchronous_engine<VertexProgram>::
1734 execute_scatters(
const size_t thread_id) {
1735 context_type context(*
this, graph);
1738 size_t numminorstep = active_minorstep_pushback.size();
1740 size_t i = shared_lvid_counter.inc_ret_last();
1741 if (i >= numminorstep)
break;
1744 const vertex_program_type& vprog = vertex_programs[lvid];
1745 local_vertex_type local_vertex = graph.l_vertex(lvid);
1746 const vertex_type vertex(local_vertex);
1747 const edge_dir_type scatter_dir = vprog.scatter_edges(context, vertex);
1748 size_t edges_touched = 0;
1751 foreach(local_edge_type local_edge, local_vertex.in_edges()) {
1752 edge_type edge(local_edge);
1754 vprog.scatter(context, vertex, edge);
1761 foreach(local_edge_type local_edge, local_vertex.out_edges()) {
1762 edge_type edge(local_edge);
1764 vprog.scatter(context, vertex, edge);
1769 INCREMENT_EVENT(EVENT_SCATTERS, edges_touched);
1771 vertex_programs[lvid] = vertex_program_type();
1774 per_thread_compute_time[thread_id] += ti.current_time();
1780 template<
typename VertexProgram>
1781 void semi_synchronous_engine<VertexProgram>::
1782 sync_vertex_program(
lvid_type lvid,
const size_t thread_id) {
1783 ASSERT_TRUE(graph.l_is_master(lvid));
1785 local_vertex_type vertex = graph.l_vertex(lvid);
1786 foreach(
const procid_t& mirror, vertex.mirrors()) {
1787 vprog_exchange.send(mirror,
1788 std::make_pair(vid, vertex_programs[lvid]),
1795 template<
typename VertexProgram>
1796 void semi_synchronous_engine<VertexProgram>::
1797 recv_vertex_programs(
size_t threadid,
const bool try_to_recv) {
1798 rmi.dc().handle_incoming_calls(threadid, threads.size());
1800 typename vprog_exchange_type::buffer_type buffer;
1801 while(vprog_exchange.recv(procid, buffer, try_to_recv)) {
1802 foreach(
const vid_prog_pair_type& pair, buffer) {
1803 const lvid_type lvid = graph.local_vid(pair.first);
1805 vertex_programs[lvid] = pair.second;
1806 active_minorstep_pushback.push_back(lvid);
1812 template<
typename VertexProgram>
1813 void semi_synchronous_engine<VertexProgram>::
1814 sync_vertex_data(
lvid_type lvid,
const size_t thread_id) {
1815 ASSERT_TRUE(graph.l_is_master(lvid));
1817 local_vertex_type vertex = graph.l_vertex(lvid);
1818 foreach(
const procid_t& mirror, vertex.mirrors()) {
1819 vdata_exchange.send(mirror, std::make_pair(vid, vertex.data()), thread_id);
1827 template<
typename VertexProgram>
1828 void semi_synchronous_engine<VertexProgram>::
1829 recv_vertex_data(
size_t threadid,
bool try_to_recv) {
1830 rmi.dc().handle_incoming_calls(threadid, threads.size());
1832 typename vdata_exchange_type::buffer_type buffer;
1833 while(vdata_exchange.recv(procid, buffer, try_to_recv)) {
1834 foreach(
const vid_vdata_pair_type& pair, buffer) {
1835 const lvid_type lvid = graph.local_vid(pair.first);
1836 ASSERT_FALSE(graph.l_is_master(lvid));
1837 graph.l_vertex(lvid).data() = pair.second;
1843 template<
typename VertexProgram>
1844 void semi_synchronous_engine<VertexProgram>::
1845 sync_gather(
lvid_type lvid,
const gather_type& accum,
const size_t thread_id) {
1846 if(graph.l_is_master(lvid)) {
1847 vlocks[lvid].lock();
1848 if(has_gather_accum.get(lvid)) {
1849 gather_accum[lvid] += accum;
1851 gather_accum[lvid] = accum;
1852 has_gather_accum.set_bit(lvid);
1854 vlocks[lvid].unlock();
1856 const procid_t master = graph.l_master(lvid);
1858 gather_exchange.send(master, std::make_pair(vid, accum), thread_id);
1862 template<
typename VertexProgram>
1863 void semi_synchronous_engine<VertexProgram>::
1864 recv_gathers(
size_t threadid,
const bool try_to_recv) {
1865 rmi.dc().handle_incoming_calls(threadid, threads.size());
1867 typename gather_exchange_type::buffer_type buffer;
1868 while(gather_exchange.recv(procid, buffer, try_to_recv)) {
1869 foreach(
const vid_gather_pair_type& pair, buffer) {
1870 const lvid_type lvid = graph.local_vid(pair.first);
1871 const gather_type& accum = pair.second;
1872 ASSERT_TRUE(graph.l_is_master(lvid));
1873 vlocks[lvid].lock();
1874 if( has_gather_accum.get(lvid) ) {
1875 gather_accum[lvid] += accum;
1877 gather_accum[lvid] = accum;
1878 has_gather_accum.set_bit(lvid);
1880 vlocks[lvid].unlock();
1886 template<
typename VertexProgram>
1887 void semi_synchronous_engine<VertexProgram>::
1888 sync_message(
lvid_type lvid,
const size_t thread_id,
const message_type& msg) {
1889 ASSERT_FALSE(graph.l_is_master(lvid));
1890 const procid_t master = graph.l_master(lvid);
1892 message_exchange.send(master, std::make_pair(vid, msg), thread_id);
1898 template<
typename VertexProgram>
1899 void semi_synchronous_engine<VertexProgram>::
1900 recv_messages(
size_t threadid,
const bool try_to_recv) {
1901 rmi.dc().handle_incoming_calls(threadid, threads.size());
1903 typename message_exchange_type::buffer_type buffer;
1904 while(message_exchange.recv(procid, buffer, try_to_recv)) {
1905 foreach(
const vid_message_pair_type& pair, buffer) {
1906 internal_signal(graph.vertex(pair.first), pair.second);
1924 #include <graphlab/macros_undef.hpp>