24 #ifndef GRAPHLAB_DISTRIBUTED_AGGREGATOR
25 #define GRAPHLAB_DISTRIBUTED_AGGREGATOR
35 #include <graphlab/rpc/dc_dist_object.hpp>
36 #include <graphlab/vertex_program/icontext.hpp>
37 #include <graphlab/graph/distributed_graph.hpp>
38 #include <graphlab/util/generics/conditional_addition_wrapper.hpp>
39 #include <graphlab/util/generics/test_function_or_functor_type.hpp>
41 #include <graphlab/util/generics/any.hpp>
42 #include <graphlab/util/timer.hpp>
43 #include <graphlab/util/mutable_queue.hpp>
44 #include <graphlab/logger/assertions.hpp>
45 #include <graphlab/parallel/pthread_tools.hpp>
46 #include <graphlab/macros_def.hpp>
79 template<
typename Graph,
typename IContext>
80 class distributed_aggregator {
// Convenience typedefs re-exported from the distributed graph, plus the
// engine context type used by all map/finalize callbacks.
// NOTE(review): graph_type/vertex_type/edge_type typedefs are not visible in
// this excerpt; presumably declared on omitted lines -- confirm.
83 typedef typename graph_type::local_edge_list_type local_edge_list_type;
84 typedef typename graph_type::local_edge_type local_edge_type;
86 typedef typename graph_type::local_vertex_type local_vertex_type;
88 typedef IContext icontext_type;
// Type-erased interface for one registered aggregation job. Concrete
// instances (map_reduce_type below) hold the user's map/finalize functions
// and an accumulator; this base lets the aggregator store heterogeneous
// jobs in one std::map and move accumulators across the wire as `any`.
102 struct imap_reduce_base {
// Creates a new instance of the same concrete type with an empty accumulator.
105 virtual imap_reduce_base* clone_empty()
const = 0;
// Applies the user's vertex map function and folds the result into the accumulator.
109 virtual void perform_map_vertex(icontext_type&, vertex_type&) = 0;
// Applies the user's edge map function and folds the result into the accumulator.
113 virtual void perform_map_edge(icontext_type&, edge_type&) = 0;
// True if this job maps over vertices; false if it maps over edges.
117 virtual bool is_vertex_map()
const = 0;
// Returns the current accumulator wrapped in an `any` (for RPC transport).
121 virtual any get_accumulator()
const = 0;
// Merges a serialized accumulator (as produced by get_accumulator) into ours.
126 virtual void add_accumulator_any(
any& other) = 0;
// Overwrites our accumulator with a serialized one.
131 virtual void set_accumulator_any(
any& other) = 0;
// Merges another reducer of the same concrete type into this one.
137 virtual void add_accumulator(imap_reduce_base* other) = 0;
// Resets the accumulator to its empty state.
140 virtual void clear_accumulator() = 0;
// Invokes the user's finalize function on the reduced value.
143 virtual void finalize(icontext_type&) = 0;
145 virtual ~imap_reduce_base() { }
// Function-pointer signatures used as placeholder template arguments when an
// aggregator registers only a vertex mapper (the edge slot) or only an edge
// mapper (the vertex slot) -- see the add_*_aggregator methods below.
148 template <
typename ReductionType>
149 struct default_map_types{
150 typedef ReductionType (*vertex_map_type)(icontext_type&,
const vertex_type&);
151 typedef ReductionType (*edge_map_type)(icontext_type&,
const edge_type&);
// Concrete aggregation job: stores the user's vertex/edge mapper and
// finalizer, plus the running accumulator. conditional_addition_wrapper
// tracks whether the accumulator holds a value yet, so += on an empty
// accumulator assigns instead of adding.
// NOTE(review): several interior lines (closing braces, some statements) are
// missing from this excerpt; comments describe only what is visible.
160 template <
typename ReductionType,
161 typename VertexMapperType,
162 typename EdgeMapperType,
163 typename FinalizerType>
164 struct map_reduce_type :
public imap_reduce_base {
165 conditional_addition_wrapper<ReductionType> acc;
166 VertexMapperType map_vtx_function;
167 EdgeMapperType map_edge_function;
168 FinalizerType finalize_function;
// Vertex-mapping constructor: marks this job as a vertex map.
176 map_reduce_type(VertexMapperType map_vtx_function,
177 FinalizerType finalize_function)
178 : map_vtx_function(map_vtx_function),
179 finalize_function(finalize_function), vertex_map(
true) { }
// Edge-mapping constructor: marks this job as an edge map.
// NOTE(review): a parameter between these lines appears omitted -- confirm
// against the original source.
185 map_reduce_type(EdgeMapperType map_edge_function,
186 FinalizerType finalize_function,
188 : map_edge_function(map_edge_function),
189 finalize_function(finalize_function), vertex_map(
false) { }
// Runs the vertex mapper; the temp result is folded into acc (fold not
// visible in this excerpt).
192 void perform_map_vertex(icontext_type&
context, vertex_type& vertex) {
203 ReductionType temp = map_vtx_function(context, vertex);
// Runs the edge mapper; likewise folds temp into acc.
215 void perform_map_edge(icontext_type& context, edge_type& edge) {
226 ReductionType temp = map_edge_function(context, edge);
238 bool is_vertex_map()
const {
242 any get_accumulator()
const {
// Merge a wire-format accumulator: unwrap the `any` to the exact wrapper type.
246 void add_accumulator_any(
any& other) {
248 acc += other.
as<conditional_addition_wrapper<ReductionType> >();
252 void set_accumulator_any(
any& other) {
254 acc = other.
as<conditional_addition_wrapper<ReductionType> >();
// Merge a same-typed reducer; dynamic_cast enforces the type match at runtime.
259 void add_accumulator(imap_reduce_base* other) {
261 acc +=
dynamic_cast<map_reduce_type*
>(other)->acc;
265 void clear_accumulator() {
269 void finalize(icontext_type& context) {
270 finalize_function(context, acc.value);
// Clone with an empty accumulator, preserving vertex-vs-edge mode and the
// registered functions.
273 imap_reduce_base* clone_empty()
const {
274 map_reduce_type* copy;
275 if (is_vertex_map()) {
276 copy =
new map_reduce_type(map_vtx_function,
280 copy =
new map_reduce_type(map_edge_function,
// Registered aggregation jobs, keyed by user-supplied name.
289 std::map<std::string, imap_reduce_base*> aggregators;
// Per-key periodic schedule interval in seconds (set by aggregate_periodic).
290 std::map<std::string, float> aggregate_period;
// Book-keeping for one in-flight asynchronous aggregation.
292 struct async_aggregator_state {
// Machine-level reducer that the per-thread reducers merge into.
295 imap_reduce_base* root_reducer;
// One empty clone of the job per worker thread (lock-free local mapping).
297 std::vector<imap_reduce_base*> per_thread_aggregation;
// Threads still to finish the local map phase on this machine.
299 atomic<int> local_count_down;
// Machines still to report in during the distributed merge/finalize phase.
301 atomic<int> distributed_count_down;
303 std::map<std::string, async_aggregator_state> async_state;
// Runtime sanity check that F matches the expected vertex-mapper signature
// ReductionType(icontext_type&, const vertex_type&). On mismatch it builds a
// diagnostic (tailored to function pointer vs. functor) warning the user;
// the emission of the message is on lines omitted from this excerpt.
314 template <
typename ReductionType,
typename F>
315 static void test_vertex_mapper_type(std::string key =
"") {
317 ReductionType(icontext_type&,
321 const vertex_type&>::value;
323 std::stringstream strm;
// Empty key => called from map_reduce_vertices; named key => a registered aggregator.
326 strm <<
"Vertex Map Function does not pass strict runtime type checks. \n";
329 strm <<
"Map Function in Vertex Aggregator " << key
330 <<
" does not pass strict runtime type checks. \n";
// Distinguish a plain function (possibly decayed to pointer) from a functor
// so the suggested prototype matches what the user wrote.
332 if (boost::is_function<
typename boost::remove_pointer<F>::type>::value) {
334 <<
"Function prototype should be \n"
335 <<
"\t ReductionType f(icontext_type&, const vertex_type&)\n";
338 strm <<
"Functor's operator() prototype should be \n"
339 <<
"\t ReductionType operator()(icontext_type&, const vertex_type&) const\n";
341 strm <<
"If you are not intentionally violating the abstraction,"
342 <<
" we recommend fixing your function for safety reasons";
// Edge-mapper counterpart of test_vertex_mapper_type: checks F against
// ReductionType(icontext_type&, const edge_type&) and builds the analogous
// diagnostic on mismatch (emission lines omitted from this excerpt).
348 template <
typename ReductionType,
typename F>
349 static void test_edge_mapper_type(std::string key =
"") {
351 ReductionType(icontext_type&,
355 const edge_type&>::value;
358 std::stringstream strm;
// Empty key => called from map_reduce_edges; named key => a registered aggregator.
361 strm <<
"Edge Map Function does not pass strict runtime type checks. \n";
364 strm <<
"Map Function in Edge Aggregator " << key
365 <<
" does not pass strict runtime type checks. \n";
367 if (boost::is_function<
typename boost::remove_pointer<F>::type>::value) {
368 strm <<
"Function prototype should be \n"
369 <<
"\t ReductionType f(icontext_type&, const edge_type&)\n";
372 strm <<
"Functor's operator() prototype should be "
373 <<
"\t ReductionType operator()(icontext_type&, const edge_type&) const\n";
375 strm <<
"If you are not intentionally violating the abstraction,"
376 <<
" we recommend fixing your function for safety reasons";
// Constructor initializer list (the signature is on lines omitted from this
// excerpt): registers with the RPC layer, stores the graph and context
// references, and starts with ncpus unset until start() is called.
387 rmi(dc,
this), graph(graph),
388 context(context), ncpus(0) { }
// Registers a named vertex aggregator with an explicitly supplied
// ReductionType. Returns false for an empty key (and presumably for a
// duplicate key -- the else branch is omitted from this excerpt). Only
// proc 0 runs the strict signature check to avoid duplicated warnings.
// The unused edge-mapper slot is filled with the default function-pointer type.
393 template <
typename ReductionType,
394 typename VertexMapperType,
395 typename FinalizerType>
396 bool add_vertex_aggregator(
const std::string& key,
397 VertexMapperType map_function,
398 FinalizerType finalize_function) {
399 if (key.length() == 0)
return false;
400 if (aggregators.count(key) == 0) {
402 if (rmi.procid() == 0) {
404 test_vertex_mapper_type<ReductionType, VertexMapperType>(key);
407 aggregators[key] =
new map_reduce_type<ReductionType,
409 typename default_map_types<ReductionType>::edge_map_type,
410 FinalizerType>(map_function,
// C++11-only convenience overload: deduces ReductionType from the mapper's
// return type via decltype instead of requiring it as a template argument.
// Note this variant skips the strict runtime signature check performed by
// the explicit-ReductionType overload above.
420 #if defined(__cplusplus) && __cplusplus >= 201103L
429 template <
typename VertexMapperType,
430 typename FinalizerType>
431 bool add_vertex_aggregator(
const std::string& key,
432 VertexMapperType map_function,
433 FinalizerType finalize_function) {
// Probe call on vertex 0 is only evaluated for its type, never executed.
435 typedef decltype(map_function(*context, graph.
vertex(0))) ReductionType;
436 if (key.length() == 0)
return false;
437 if (aggregators.count(key) == 0) {
438 aggregators[key] =
new map_reduce_type<ReductionType,
440 typename default_map_types<ReductionType>::edge_map_type,
441 FinalizerType>(map_function,
// Registers a named edge aggregator with an explicitly supplied
// ReductionType; mirrors add_vertex_aggregator with the unused vertex-mapper
// slot filled by the default function-pointer type. Only proc 0 runs the
// strict signature check.
455 template <
typename ReductionType,
456 typename EdgeMapperType,
457 typename FinalizerType>
458 bool add_edge_aggregator(
const std::string& key,
459 EdgeMapperType map_function,
460 FinalizerType finalize_function) {
461 if (key.length() == 0)
return false;
462 if (aggregators.count(key) == 0) {
463 if (rmi.procid() == 0) {
465 test_edge_mapper_type<ReductionType, EdgeMapperType>(key);
467 aggregators[key] =
new map_reduce_type<ReductionType,
468 typename default_map_types<ReductionType>::vertex_map_type,
470 FinalizerType>(map_function,
// C++11-only convenience overload deducing ReductionType from the edge
// mapper's return type. The decltype probe fabricates an edge_type from the
// first in-edge of local vertex 0 purely for type deduction -- it is never
// executed, so an empty edge list is harmless here.
481 #if defined(__cplusplus) && __cplusplus >= 201103L
491 template <
typename EdgeMapperType,
492 typename FinalizerType>
493 bool add_edge_aggregator(
const std::string& key,
494 EdgeMapperType map_function,
495 FinalizerType finalize_function) {
497 typedef decltype(map_function(*context, edge_type(graph.l_vertex(0).in_edges()[0]) )) ReductionType;
498 if (key.length() == 0)
return false;
499 if (aggregators.count(key) == 0) {
500 aggregators[key] =
new map_reduce_type<ReductionType,
501 typename default_map_types<ReductionType>::vertex_map_type,
503 FinalizerType>(map_function,
// Synchronously runs one named aggregation across all machines:
//   1. each machine maps over its OWNED vertices (or all local in-edges,
//      which covers every edge exactly once globally) into a local clone;
//   2. local results gather to proc 0, which merges and finalizes semantics
//      by broadcasting the combined accumulator back out;
//   3. every machine sets the combined value and runs the finalizer.
// Must be called by all machines together (collective gather/broadcast).
518 bool aggregate_now(
const std::string& key) {
519 ASSERT_MSG(graph.
is_finalized(),
"Graph must be finalized");
520 if (aggregators.count(key) == 0) {
521 ASSERT_MSG(
false,
"Requested aggregator %s not found", key.c_str());
525 imap_reduce_base* mr = aggregators[key];
526 mr->clear_accumulator();
// Map into a fresh clone so partially-built state never touches `mr`
// until the local pass completes. Parallelism pragmas, if any, are on
// lines omitted from this excerpt.
532 imap_reduce_base* localmr = mr->clone_empty();
533 if (localmr->is_vertex_map()) {
537 for (
int i = 0; i < (int)graph.num_local_vertices(); ++i) {
538 local_vertex_type lvertex = graph.l_vertex(i);
// Only the owning machine maps a vertex, so replicas are not double-counted.
539 if (lvertex.owner() == rmi.procid()) {
540 vertex_type vertex(lvertex);
541 localmr->perform_map_vertex(*context, vertex);
549 for (
int i = 0; i < (int)graph.num_local_vertices(); ++i) {
550 foreach(local_edge_type e, graph.l_vertex(i).in_edges()) {
552 localmr->perform_map_edge(*context, edge);
560 mr->add_accumulator(localmr);
// Distributed reduce: everyone contributes an `any`-wrapped accumulator.
565 std::vector<any> gathervec(rmi.numprocs());
566 gathervec[rmi.procid()] = mr->get_accumulator();
568 rmi.gather(gathervec, 0);
570 if (rmi.procid() == 0) {
573 for (
procid_t i = 1; i < rmi.numprocs(); ++i) {
574 mr->add_accumulator_any(gathervec[i]);
576 any val = mr->get_accumulator();
// broadcast flag: true = this proc is the source, false = receiver.
577 rmi.broadcast(val,
true);
582 rmi.broadcast(val,
false);
583 mr->set_accumulator_any(val);
585 mr->finalize(*context);
586 mr->clear_accumulator();
// Schedules the named aggregator to run every `seconds` seconds once the
// engine starts. Returns false for a negative interval or unknown key.
595 bool aggregate_periodic(
const std::string& key,
float seconds) {
597 if (seconds < 0)
return false;
598 if (aggregators.count(key) == 0)
return false;
599 else aggregate_period[key] = seconds;
// Immediately runs every aggregator that has a periodic schedule, in key
// order (iterator advance is on a line omitted from this excerpt).
608 void aggregate_all_periodic() {
609 typename std::map<std::string, float>::iterator iter =
610 aggregate_period.begin();
611 while (iter != aggregate_period.end()) {
612 aggregate_now(iter->first);
// Prepares periodic aggregation for an engine run: seeds the schedule queue
// (negated times so the mutable_queue's max-top yields the EARLIEST
// deadline), then allocates per-key async state -- one empty reducer clone
// per worker thread plus a machine-level root reducer -- and arms both the
// local (ncpus) and distributed countdowns.
627 void start(
size_t ncpus = 0) {
631 typename std::map<std::string, float>::iterator iter =
632 aggregate_period.begin();
633 while (iter != aggregate_period.end()) {
636 schedule.push(iter->first, -iter->second);
643 iter = aggregate_period.begin();
644 while (iter != aggregate_period.end()) {
645 async_state[iter->first].local_count_down = (int)ncpus;
646 async_state[iter->first].distributed_count_down =
649 async_state[iter->first].per_thread_aggregation.resize(ncpus);
650 for (
size_t i = 0; i < ncpus; ++i) {
651 async_state[iter->first].per_thread_aggregation[i] =
652 aggregators[iter->first]->clone_empty();
654 async_state[iter->first].root_reducer =
655 aggregators[iter->first]->clone_empty();
// Polled by the async engine: returns the key of an aggregation whose
// deadline has passed, or "" if none is due. try_lock keeps this cheap and
// non-blocking when another thread already holds the schedule; the popped
// entry is not re-queued here (rescheduling happens after completion).
// NOTE(review): the declaration of `key`/`curtime` and the schedule.pop are
// on lines omitted from this excerpt.
675 std::string tick_asynchronous() {
677 if (!schedule_lock.try_lock())
return "";
682 bool has_entry =
false;
// Deadlines are stored negated; -top().second recovers the real time.
683 if (!schedule.empty() && -schedule.top().second <= curtime) {
684 key = schedule.top().first;
688 schedule_lock.unlock();
691 if (has_entry ==
false)
return "";
// Per-thread body of an asynchronous aggregation. Thread `cpuid` maps its
// strided share of local vertices (owned only) or local in-edges into its
// private reducer, merges into the machine's root reducer, then the LAST
// thread to finish (local_count_down hits 0) resets the per-thread state
// and either RPCs the machine result to proc 0 (non-zero procs) or directly
// decrements proc 0's distributed counter.
703 void tick_asynchronous_compute(
size_t cpuid,
const std::string& key) {
705 typename std::map<std::string, async_aggregator_state>::iterator iter =
706 async_state.find(key);
707 ASSERT_MSG(iter != async_state.end(),
"Key %s not found", key.c_str());
708 ASSERT_GT(iter->second.per_thread_aggregation.size(), cpuid);
710 imap_reduce_base* localmr = iter->second.per_thread_aggregation[cpuid];
712 if (localmr->is_vertex_map()) {
// Stride-by-ncpus partitioning: thread t handles vertices t, t+ncpus, ...
713 for (
int i = cpuid;i < (int)graph.num_local_vertices(); i+=ncpus) {
714 local_vertex_type lvertex = graph.l_vertex(i);
715 if (lvertex.owner() == rmi.procid()) {
716 vertex_type vertex(lvertex);
717 localmr->perform_map_vertex(*context, vertex);
721 for (
int i = cpuid;i < (int)graph.num_local_vertices(); i+=ncpus) {
722 foreach(local_edge_type e, graph.l_vertex(i).in_edges()) {
724 localmr->perform_map_edge(*context, edge);
// NOTE(review): add_accumulator on the shared root_reducer from multiple
// threads -- synchronization, if any, is on lines omitted here; confirm.
728 iter->second.root_reducer->add_accumulator(localmr);
729 int countdown_val = iter->second.local_count_down.dec();
731 ASSERT_LT(countdown_val, ncpus);
732 ASSERT_GE(countdown_val, 0);
733 if (countdown_val == 0) {
740 i < iter->second.per_thread_aggregation.size(); ++i) {
741 iter->second.per_thread_aggregation[i]->clear_accumulator();
// Re-arm the local countdown for the next tick of this key.
743 iter->second.local_count_down = ncpus;
745 if (rmi.procid() != 0) {
748 any acc = iter->second.root_reducer->get_accumulator();
749 iter->second.root_reducer->clear_accumulator();
750 rmi.remote_call(0, &distributed_aggregator::rpc_key_merge,
754 decrement_distributed_counter(key);
// RPC target (runs on proc 0): merges one machine's accumulator into the
// root reducer for `key`, then counts that machine as reported.
764 void rpc_key_merge(
const std::string& key,
any& acc) {
766 typename std::map<std::string, async_aggregator_state>::iterator iter =
767 async_state.find(key);
768 ASSERT_MSG(iter != async_state.end(),
"Key %s not found", key.c_str());
769 iter->second.root_reducer->add_accumulator_any(acc);
770 decrement_distributed_counter(key);
// Runs on proc 0 only. Counts down machines for the async merge phase of
// `key`; when the last machine reports, re-arms the counter, pushes the
// combined accumulator to every other machine for finalization
// (rpc_perform_finalize), finalizes locally, and enters the finalize
// countdown itself.
779 void decrement_distributed_counter(
const std::string& key) {
781 ASSERT_EQ(rmi.procid(), 0);
783 typename std::map<std::string, async_aggregator_state>::iterator iter =
784 async_state.find(key);
785 ASSERT_MSG(iter != async_state.end(),
"Key %s not found", key.c_str());
786 int countdown_val = iter->second.distributed_count_down.dec();
787 logstream(
LOG_INFO) <<
"Distributed Aggregation of " << key <<
". "
788 << countdown_val <<
" remaining." << std::endl;
790 ASSERT_LE(countdown_val, rmi.numprocs());
791 ASSERT_GE(countdown_val, 0);
792 if (countdown_val == 0) {
793 logstream(
LOG_INFO) <<
"Aggregate completion of " << key << std::endl;
794 any acc_val = iter->second.root_reducer->get_accumulator();
// Re-arm for the next round before fanning out the finalize calls.
797 iter->second.distributed_count_down = rmi.numprocs();
798 for (
procid_t i = 1;i < rmi.numprocs(); ++i) {
799 rmi.remote_call(i, &distributed_aggregator::rpc_perform_finalize,
802 iter->second.root_reducer->finalize(*context);
803 iter->second.root_reducer->clear_accumulator();
804 decrement_finalize_counter(key);
// RPC target (runs on procs != 0): installs the globally combined
// accumulator, runs the user's finalizer locally, then reports completion
// back to proc 0's finalize counter.
812 void rpc_perform_finalize(
const std::string& key,
any& acc_val) {
813 ASSERT_NE(rmi.procid(), 0);
814 typename std::map<std::string, async_aggregator_state>::iterator iter =
815 async_state.find(key);
816 ASSERT_MSG(iter != async_state.end(),
"Key %s not found", key.c_str());
818 iter->second.root_reducer->set_accumulator_any(acc_val);
819 iter->second.root_reducer->finalize(*context);
820 iter->second.root_reducer->clear_accumulator();
822 rmi.remote_call(0, &distributed_aggregator::decrement_finalize_counter,
// Runs on proc 0: counts machines finishing the finalize phase of `key`.
// When all have finished, re-arms the counter, computes the key's next
// deadline from its period, and schedules it locally plus (via RPC) on
// every other machine. NOTE(review): distributed_count_down appears to be
// reused for both the merge and finalize phases -- confirm the re-arm
// ordering against the full source.
827 void decrement_finalize_counter(
const std::string& key) {
828 typename std::map<std::string, async_aggregator_state>::iterator iter =
829 async_state.find(key);
830 ASSERT_MSG(iter != async_state.end(),
"Key %s not found", key.c_str());
831 int countdown_val = iter->second.distributed_count_down.dec();
832 if (countdown_val == 0) {
835 iter->second.distributed_count_down = rmi.numprocs();
// next_time = now + period, measured relative to start_time (declaration
// of next_time is on a line omitted from this excerpt).
839 aggregate_period[key] - start_time;
840 logstream(
LOG_INFO) << rmi.procid() <<
"Reschedule of " << key
841 <<
" at " << next_time << std::endl;
842 rpc_schedule_key(key, next_time);
843 for (
procid_t i = 1;i < rmi.numprocs(); ++i) {
844 rmi.remote_call(i, &distributed_aggregator::rpc_schedule_key,
// RPC target: (re)inserts `key` into the local schedule queue. Times are
// negated on insertion so the queue's top is the earliest deadline.
853 void rpc_schedule_key(
const std::string& key,
float next_time) {
854 schedule_lock.lock();
855 schedule.push(key, -next_time);
856 schedule_lock.unlock();
// Synchronous-engine variant of the scheduler tick: proc 0 broadcasts the
// authoritative clock so every machine agrees on which keys are due, each
// due key is aggregated collectively (aggregate_now call is on omitted
// lines), and next deadlines are likewise broadcast from proc 0 before
// being pushed back onto the schedule.
867 void tick_synchronous() {
870 rmi.broadcast(curtime, rmi.procid() == 0);
874 std::vector<std::pair<std::string, float> > next_schedule;
875 while(!schedule.empty() && -schedule.top().second <= curtime) {
876 std::string key = schedule.top().first;
882 aggregate_period[key] - start_time);
883 rmi.broadcast(next_time, rmi.procid() == 0);
884 next_schedule.push_back(std::make_pair(key, -next_time));
// Re-insert after the sweep so a rescheduled key cannot be popped twice
// within the same tick.
887 for (
size_t i = 0;i < next_schedule.size(); ++i) {
888 schedule.push(next_schedule[i].first, next_schedule[i].second);
// Teardown fragment (enclosing function signature omitted from this
// excerpt; presumably a stop()/reset routine -- confirm): clears every
// registered aggregator's accumulator, then frees all async state -- the
// root reducer and every per-thread reducer clone allocated by start().
900 typename std::map<std::string, imap_reduce_base*>::iterator iter =
902 while (iter != aggregators.end()) {
903 iter->second->clear_accumulator();
909 typename std::map<std::string, async_aggregator_state>::iterator
910 iter = async_state.begin();
911 while (iter != async_state.end()) {
912 delete iter->second.root_reducer;
914 i < iter->second.per_thread_aggregation.size();
916 delete iter->second.per_thread_aggregation[i];
918 iter->second.per_thread_aggregation.clear();
// Returns the set of aggregator keys that currently have a periodic
// schedule (iterator advance and return are on omitted lines).
926 std::set<std::string> get_all_periodic_keys()
const {
927 typename std::map<std::string, float>::const_iterator iter =
928 aggregate_period.begin();
929 std::set<std::string> ret;
930 while (iter != aggregate_period.end()) {
931 ret.insert(iter->first);
// One-shot map-reduce over all OWNED vertices, collective across machines.
// Each (presumably OpenMP -- the pragma lines are omitted here) thread folds
// into a thread-local (result, result_set) pair: the first mapped value
// assigns, later ones use +=, so ResultType needs no identity element.
// Thread results merge into the global pair, then an all_reduce over the
// conditional_addition_wrapper combines machines and returns the total on
// every machine.
940 template <
typename ResultType,
typename MapFunctionType>
941 ResultType map_reduce_vertices(MapFunctionType mapfunction) {
942 ASSERT_MSG(graph.
is_finalized(),
"Graph must be finalized");
944 if (rmi.procid() == 0) {
946 test_vertex_mapper_type<ResultType, MapFunctionType>();
950 bool global_result_set =
false;
951 ResultType global_result = ResultType();
956 bool result_set =
false;
957 ResultType result = ResultType();
961 for (
int i = 0; i < (int)graph.num_local_vertices(); ++i) {
// Owned vertices only, so replicas are not double-counted globally.
962 if (graph.l_vertex(i).owner() == rmi.procid()) {
964 vertex_type vtx(graph.l_vertex(i));
965 result = mapfunction(*context, vtx);
968 else if (result_set){
969 vertex_type vtx(graph.l_vertex(i));
970 result += mapfunction(*context, vtx);
979 if (!global_result_set) {
980 global_result = result;
981 global_result_set =
true;
984 global_result += result;
989 conditional_addition_wrapper<ResultType> wrapper(global_result, global_result_set);
990 rmi.all_reduce(wrapper);
991 return wrapper.value;
// One-shot map-reduce over all edges, collective across machines. Iterating
// each local vertex's IN-edges visits every edge exactly once per machine;
// the OpenMP parallel region gives each thread a private (result,
// result_set) pair merged under the critical section, then all_reduce
// combines machines. First value assigns, later ones +=, as in
// map_reduce_vertices.
996 template <
typename ResultType,
typename MapFunctionType>
997 ResultType map_reduce_edges(MapFunctionType mapfunction) {
998 ASSERT_MSG(graph.
is_finalized(),
"Graph must be finalized");
1000 if (rmi.procid() == 0) {
1002 test_edge_mapper_type<ResultType, MapFunctionType>();
1006 bool global_result_set =
false;
1007 ResultType global_result = ResultType();
1009 #pragma omp parallel
1012 bool result_set =
false;
1013 ResultType result = ResultType();
1017 for (
int i = 0; i < (int)graph.num_local_vertices(); ++i) {
1018 foreach(
const local_edge_type& e, graph.l_vertex(i).in_edges()) {
1021 result = mapfunction(*context, edge);
1024 else if (result_set){
1026 result += mapfunction(*context, edge);
1031 #pragma omp critical
1035 if (!global_result_set) {
1036 global_result = result;
1037 global_result_set =
true;
1040 global_result += result;
1046 conditional_addition_wrapper<ResultType> wrapper(global_result, global_result_set);
1047 rmi.all_reduce(wrapper);
1048 return wrapper.value;
// Applies a mutating functor to every OWNED vertex in parallel (OpenMP),
// then graph.synchronize() pushes the modified vertex data out to replicas
// on other machines. Collective: all machines must call this together.
1051 template <
typename TransformType>
1052 void transform_vertices(TransformType transform_functor) {
1053 ASSERT_MSG(graph.
is_finalized(),
"Graph must be finalized");
1056 #pragma omp parallel for
1058 for (
int i = 0; i < (int)graph.num_local_vertices(); ++i) {
1059 if (graph.l_vertex(i).owner() == rmi.procid()) {
1060 vertex_type vtx(graph.l_vertex(i));
1061 transform_functor(*context, vtx);
1065 graph.synchronize();
// Applies a mutating functor to every edge in parallel (OpenMP), visiting
// each local vertex's in-edges so each edge is touched exactly once per
// machine. No synchronize() here: edges are not replicated the way vertex
// data is -- NOTE(review): confirm against the graph implementation.
1069 template <
typename TransformType>
1070 void transform_edges(TransformType transform_functor) {
1071 ASSERT_MSG(graph.
is_finalized(),
"Graph must be finalized");
1074 #pragma omp parallel for
1076 for (
int i = 0; i < (int)graph.num_local_vertices(); ++i) {
1077 foreach(
const local_edge_type& e, graph.l_vertex(i).in_edges()) {
1079 transform_functor(*context, edge);
// Destructor (body omitted from this excerpt); presumably deletes the
// heap-allocated reducers in `aggregators` and `async_state` -- confirm.
1089 ~distributed_aggregator() {
1096 #include <graphlab/macros_undef.hpp>