25 #ifndef GRAPHLAB_SYNCHRONOUS_ENGINE_HPP
26 #define GRAPHLAB_SYNCHRONOUS_ENGINE_HPP
29 #include <boost/bind.hpp>
31 #include <graphlab/engine/iengine.hpp>
33 #include <graphlab/vertex_program/ivertex_program.hpp>
34 #include <graphlab/vertex_program/icontext.hpp>
35 #include <graphlab/vertex_program/context.hpp>
37 #include <graphlab/engine/execution_status.hpp>
38 #include <graphlab/options/graphlab_options.hpp>
43 #include <graphlab/parallel/pthread_tools.hpp>
44 #include <graphlab/parallel/atomic_add_vector.hpp>
45 #include <graphlab/util/tracepoint.hpp>
46 #include <graphlab/util/memory_info.hpp>
48 #include <graphlab/rpc/dc_dist_object.hpp>
49 #include <graphlab/rpc/distributed_event_log.hpp>
50 #include <graphlab/rpc/buffered_exchange.hpp>
57 #include <graphlab/macros_def.hpp>
206 template<
typename VertexProgram>
208 public iengine<VertexProgram> {
306 typedef typename graph_type::local_vertex_type local_vertex_type;
311 typedef typename graph_type::local_edge_type local_edge_type;
316 typedef typename graph_type::lvid_type lvid_type;
318 std::vector<double> per_thread_compute_time;
359 size_t max_iterations;
366 int snapshot_interval;
369 std::string snapshot_path;
375 size_t iteration_counter;
403 std::vector<simple_spinlock> vlocks;
413 std::vector<simple_spinlock> elocks;
421 std::vector<vertex_program_type> vertex_programs;
426 std::vector<message_type> messages;
443 std::vector<gather_type> gather_accum;
465 std::vector<gather_type> gather_cache;
483 atomic<size_t> num_active_vertices;
494 atomic<size_t> completed_applys;
501 atomic<size_t> shared_lvid_counter;
507 typedef std::pair<vertex_id_type, vertex_program_type> vid_prog_pair_type;
512 typedef buffered_exchange<vid_prog_pair_type> vprog_exchange_type;
518 vprog_exchange_type vprog_exchange;
523 typedef std::pair<vertex_id_type, vertex_data_type> vid_vdata_pair_type;
528 typedef buffered_exchange<vid_vdata_pair_type> vdata_exchange_type;
534 vdata_exchange_type vdata_exchange;
539 typedef std::pair<vertex_id_type, gather_type> vid_gather_pair_type;
545 typedef buffered_exchange<vid_gather_pair_type> gather_exchange_type;
551 gather_exchange_type gather_exchange;
556 typedef std::pair<vertex_id_type, message_type> vid_message_pair_type;
561 typedef buffered_exchange<vid_message_pair_type> message_exchange_type;
566 message_exchange_type message_exchange;
575 DECLARE_EVENT(EVENT_APPLIES);
576 DECLARE_EVENT(EVENT_GATHERS);
577 DECLARE_EVENT(EVENT_SCATTERS);
578 DECLARE_EVENT(EVENT_ACTIVE_CPUS);
638 const std::string& order =
"shuffle");
642 const std::string& order =
"shuffle");
681 void internal_stop();
726 void internal_post_delta(
const vertex_type& vertex,
737 void internal_clear_gather_cache(
const vertex_type& vertex);
743 void thread_launch_wrapped_event_counter(boost::function<
void(
void)> fn) {
744 INCREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
746 DECREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
767 template<
typename MemberFunction>
768 void run_synchronous(MemberFunction member_fun) {
769 shared_lvid_counter = 0;
770 if (threads.
size() <= 1) {
771 INCREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
772 ( (
this)->*(member_fun))(0);
773 DECREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
777 for(
size_t i = 0; i < threads.
size(); ++i) {
778 boost::function<void(void)> invoke = boost::bind(member_fun,
this, i);
779 threads.
launch(boost::bind(
780 &synchronous_engine::thread_launch_wrapped_event_counter,
805 void exchange_messages(
size_t thread_id);
815 void receive_messages(
size_t thread_id);
826 void execute_gathers(
size_t thread_id);
838 void execute_applys(
size_t thread_id);
848 void execute_scatters(
size_t thread_id);
858 void sync_vertex_program(lvid_type lvid,
size_t thread_id);
868 void recv_vertex_programs(
const bool try_to_recv =
false);
877 void sync_vertex_data(lvid_type lvid,
size_t thread_id);
887 void recv_vertex_data(
const bool try_to_recv =
false);
895 void sync_gather(lvid_type lvid,
const gather_type& accum,
906 void recv_gathers(
const bool try_to_recv =
false);
914 void sync_message(lvid_type lvid,
const size_t thread_id);
923 void recv_messages(
const bool try_to_recv =
false);
971 template<
typename VertexProgram>
976 rmi(dc, this), graph(graph),
977 threads(opts.get_ncpus()),
978 thread_barrier(opts.get_ncpus()),
979 max_iterations(-1), snapshot_interval(-1), iteration_counter(0),
980 timeout(0), sched_allv(false),
981 vprog_exchange(dc, opts.get_ncpus(), 64 * 1024),
982 vdata_exchange(dc, opts.get_ncpus(), 64 * 1024),
983 gather_exchange(dc, opts.get_ncpus(), 64 * 1024),
984 message_exchange(dc, opts.get_ncpus(), 64 * 1024),
988 per_thread_compute_time.resize(opts.
get_ncpus());
989 bool use_cache =
false;
990 foreach(std::string opt, keys) {
991 if (opt ==
"max_iterations") {
994 logstream(
LOG_EMPH) <<
"Engine Option: max_iterations = "
995 << max_iterations << std::endl;
996 }
else if (opt ==
"timeout") {
999 logstream(
LOG_EMPH) <<
"Engine Option: timeout = "
1000 << timeout << std::endl;
1001 }
else if (opt ==
"use_cache") {
1004 logstream(
LOG_EMPH) <<
"Engine Option: use_cache = "
1005 << use_cache << std::endl;
1006 }
else if (opt ==
"snapshot_interval") {
1009 logstream(
LOG_EMPH) <<
"Engine Option: snapshot_interval = "
1010 << snapshot_interval << std::endl;
1011 }
else if (opt ==
"snapshot_path") {
1014 logstream(
LOG_EMPH) <<
"Engine Option: snapshot_path = "
1015 << snapshot_path << std::endl;
1016 }
else if (opt ==
"sched_allv") {
1019 logstream(
LOG_EMPH) <<
"Engine Option: sched_allv = "
1020 << sched_allv << std::endl;
1022 logstream(
LOG_FATAL) <<
"Unexpected Engine Option: " << opt << std::endl;
1026 if (snapshot_interval >= 0 && snapshot_path.length() == 0) {
1028 <<
"Snapshot interval specified, but no snapshot path" << std::endl;
1030 INITIALIZE_EVENT_LOG(dc);
1031 ADD_CUMULATIVE_EVENT(EVENT_APPLIES,
"Applies",
"Calls");
1032 ADD_CUMULATIVE_EVENT(EVENT_GATHERS ,
"Gathers",
"Calls");
1033 ADD_CUMULATIVE_EVENT(EVENT_SCATTERS ,
"Scatters",
"Calls");
1034 ADD_INSTANTANEOUS_EVENT(EVENT_ACTIVE_CPUS,
"Active Threads",
"Threads");
1038 memory_info::log_usage(
"Before Engine Initialization");
1040 vlocks.resize(graph.num_local_vertices());
1041 vertex_programs.resize(graph.num_local_vertices());
1045 messages.resize(graph.num_local_vertices(),
message_type());
1046 has_message.
resize(graph.num_local_vertices());
1047 has_message.
clear();
1049 gather_accum.resize(graph.num_local_vertices(),
gather_type());
1050 has_gather_accum.
resize(graph.num_local_vertices());
1051 has_gather_accum.
clear();
1054 gather_cache.resize(graph.num_local_vertices(),
gather_type());
1055 has_cache.
resize(graph.num_local_vertices());
1059 active_superstep.
resize(graph.num_local_vertices());
1060 active_superstep.
clear();
1061 active_minorstep.
resize(graph.num_local_vertices());
1062 active_minorstep.
clear();
1064 memory_info::log_usage(
"After Engine Initialization");
1074 template<
typename VertexProgram>
1075 typename synchronous_engine<VertexProgram>::aggregator_type*
1082 template<
typename VertexProgram>
1084 for (
size_t i = 0; i < rmi.numprocs(); ++i)
1088 template<
typename VertexProgram>
1089 void synchronous_engine<VertexProgram>::rpc_stop() {
1094 template<
typename VertexProgram>
1095 void synchronous_engine<VertexProgram>::
1098 internal_signal_rpc(gvid, message);
1104 template<
typename VertexProgram>
1105 void synchronous_engine<VertexProgram>::
1106 signal_all(
const message_type& message,
const std::string& order) {
1107 for(
lvid_type lvid = 0; lvid < graph.num_local_vertices(); ++lvid) {
1108 if(graph.l_is_master(lvid)) {
1109 internal_signal(vertex_type(graph.l_vertex(lvid)), message);
1115 template<
typename VertexProgram>
1116 void synchronous_engine<VertexProgram>::
1117 signal_vset(
const vertex_set& vset,
1118 const message_type& message,
const std::string& order) {
1119 for(
lvid_type lvid = 0; lvid < graph.num_local_vertices(); ++lvid) {
1120 if(graph.l_is_master(lvid) && vset.l_contains(lvid)) {
1121 internal_signal(vertex_type(graph.l_vertex(lvid)), message);
1127 template<
typename VertexProgram>
1128 void synchronous_engine<VertexProgram>::
1129 internal_signal(
const vertex_type& vertex,
1130 const message_type& message) {
1131 const lvid_type lvid = vertex.local_id();
1132 vlocks[lvid].lock();
1133 if( has_message.get(lvid) ) {
1134 messages[lvid] += message;
1136 messages[lvid] = message;
1137 has_message.set_bit(lvid);
1139 vlocks[lvid].unlock();
1143 template<
typename VertexProgram>
1144 void synchronous_engine<VertexProgram>::
1145 internal_signal_broadcast(
vertex_id_type gvid,
const message_type& message) {
1146 for (
size_t i = 0; i < rmi.numprocs(); ++i) {
1147 if(i == rmi.procid()) internal_signal_rpc(gvid, message);
1148 else rmi.remote_call(i, &synchronous_engine<VertexProgram>::internal_signal_rpc,
1153 template<
typename VertexProgram>
1154 void synchronous_engine<VertexProgram>::
1156 const message_type& message) {
1157 if (graph.is_master(gvid)) {
1158 internal_signal(graph.vertex(gvid), message);
1166 template<
typename VertexProgram>
1167 void synchronous_engine<VertexProgram>::
1168 internal_post_delta(
const vertex_type& vertex,
const gather_type& delta) {
1169 const bool caching_enabled = !gather_cache.empty();
1170 if(caching_enabled) {
1171 const lvid_type lvid = vertex.local_id();
1172 vlocks[lvid].lock();
1173 if( has_cache.get(lvid) ) {
1174 gather_cache[lvid] += delta;
1181 vlocks[lvid].unlock();
1186 template<
typename VertexProgram>
1187 void synchronous_engine<VertexProgram>::
1188 internal_clear_gather_cache(
const vertex_type& vertex) {
1189 const bool caching_enabled = !gather_cache.empty();
1190 const lvid_type lvid = vertex.local_id();
1191 if(caching_enabled && has_cache.get(lvid)) {
1192 vlocks[lvid].lock();
1193 gather_cache[lvid] = gather_type();
1194 has_cache.clear_bit(lvid);
1195 vlocks[lvid].unlock();
1202 template<
typename VertexProgram>
1206 template<
typename VertexProgram>
1210 template<
typename VertexProgram>
1216 template<
typename VertexProgram>
1218 size_t allocated_memory = memory_info::allocated_bytes();
1219 rmi.all_reduce(allocated_memory);
1220 return allocated_memory;
1233 iteration_counter = 0;
1234 force_abort =
false;
1236 execution_status::UNSET;
1243 if (snapshot_interval == 0) {
1244 graph.save_binary(snapshot_path);
1247 float last_print = -5;
1248 if (rmi.procid() == 0) {
1249 logstream(
LOG_EMPH) <<
"Iteration counter will only output every 5 seconds."
1253 while(iteration_counter < max_iterations && !force_abort ) {
1256 if(timeout != 0 && timeout < elapsed_seconds()) {
1261 bool print_this_round = (elapsed_seconds() - last_print) >= 5;
1263 if(rmi.procid() == 0 && print_this_round) {
1265 << rmi.procid() <<
": Starting iteration: " << iteration_counter
1267 last_print = elapsed_seconds();
1272 active_superstep.clear(); active_minorstep.clear();
1273 has_gather_accum.clear();
1279 run_synchronous( &synchronous_engine::exchange_messages );
1291 num_active_vertices = 0;
1292 run_synchronous( &synchronous_engine::receive_messages );
1294 active_minorstep.fill();
1296 has_message.clear();
1310 size_t total_active_vertices = num_active_vertices;
1311 rmi.all_reduce(total_active_vertices);
1312 if (rmi.procid() == 0 && print_this_round)
1314 <<
"\tActive vertices: " << total_active_vertices << std::endl;
1315 if(total_active_vertices == 0 ) {
1325 run_synchronous( &synchronous_engine::execute_gathers );
1329 active_minorstep.clear();
1341 run_synchronous( &synchronous_engine::execute_applys );
1356 run_synchronous( &synchronous_engine::execute_scatters );
1361 if(rmi.procid() == 0 && print_this_round)
1362 logstream(
LOG_EMPH) <<
"\t Running Aggregators" << std::endl;
1364 aggregator.tick_synchronous();
1366 ++iteration_counter;
1368 if (snapshot_interval > 0 && iteration_counter % snapshot_interval == 0) {
1369 graph.save_binary(snapshot_path);
1373 if (rmi.procid() == 0) {
1374 logstream(
LOG_EMPH) << iteration_counter
1375 <<
" iterations completed." << std::endl;
1378 double total_compute_time = 0;
1379 for (
size_t i = 0;i < per_thread_compute_time.size(); ++i) {
1380 total_compute_time += per_thread_compute_time[i];
1382 std::vector<double> all_compute_time_vec(rmi.numprocs());
1383 all_compute_time_vec[rmi.procid()] = total_compute_time;
1384 rmi.all_gather(all_compute_time_vec);
1386 size_t global_completed = completed_applys;
1387 rmi.all_reduce(global_completed);
1388 completed_applys = global_completed;
1389 rmi.cout() <<
"Updates: " << completed_applys.value <<
"\n";
1390 if (rmi.procid() == 0) {
1391 logstream(
LOG_INFO) <<
"Compute Balance: ";
1392 for (
size_t i = 0;i < all_compute_time_vec.size(); ++i) {
1393 logstream(
LOG_INFO) << all_compute_time_vec[i] <<
" ";
1401 return termination_reason;
1406 template<
typename VertexProgram>
1409 context_type
context(*
this, graph);
1410 const bool TRY_TO_RECV =
true;
1411 const size_t TRY_RECV_MOD = 100;
1419 shared_lvid_counter.inc_ret_last(8 *
sizeof(
size_t));
1420 if (lvid_block_start >= graph.num_local_vertices())
break;
1422 size_t lvid_bit_block = has_message.
containing_word(lvid_block_start);
1423 if (lvid_bit_block == 0)
continue;
1425 local_bitset.
clear();
1427 foreach(
size_t lvid_block_offset, local_bitset) {
1428 lvid_type lvid = lvid_block_start + lvid_block_offset;
1429 if (lvid >= graph.num_local_vertices())
break;
1432 if(!graph.l_is_master(lvid)) {
1433 sync_message(lvid, thread_id);
1434 has_message.clear_bit(lvid);
1436 messages[lvid] = message_type();
1438 if(++vcount % TRY_RECV_MOD == 0) recv_messages(TRY_TO_RECV);
1441 message_exchange.partial_flush(thread_id);
1443 thread_barrier.wait();
1444 if(thread_id == 0) message_exchange.flush();
1445 thread_barrier.wait();
1451 template<
typename VertexProgram>
1452 void synchronous_engine<VertexProgram>::
1453 receive_messages(
const size_t thread_id) {
1454 context_type context(*
this, graph);
1455 const bool TRY_TO_RECV =
true;
1456 const size_t TRY_RECV_MOD = 100;
1458 size_t nactive_inc = 0;
1459 fixed_dense_bitset<sizeof(size_t)> local_bitset;
1463 shared_lvid_counter.inc_ret_last(8 *
sizeof(
size_t));
1464 if (lvid_block_start >= graph.num_local_vertices())
break;
1466 size_t lvid_bit_block = has_message.containing_word(lvid_block_start);
1467 if (lvid_bit_block == 0)
continue;
1469 local_bitset.clear();
1470 local_bitset.initialize_from_mem(&lvid_bit_block,
sizeof(
size_t));
1472 foreach(
size_t lvid_block_offset, local_bitset) {
1473 lvid_type lvid = lvid_block_start + lvid_block_offset;
1474 if (lvid >= graph.num_local_vertices())
break;
1477 if(graph.l_is_master(lvid)) {
1479 active_superstep.set_bit(lvid);
1482 vertex_type vertex = vertex_type(graph.l_vertex(lvid));
1483 vertex_programs[lvid].init(context, vertex, messages[lvid]);
1485 messages[lvid] = message_type();
1486 if (sched_allv)
continue;
1488 const vertex_program_type& const_vprog = vertex_programs[lvid];
1489 const vertex_type const_vertex = vertex;
1490 if(const_vprog.gather_edges(context, const_vertex) !=
1492 active_minorstep.set_bit(lvid);
1493 sync_vertex_program(lvid, thread_id);
1496 if(++vcount % TRY_RECV_MOD == 0) recv_vertex_programs(TRY_TO_RECV);
1500 num_active_vertices += nactive_inc;
1501 vprog_exchange.partial_flush(thread_id);
1504 thread_barrier.wait();
1505 if(thread_id == 0) {
1506 vprog_exchange.flush();
1508 thread_barrier.wait();
1510 recv_vertex_programs();
1515 template<
typename VertexProgram>
1516 void synchronous_engine<VertexProgram>::
1517 execute_gathers(
const size_t thread_id) {
1518 context_type context(*
this, graph);
1519 const bool TRY_TO_RECV =
true;
1520 const size_t TRY_RECV_MOD = 1000;
1522 const bool caching_enabled = !gather_cache.empty();
1527 fixed_dense_bitset<sizeof(size_t)> local_bitset;
1531 shared_lvid_counter.inc_ret_last(8 *
sizeof(
size_t));
1532 if (lvid_block_start >= graph.num_local_vertices())
break;
1534 size_t lvid_bit_block = active_minorstep.containing_word(lvid_block_start);
1535 if (lvid_bit_block == 0)
continue;
1537 local_bitset.clear();
1538 local_bitset.initialize_from_mem(&lvid_bit_block,
sizeof(
size_t));
1540 foreach(
size_t lvid_block_offset, local_bitset) {
1541 lvid_type lvid = lvid_block_start + lvid_block_offset;
1542 if (lvid >= graph.num_local_vertices())
break;
1544 bool accum_is_set =
false;
1545 gather_type accum = gather_type();
1548 if( caching_enabled && has_cache.get(lvid) ) {
1549 accum = gather_cache[lvid];
1550 accum_is_set =
true;
1553 const vertex_program_type& vprog = vertex_programs[lvid];
1554 local_vertex_type local_vertex = graph.l_vertex(lvid);
1555 const vertex_type vertex(local_vertex);
1556 const edge_dir_type gather_dir = vprog.gather_edges(context, vertex);
1558 size_t edges_touched = 0;
1559 vprog.pre_local_gather(accum);
1561 foreach(local_edge_type local_edge, local_vertex.in_edges()) {
1562 edge_type edge(local_edge);
1565 accum += vprog.gather(context, vertex, edge);
1567 accum = vprog.gather(context, vertex, edge);
1568 accum_is_set =
true;
1576 foreach(local_edge_type local_edge, local_vertex.out_edges()) {
1577 edge_type edge(local_edge);
1580 accum += vprog.gather(context, vertex, edge);
1582 accum = vprog.gather(context, vertex, edge);
1583 accum_is_set =
true;
1588 INCREMENT_EVENT(EVENT_GATHERS, edges_touched);
1590 vprog.post_local_gather(accum);
1595 if(caching_enabled && accum_is_set) {
1596 gather_cache[lvid] = accum; has_cache.set_bit(lvid);
1601 if(accum_is_set) sync_gather(lvid, accum, thread_id);
1602 if(!graph.l_is_master(lvid)) {
1604 vertex_programs[lvid] = vertex_program_type();
1608 if(++vcount % TRY_RECV_MOD == 0) recv_gathers(TRY_TO_RECV);
1611 per_thread_compute_time[thread_id] += ti.current_time();
1612 gather_exchange.partial_flush(thread_id);
1614 thread_barrier.wait();
1615 if(thread_id == 0) gather_exchange.flush();
1616 thread_barrier.wait();
1621 template<
typename VertexProgram>
1622 void synchronous_engine<VertexProgram>::
1623 execute_applys(
const size_t thread_id) {
1624 context_type context(*
this, graph);
1625 const bool TRY_TO_RECV =
true;
1626 const size_t TRY_RECV_MOD = 1000;
1632 fixed_dense_bitset<sizeof(size_t)> local_bitset;
1636 shared_lvid_counter.inc_ret_last(8 *
sizeof(
size_t));
1637 if (lvid_block_start >= graph.num_local_vertices())
break;
1639 size_t lvid_bit_block = active_superstep.containing_word(lvid_block_start);
1640 if (lvid_bit_block == 0)
continue;
1642 local_bitset.clear();
1643 local_bitset.initialize_from_mem(&lvid_bit_block,
sizeof(
size_t));
1644 foreach(
size_t lvid_block_offset, local_bitset) {
1645 lvid_type lvid = lvid_block_start + lvid_block_offset;
1646 if (lvid >= graph.num_local_vertices())
break;
1649 ASSERT_TRUE(graph.l_is_master(lvid));
1650 vertex_type vertex(graph.l_vertex(lvid));
1653 const gather_type& accum = gather_accum[lvid];
1654 INCREMENT_EVENT(EVENT_APPLIES, 1);
1655 vertex_programs[lvid].apply(context, vertex, accum);
1659 gather_accum[lvid] = gather_type();
1661 sync_vertex_data(lvid, thread_id);
1663 const vertex_program_type& const_vprog = vertex_programs[lvid];
1664 const vertex_type const_vertex = vertex;
1665 if(const_vprog.scatter_edges(context, const_vertex) !=
1667 active_minorstep.set_bit(lvid);
1668 sync_vertex_program(lvid, thread_id);
1670 vertex_programs[lvid] = vertex_program_type();
1673 if(++vcount % TRY_RECV_MOD == 0) {
1674 recv_vertex_programs(TRY_TO_RECV);
1675 recv_vertex_data(TRY_TO_RECV);
1680 per_thread_compute_time[thread_id] += ti.current_time();
1681 vprog_exchange.partial_flush(thread_id);
1682 vdata_exchange.partial_flush(thread_id);
1684 thread_barrier.wait();
1685 if(thread_id == 0) { vprog_exchange.flush(); vdata_exchange.flush(); }
1686 thread_barrier.wait();
1687 recv_vertex_programs();
1695 template<
typename VertexProgram>
1696 void synchronous_engine<VertexProgram>::
1697 execute_scatters(
const size_t thread_id) {
1698 context_type context(*
this, graph);
1702 fixed_dense_bitset<sizeof(size_t)> local_bitset;
1706 shared_lvid_counter.inc_ret_last(8 *
sizeof(
size_t));
1707 if (lvid_block_start >= graph.num_local_vertices())
break;
1709 size_t lvid_bit_block = active_minorstep.containing_word(lvid_block_start);
1710 if (lvid_bit_block == 0)
continue;
1712 local_bitset.clear();
1713 local_bitset.initialize_from_mem(&lvid_bit_block,
sizeof(
size_t));
1714 foreach(
size_t lvid_block_offset, local_bitset) {
1715 lvid_type lvid = lvid_block_start + lvid_block_offset;
1716 if (lvid >= graph.num_local_vertices())
break;
1718 const vertex_program_type& vprog = vertex_programs[lvid];
1719 local_vertex_type local_vertex = graph.l_vertex(lvid);
1720 const vertex_type vertex(local_vertex);
1721 const edge_dir_type scatter_dir = vprog.scatter_edges(context, vertex);
1722 size_t edges_touched = 0;
1725 foreach(local_edge_type local_edge, local_vertex.in_edges()) {
1726 edge_type edge(local_edge);
1728 vprog.scatter(context, vertex, edge);
1735 foreach(local_edge_type local_edge, local_vertex.out_edges()) {
1736 edge_type edge(local_edge);
1738 vprog.scatter(context, vertex, edge);
1743 INCREMENT_EVENT(EVENT_SCATTERS, edges_touched);
1745 vertex_programs[lvid] = vertex_program_type();
1749 per_thread_compute_time[thread_id] += ti.current_time();
1755 template<
typename VertexProgram>
1756 void synchronous_engine<VertexProgram>::
1757 sync_vertex_program(
lvid_type lvid,
const size_t thread_id) {
1758 ASSERT_TRUE(graph.l_is_master(lvid));
1760 local_vertex_type vertex = graph.l_vertex(lvid);
1761 foreach(
const procid_t& mirror, vertex.mirrors()) {
1762 vprog_exchange.send(mirror,
1763 std::make_pair(vid, vertex_programs[lvid]),
1770 template<
typename VertexProgram>
1771 void synchronous_engine<VertexProgram>::
1772 recv_vertex_programs(
const bool try_to_recv) {
1774 typename vprog_exchange_type::buffer_type buffer;
1775 while(vprog_exchange.recv(procid, buffer, try_to_recv)) {
1776 foreach(
const vid_prog_pair_type& pair, buffer) {
1777 const lvid_type lvid = graph.local_vid(pair.first);
1779 vertex_programs[lvid] = pair.second;
1780 active_minorstep.set_bit(lvid);
1786 template<
typename VertexProgram>
1787 void synchronous_engine<VertexProgram>::
1788 sync_vertex_data(
lvid_type lvid,
const size_t thread_id) {
1789 ASSERT_TRUE(graph.l_is_master(lvid));
1791 local_vertex_type vertex = graph.l_vertex(lvid);
1792 foreach(
const procid_t& mirror, vertex.mirrors()) {
1793 vdata_exchange.send(mirror, std::make_pair(vid, vertex.data()), thread_id);
1801 template<
typename VertexProgram>
1802 void synchronous_engine<VertexProgram>::
1803 recv_vertex_data(
bool try_to_recv) {
1805 typename vdata_exchange_type::buffer_type buffer;
1806 while(vdata_exchange.recv(procid, buffer, try_to_recv)) {
1807 foreach(
const vid_vdata_pair_type& pair, buffer) {
1808 const lvid_type lvid = graph.local_vid(pair.first);
1809 ASSERT_FALSE(graph.l_is_master(lvid));
1810 graph.l_vertex(lvid).data() = pair.second;
1816 template<
typename VertexProgram>
1817 void synchronous_engine<VertexProgram>::
1818 sync_gather(
lvid_type lvid,
const gather_type& accum,
const size_t thread_id) {
1819 if(graph.l_is_master(lvid)) {
1820 vlocks[lvid].lock();
1821 if(has_gather_accum.get(lvid)) {
1822 gather_accum[lvid] += accum;
1824 gather_accum[lvid] = accum;
1825 has_gather_accum.set_bit(lvid);
1827 vlocks[lvid].unlock();
1829 const procid_t master = graph.l_master(lvid);
1831 gather_exchange.send(master, std::make_pair(vid, accum), thread_id);
1835 template<
typename VertexProgram>
1836 void synchronous_engine<VertexProgram>::
1837 recv_gathers(
const bool try_to_recv) {
1839 typename gather_exchange_type::buffer_type buffer;
1840 while(gather_exchange.recv(procid, buffer, try_to_recv)) {
1841 foreach(
const vid_gather_pair_type& pair, buffer) {
1842 const lvid_type lvid = graph.local_vid(pair.first);
1843 const gather_type& accum = pair.second;
1844 ASSERT_TRUE(graph.l_is_master(lvid));
1845 vlocks[lvid].lock();
1846 if( has_gather_accum.get(lvid) ) {
1847 gather_accum[lvid] += accum;
1849 gather_accum[lvid] = accum;
1850 has_gather_accum.set_bit(lvid);
1852 vlocks[lvid].unlock();
1858 template<
typename VertexProgram>
1859 void synchronous_engine<VertexProgram>::
1860 sync_message(
lvid_type lvid,
const size_t thread_id) {
1861 ASSERT_FALSE(graph.l_is_master(lvid));
1862 const procid_t master = graph.l_master(lvid);
1864 message_exchange.send(master, std::make_pair(vid, messages[lvid]), thread_id);
1870 template<
typename VertexProgram>
1871 void synchronous_engine<VertexProgram>::
1872 recv_messages(
const bool try_to_recv) {
1874 typename message_exchange_type::buffer_type buffer;
1875 while(message_exchange.recv(procid, buffer, try_to_recv)) {
1876 foreach(
const vid_message_pair_type& pair, buffer) {
1877 const lvid_type lvid = graph.local_vid(pair.first);
1878 ASSERT_TRUE(graph.l_is_master(lvid));
1879 vlocks[lvid].lock();
1880 if( has_message.get(lvid) ) {
1881 messages[lvid] += pair.second;
1883 messages[lvid] = pair.second;
1884 has_message.set_bit(lvid);
1886 vlocks[lvid].unlock();
1904 #include <graphlab/macros_undef.hpp>