GraphLab: Distributed Graph-Parallel API 2.1
distributed_aggregator.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_DISTRIBUTED_AGGREGATOR
25 #define GRAPHLAB_DISTRIBUTED_AGGREGATOR
26 
27 #ifndef __NO_OPENMP__
28 #include <omp.h>
29 #endif
30 
31 #include <map>
32 #include <set>
33 #include <string>
34 #include <vector>
35 #include <graphlab/rpc/dc_dist_object.hpp>
36 #include <graphlab/vertex_program/icontext.hpp>
37 #include <graphlab/graph/distributed_graph.hpp>
38 #include <graphlab/util/generics/conditional_addition_wrapper.hpp>
39 #include <graphlab/util/generics/test_function_or_functor_type.hpp>
40 
41 #include <graphlab/util/generics/any.hpp>
42 #include <graphlab/util/timer.hpp>
43 #include <graphlab/util/mutable_queue.hpp>
44 #include <graphlab/logger/assertions.hpp>
45 #include <graphlab/parallel/pthread_tools.hpp>
46 #include <graphlab/macros_def.hpp>
47 namespace graphlab {
48 
49  /**
50  * \internal
51  * Implements a distributed aggregator interface which can be plugged
52  * into the engine. This class includes management of periodic aggregators.
53  *
54  * Essentially, the engine should pass through all calls to
55  * - add_vertex_aggregator()
56  * - add_edge_aggregator()
57  * - aggregate_now()
58  * - aggregate_periodic()
59  *
60  * On engine start(), the engine should call aggregate_all_periodic()
61  * to ensure all periodic aggregators are called once prior to vertex program
62  * execution. After that, the start() function should be called to prepare
63  * the state of the schedule. At termination of the engine, the stop()
64  * function should be called to reset the state of the aggregator.
65  *
66  * During engine execution, two modes of operation are permitted:
67  * synchronous and asynchronous. In a synchronous mode of execution,
68  * the tick_synchronous() function should be called periodically by
69  * exactly one thread on each machine, at the same time. In an asynchronous
70  * mode of execution, tick_asynchronous() should be called periodically
71  * on each machine by some arbitrary thread. This polls the state of the
72  * schedule and activates aggregation jobs which are ready.
73  *
74  * tick_synchronous() and tick_asynchronous() should not be used
75  * simultaneously within the same engine execution. For details on their
76  * usage, see their respective documentation.
77  *
78  */
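 /* Usage sketch (illustrative only, not part of the original interface):
  * a synchronous engine might drive this aggregator roughly as follows.
  * The names `aggregator`, `map_fn`, `finalize_fn`, `converged` and
  * `run_vertex_programs()` are hypothetical placeholders.
  *
  *   aggregator.add_vertex_aggregator<double>("stats", map_fn, finalize_fn);
  *   aggregator.aggregate_periodic("stats", 5.0);   // every 5 seconds
  *
  *   aggregator.aggregate_all_periodic();  // run every periodic key once
  *   aggregator.start();                   // initialize the schedule
  *   while (!converged) {
  *     run_vertex_programs();
  *     aggregator.tick_synchronous();      // one thread per machine, in step
  *   }
  *   aggregator.stop();                    // reset aggregator state
  */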
79  template<typename Graph, typename IContext>
80  class distributed_aggregator {
81  public:
82  typedef Graph graph_type;
83  typedef typename graph_type::local_edge_list_type local_edge_list_type;
84  typedef typename graph_type::local_edge_type local_edge_type;
85  typedef typename graph_type::edge_type edge_type;
86  typedef typename graph_type::local_vertex_type local_vertex_type;
87  typedef typename graph_type::vertex_type vertex_type;
88  typedef IContext icontext_type;
89 
90  dc_dist_object<distributed_aggregator> rmi;
91  graph_type& graph;
92  icontext_type* context;
93 
94  private:
95 
96  /**
97  * \internal
98  * A base class which contains a "type-free" specification of the
99  * reduction operation, thus allowing the aggregation to be performed at
100  * runtime with no other type information whatsoever.
101  */
102  struct imap_reduce_base {
103  /** \brief makes a copy of the current map reduce spec without copying
104  * accumulator data */
105  virtual imap_reduce_base* clone_empty() const = 0;
106 
107  /** \brief Performs a map operation on the given vertex adding to the
108  * internal accumulator */
109  virtual void perform_map_vertex(icontext_type&, vertex_type&) = 0;
110 
111  /** \brief Performs a map operation on the given edge adding to the
112  * internal accumulator */
113  virtual void perform_map_edge(icontext_type&, edge_type&) = 0;
114 
115  /** \brief Returns true if the accumulation is over vertices.
116  Returns false if it is over edges.*/
117  virtual bool is_vertex_map() const = 0;
118 
119  /** \brief Returns the accumulator stored in an any.
120  (by some magic, any's can be serialized) */
121  virtual any get_accumulator() const = 0;
122 
123  /** \brief Combines accumulators using a second accumulator
124  stored in an any (as returned by get_accumulator).
125  Must be thread safe.*/
126  virtual void add_accumulator_any(any& other) = 0;
127 
128  /** \brief Sets the value of the accumulator
129  from an any (as returned by get_accumulator).
130  Must be thread safe.*/
131  virtual void set_accumulator_any(any& other) = 0;
132 
133 
134  /** \brief Combines accumulators using a second accumulator
135  stored in a second imap_reduce_base class. Must be
136  thread safe. */
137  virtual void add_accumulator(imap_reduce_base* other) = 0;
138 
139  /** \brief Resets the accumulator */
140  virtual void clear_accumulator() = 0;
141 
142  /** \brief Calls the finalize operation on internal accumulator */
143  virtual void finalize(icontext_type&) = 0;
144 
145  virtual ~imap_reduce_base() { }
146  };
147 
148  template <typename ReductionType>
149  struct default_map_types{
150  typedef ReductionType (*vertex_map_type)(icontext_type&, const vertex_type&);
151  typedef ReductionType (*edge_map_type)(icontext_type&, const edge_type&);
152  };
153 
154  /**
155  * \internal
156  * A templated implementation of the imap_reduce_base above.
157  * \tparam ReductionType The reduction type. (The type the map function
158  * returns)
159  */
160  template <typename ReductionType,
161  typename VertexMapperType,
162  typename EdgeMapperType,
163  typename FinalizerType>
164  struct map_reduce_type : public imap_reduce_base {
165  conditional_addition_wrapper<ReductionType> acc;
166  VertexMapperType map_vtx_function;
167  EdgeMapperType map_edge_function;
168  FinalizerType finalize_function;
169 
170  bool vertex_map;
171  mutex lock;
172 
173  /**
174  * \brief Constructor which constructs a vertex reduction
175  */
176  map_reduce_type(VertexMapperType map_vtx_function,
177  FinalizerType finalize_function)
178  : map_vtx_function(map_vtx_function),
179  finalize_function(finalize_function), vertex_map(true) { }
180 
181  /**
182  * \brief Constructor which constructs an edge reduction. The last bool
183  * is unused and allows for disambiguation between the two constructors
184  */
185  map_reduce_type(EdgeMapperType map_edge_function,
186  FinalizerType finalize_function,
187  bool)
188  : map_edge_function(map_edge_function),
189  finalize_function(finalize_function), vertex_map(false) { }
190 
191 
192  void perform_map_vertex(icontext_type& context, vertex_type& vertex) {
193  /**
194  * A compiler error on this line is typically due to the
195  * aggregator map function not having the correct type.
196  *
197  * Verify that the map function has the following form:
198  *
199  * ReductionType mapfun(icontext_type& context, const vertex_type& vertex);
200  *
201  * It is also possible that the accumulator type (ReductionType) is incorrect.
202  */
203  ReductionType temp = map_vtx_function(context, vertex);
204  /**
205  * A compiler error on this line is typically due to the
206  * accumulator (the ReductionType of the map function) not having an
207  * operator+=. Ensure that the following is available:
208  *
209  * ReductionType& operator+=(ReductionType& lvalue,
210  * const ReductionType& rvalue);
211  */
212  acc += temp;
213  } // end of perform_map_vertex
214 
215  void perform_map_edge(icontext_type& context, edge_type& edge) {
216  /**
217  * A compiler error on this line is typically due to the
218  * aggregator map function not having the correct type.
219  *
220  * Verify that the map function has the following form:
221  *
222  * ReductionType mapfun(icontext_type& context, const edge_type& edge);
223  *
224  * It is also possible that the accumulator type (ReductionType) is incorrect.
225  */
226  ReductionType temp = map_edge_function(context, edge);
227  /**
228  * A compiler error on this line is typically due to the
229  * accumulator (the ReductionType of the map function) not having an
230  * operator+=. Ensure that the following is available:
231  *
232  * ReductionType& operator+=(ReductionType& lvalue,
233  * const ReductionType& rvalue);
234  */
235  acc += temp;
236  } // end of perform_map_edge
237 
238  bool is_vertex_map() const {
239  return vertex_map;
240  }
241 
242  any get_accumulator() const {
243  return any(acc);
244  }
245 
246  void add_accumulator_any(any& other) {
247  lock.lock();
248  acc += other.as<conditional_addition_wrapper<ReductionType> >();
249  lock.unlock();
250  }
251 
252  void set_accumulator_any(any& other) {
253  lock.lock();
254  acc = other.as<conditional_addition_wrapper<ReductionType> >();
255  lock.unlock();
256  }
257 
258 
259  void add_accumulator(imap_reduce_base* other) {
260  lock.lock();
261  acc += dynamic_cast<map_reduce_type*>(other)->acc;
262  lock.unlock();
263  }
264 
265  void clear_accumulator() {
266  acc.clear();
267  }
268 
269  void finalize(icontext_type& context) {
270  finalize_function(context, acc.value);
271  }
272 
273  imap_reduce_base* clone_empty() const {
274  map_reduce_type* copy;
275  if (is_vertex_map()) {
276  copy = new map_reduce_type(map_vtx_function,
277  finalize_function);
278  }
279  else {
280  copy = new map_reduce_type(map_edge_function,
281  finalize_function,
282  true);
283  }
284  return copy;
285  }
286  };
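 /* Illustrative sketch of a reduction type that satisfies the requirements
  * referenced in the compiler-error comments above: default constructible
  * and supporting operator+=. (For distributed use the type must also be
  * serializable, e.g. plain-old-data or providing save()/load().)
  *
  *   struct degree_sum {
  *     size_t in, out;
  *     degree_sum() : in(0), out(0) { }
  *     degree_sum& operator+=(const degree_sum& other) {
  *       in  += other.in;
  *       out += other.out;
  *       return *this;
  *     }
  *   };
  */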
287 
288 
289  std::map<std::string, imap_reduce_base*> aggregators;
290  std::map<std::string, float> aggregate_period;
291 
292  struct async_aggregator_state {
293  /// Performs reduction of all local threads. On machine 0, also
294  /// accumulates for all machines.
295  imap_reduce_base* root_reducer;
296  /// Accumulator used for each thread
297  std::vector<imap_reduce_base*> per_thread_aggregation;
298  /// Count down the completion of the local machine threads
299  atomic<int> local_count_down;
300  /// Count down the completion of machines. Used only on machine 0
301  atomic<int> distributed_count_down;
302  };
303  std::map<std::string, async_aggregator_state> async_state;
304 
305  float start_time;
306 
307  /* annoyingly the mutable queue is a max heap when I need a min-heap
308  * to track the next thing to activate. So we need to keep
309  * negative priorities... */
310  mutable_queue<std::string, float> schedule;
311  mutex schedule_lock;
312  size_t ncpus;
313 
314  template <typename ReductionType, typename F>
315  static void test_vertex_mapper_type(std::string key = "") {
316  bool test_result = test_function_or_const_functor_2<F,
317  ReductionType(icontext_type&,
318  const vertex_type&),
319  ReductionType,
320  icontext_type&,
321  const vertex_type&>::value;
322  if (!test_result) {
323  std::stringstream strm;
324  strm << "\n";
325  if (key.empty()) {
326  strm << "Vertex Map Function does not pass strict runtime type checks. \n";
327  }
328  else {
329  strm << "Map Function in Vertex Aggregator " << key
330  << " does not pass strict runtime type checks. \n";
331  }
332  if (boost::is_function<typename boost::remove_pointer<F>::type>::value) {
333  strm
334  << "Function prototype should be \n"
335  << "\t ReductionType f(icontext_type&, const vertex_type&)\n";
336  }
337  else {
338  strm << "Functor's operator() prototype should be \n"
339  << "\t ReductionType operator()(icontext_type&, const vertex_type&) const\n";
340  }
341  strm << "If you are not intentionally violating the abstraction,"
342  << " we recommend fixing your function for safety reasons";
343  strm.flush();
344  logstream(LOG_WARNING) << strm.str() << std::endl;
345  }
346  }
347 
348  template <typename ReductionType, typename F>
349  static void test_edge_mapper_type(std::string key = "") {
350  bool test_result = test_function_or_const_functor_2<F,
351  ReductionType(icontext_type&,
352  const edge_type&),
353  ReductionType,
354  icontext_type&,
355  const edge_type&>::value;
356 
357  if (!test_result) {
358  std::stringstream strm;
359  strm << "\n";
360  if (key.empty()) {
361  strm << "Edge Map Function does not pass strict runtime type checks. \n";
362  }
363  else {
364  strm << "Map Function in Edge Aggregator " << key
365  << " does not pass strict runtime type checks. \n";
366  }
367  if (boost::is_function<typename boost::remove_pointer<F>::type>::value) {
368  strm << "Function prototype should be \n"
369  << "\t ReductionType f(icontext_type&, const edge_type&)\n";
370  }
371  else {
372  strm << "Functor's operator() prototype should be "
373  << "\t ReductionType operator()(icontext_type&, const edge_type&) const\n";
374  }
375  strm << "If you are not intentionally violating the abstraction,"
376  << " we recommend fixing your function for safety reasons";
377  logstream(LOG_WARNING) << strm.str() << std::endl;
378  }
379  }
380 
381  public:
382 
383 
384  distributed_aggregator(distributed_control& dc,
385  graph_type& graph,
386  icontext_type* context):
387  rmi(dc, this), graph(graph),
388  context(context), ncpus(0) { }
389 
390  /**
391  * \copydoc graphlab::iengine::add_vertex_aggregator
392  */
393  template <typename ReductionType,
394  typename VertexMapperType,
395  typename FinalizerType>
396  bool add_vertex_aggregator(const std::string& key,
397  VertexMapperType map_function,
398  FinalizerType finalize_function) {
399  if (key.length() == 0) return false;
400  if (aggregators.count(key) == 0) {
401 
402  if (rmi.procid() == 0) {
403  // do a runtime type check
404  test_vertex_mapper_type<ReductionType, VertexMapperType>(key);
405  }
406 
407  aggregators[key] = new map_reduce_type<ReductionType,
408  VertexMapperType,
409  typename default_map_types<ReductionType>::edge_map_type,
410  FinalizerType>(map_function,
411  finalize_function);
412  return true;
413  }
414  else {
415  // aggregator already exists. fail
416  return false;
417  }
418  }
419 
420 #if defined(__cplusplus) && __cplusplus >= 201103L
421  /**
422  * \brief An overload of add_vertex_aggregator for C++11 which does not
423  * require the user to provide the reduction type.
424  *
425  * This function is available only if the compiler has C++11 support.
426  * Specifically, it uses C++11's decltype operation to infer the
427  * reduction type, thus eliminating the need for the function call to be templatized over the reduction type.
428  */
429  template <typename VertexMapperType,
430  typename FinalizerType>
431  bool add_vertex_aggregator(const std::string& key,
432  VertexMapperType map_function,
433  FinalizerType finalize_function) {
434  //typedef decltype(map_function(*context,graph.vertex(0))) ReductionType;
435  typedef decltype(map_function(*context, graph.vertex(0))) ReductionType;
436  if (key.length() == 0) return false;
437  if (aggregators.count(key) == 0) {
438  aggregators[key] = new map_reduce_type<ReductionType,
439  VertexMapperType,
440  typename default_map_types<ReductionType>::edge_map_type,
441  FinalizerType>(map_function,
442  finalize_function);
443  return true;
444  }
445  else {
446  // aggregator already exists. fail
447  return false;
448  }
449  }
450 #endif
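 /* Illustrative usage sketch (the map/finalize functions and the
  * `aggregator` object are hypothetical):
  *
  *   double rank_sum_map(icontext_type& ctx, const vertex_type& v) {
  *     return 1.0;  // in a real program, e.g. a value derived from v.data()
  *   }
  *   void rank_sum_finalize(icontext_type& ctx, double total) {
  *     // called on every machine with the globally reduced value
  *   }
  *
  *   aggregator.add_vertex_aggregator<double>("rank_sum",
  *                                            rank_sum_map, rank_sum_finalize);
  *   aggregator.aggregate_now("rank_sum");          // run once, immediately
  *   aggregator.aggregate_periodic("rank_sum", 5);  // and/or every 5 seconds
  */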
451 
452  /**
453  * \copydoc graphlab::iengine::add_edge_aggregator
454  */
455  template <typename ReductionType,
456  typename EdgeMapperType,
457  typename FinalizerType>
458  bool add_edge_aggregator(const std::string& key,
459  EdgeMapperType map_function,
460  FinalizerType finalize_function) {
461  if (key.length() == 0) return false;
462  if (aggregators.count(key) == 0) {
463  if (rmi.procid() == 0) {
464  // do a runtime type check
465  test_edge_mapper_type<ReductionType, EdgeMapperType>(key);
466  }
467  aggregators[key] = new map_reduce_type<ReductionType,
468  typename default_map_types<ReductionType>::vertex_map_type,
469  EdgeMapperType,
470  FinalizerType>(map_function,
471  finalize_function,
472  true);
473  return true;
474  }
475  else {
476  // aggregator already exists. fail
477  return false;
478  }
479  }
480 
481 #if defined(__cplusplus) && __cplusplus >= 201103L
482  /**
483  * \brief An overload of add_edge_aggregator for C++11 which does not
484  * require the user to provide the reduction type.
485  *
486  * This function is available only if the compiler has C++11 support.
487  * Specifically, it uses C++11's decltype operation to infer the
488  * reduction type, thus eliminating the need for the function
489  * call to be templatized over the reduction type.
490  */
491  template <typename EdgeMapperType,
492  typename FinalizerType>
493  bool add_edge_aggregator(const std::string& key,
494  EdgeMapperType map_function,
495  FinalizerType finalize_function) {
496  // an edge_type is actually hard to get
497  typedef decltype(map_function(*context, edge_type(graph.l_vertex(0).in_edges()[0]) )) ReductionType;
498  if (key.length() == 0) return false;
499  if (aggregators.count(key) == 0) {
500  aggregators[key] = new map_reduce_type<ReductionType,
501  typename default_map_types<ReductionType>::vertex_map_type,
502  EdgeMapperType,
503  FinalizerType>(map_function,
504  finalize_function,
505  true);
506  return true;
507  }
508  else {
509  // aggregator already exists. fail
510  return false;
511  }
512  }
513 #endif
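 /* Illustrative usage sketch for an edge aggregator (hypothetical functions):
  *
  *   size_t edge_count_map(icontext_type& ctx, const edge_type& e) {
  *     return 1;  // contributes 1 per edge
  *   }
  *   void edge_count_finalize(icontext_type& ctx, size_t total) {
  *     // `total` is the global edge count, available on every machine
  *   }
  *
  *   aggregator.add_edge_aggregator<size_t>("edge_count",
  *                                          edge_count_map, edge_count_finalize);
  */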
514 
515  /**
516  * \copydoc graphlab::iengine::aggregate_now
517  */
518  bool aggregate_now(const std::string& key) {
519  ASSERT_MSG(graph.is_finalized(), "Graph must be finalized");
520  if (aggregators.count(key) == 0) {
521  ASSERT_MSG(false, "Requested aggregator %s not found", key.c_str());
522  return false;
523  }
524 
525  imap_reduce_base* mr = aggregators[key];
526  mr->clear_accumulator();
527  // ok. now we perform reduction on local data in parallel
528 #ifdef _OPENMP
529 #pragma omp parallel
530 #endif
531  {
532  imap_reduce_base* localmr = mr->clone_empty();
533  if (localmr->is_vertex_map()) {
534 #ifdef _OPENMP
535  #pragma omp for
536 #endif
537  for (int i = 0; i < (int)graph.num_local_vertices(); ++i) {
538  local_vertex_type lvertex = graph.l_vertex(i);
539  if (lvertex.owner() == rmi.procid()) {
540  vertex_type vertex(lvertex);
541  localmr->perform_map_vertex(*context, vertex);
542  }
543  }
544  }
545  else {
546 #ifdef _OPENMP
547  #pragma omp for
548 #endif
549  for (int i = 0; i < (int)graph.num_local_vertices(); ++i) {
550  foreach(local_edge_type e, graph.l_vertex(i).in_edges()) {
551  edge_type edge(e);
552  localmr->perform_map_edge(*context, edge);
553  }
554  }
555  }
556 #ifdef _OPENMP
557  #pragma omp critical
558 #endif
559  {
560  mr->add_accumulator(localmr);
561  }
562  delete localmr;
563  }
564 
565  std::vector<any> gathervec(rmi.numprocs());
566  gathervec[rmi.procid()] = mr->get_accumulator();
567 
568  rmi.gather(gathervec, 0);
569 
570  if (rmi.procid() == 0) {
571  // machine 0 gathers the accumulators,
572  // sums them together and broadcasts the result
573  for (procid_t i = 1; i < rmi.numprocs(); ++i) {
574  mr->add_accumulator_any(gathervec[i]);
575  }
576  any val = mr->get_accumulator();
577  rmi.broadcast(val, true);
578  }
579  else {
580  // all other machines wait for the broadcast value
581  any val;
582  rmi.broadcast(val, false);
583  mr->set_accumulator_any(val);
584  }
585  mr->finalize(*context);
586  mr->clear_accumulator();
587  gathervec.clear();
588  return true;
589  }
590 
591 
592  /**
593  * \copydoc graphlab::iengine::aggregate_periodic
594  */
595  bool aggregate_periodic(const std::string& key, float seconds) {
596  rmi.barrier();
597  if (seconds < 0) return false;
598  if (aggregators.count(key) == 0) return false;
599  else aggregate_period[key] = seconds;
600  return true;
601  }
602 
603  /**
604  * Performs aggregation on all keys registered with a period.
605  * May be used on engine start() to ensure all periodic
606  * aggregators are executed before engine execution.
607  */
608  void aggregate_all_periodic() {
609  typename std::map<std::string, float>::iterator iter =
610  aggregate_period.begin();
611  while (iter != aggregate_period.end()) {
612  aggregate_now(iter->first);
613  ++iter;
614  }
615  }
616 
617 
618  /**
619  * Must be called on engine start. Initializes the internal scheduler.
620  * Must be called on all machines simultaneously.
621  * ncpus is really only important for the asynchronous implementation.
622  * It must be equal to the number of engine threads.
623  *
624  * \param [in] ncpus Number of engine threads used. This is only necessary
625  * if the asynchronous form is used.
626  */
627  void start(size_t ncpus = 0) {
628  rmi.barrier();
629  schedule.clear();
630  start_time = timer::approx_time_seconds();
631  typename std::map<std::string, float>::iterator iter =
632  aggregate_period.begin();
633  while (iter != aggregate_period.end()) {
634  // schedule is a max heap. To treat it like a min heap
635  // I need to insert negative priorities
636  schedule.push(iter->first, -iter->second);
637  ++iter;
638  }
639  this->ncpus = ncpus;
640 
641  // now initialize the asynchronous reduction states
642  if(ncpus > 0) {
643  iter = aggregate_period.begin();
644  while (iter != aggregate_period.end()) {
645  async_state[iter->first].local_count_down = (int)ncpus;
646  async_state[iter->first].distributed_count_down =
647  (int)rmi.numprocs();
648 
649  async_state[iter->first].per_thread_aggregation.resize(ncpus);
650  for (size_t i = 0; i < ncpus; ++i) {
651  async_state[iter->first].per_thread_aggregation[i] =
652  aggregators[iter->first]->clone_empty();
653  }
654  async_state[iter->first].root_reducer =
655  aggregators[iter->first]->clone_empty();
656  ++iter;
657  }
658  }
659  }
660 
661 
662  /**
663  * If asynchronous aggregation is desired, this function is
664  * to be called periodically on each machine. This polls the schedule to
665  * check if there is an aggregator which needs to be activated. If there
666  * is an aggregator to be started, this function will return a non-empty
667  * string. This function is thread reentrant, and each activated aggregator
668  * will return a non-empty string for only one call to
669  * tick_asynchronous() on each machine.
670  *
671  * If a non-empty string is returned, the asynchronous engine
672  * must ensure that all threads (ncpus per machine) eventually
673  * call tick_asynchronous_compute(cpuid, key) where key is the returned string.
674  */
675  std::string tick_asynchronous() {
676  // if we fail to acquire the lock, go ahead
677  if (!schedule_lock.try_lock()) return "";
678 
679  // see if there is a key to run
680  float curtime = timer::approx_time_seconds() - start_time;
681  std::string key;
682  bool has_entry = false;
683  if (!schedule.empty() && -schedule.top().second <= curtime) {
684  key = schedule.top().first;
685  has_entry = true;
686  schedule.pop();
687  }
688  schedule_lock.unlock();
689 
690  // no key to run; return an empty string
691  if (has_entry == false) return "";
692  else return key;
693  // ok. we have a key to run, construct the local reducers
694  }
695 
696 
697  /**
698  * Once tick_asynchronous() returns a key, all threads in the engine
699  * should call tick_asynchronous_compute() with a matching key.
700  * This function will perform the computation for the key in question
701  * and send the accumulated result back to machine 0 when done
702  */
703  void tick_asynchronous_compute(size_t cpuid, const std::string& key) {
704  // acquire and check the async_aggregator_state
705  typename std::map<std::string, async_aggregator_state>::iterator iter =
706  async_state.find(key);
707  ASSERT_MSG(iter != async_state.end(), "Key %s not found", key.c_str());
708  ASSERT_GT(iter->second.per_thread_aggregation.size(), cpuid);
709 
710  imap_reduce_base* localmr = iter->second.per_thread_aggregation[cpuid];
711  // perform the reduction using the local mr
712  if (localmr->is_vertex_map()) {
713  for (int i = cpuid;i < (int)graph.num_local_vertices(); i+=ncpus) {
714  local_vertex_type lvertex = graph.l_vertex(i);
715  if (lvertex.owner() == rmi.procid()) {
716  vertex_type vertex(lvertex);
717  localmr->perform_map_vertex(*context, vertex);
718  }
719  }
720  } else {
721  for (int i = cpuid;i < (int)graph.num_local_vertices(); i+=ncpus) {
722  foreach(local_edge_type e, graph.l_vertex(i).in_edges()) {
723  edge_type edge(e);
724  localmr->perform_map_edge(*context, edge);
725  }
726  }
727  }
728  iter->second.root_reducer->add_accumulator(localmr);
729  int countdown_val = iter->second.local_count_down.dec();
730 
731  ASSERT_LT(countdown_val, ncpus);
732  ASSERT_GE(countdown_val, 0);
733  if (countdown_val == 0) {
734  // reset the async_state to pristine condition.
735  // - clear all thread reducers since we got all we need from them
736  // - clear the local root reducer, except on machine 0 (and only after
737  // we have read the accumulator from it)
738  // - reset the counters
739  for (size_t i = 0;
740  i < iter->second.per_thread_aggregation.size(); ++i) {
741  iter->second.per_thread_aggregation[i]->clear_accumulator();
742  }
743  iter->second.local_count_down = ncpus;
744 
745  if (rmi.procid() != 0) {
746  // ok, we need to signal back to the root to perform finalization
747  // read the accumulator
748  any acc = iter->second.root_reducer->get_accumulator();
749  iter->second.root_reducer->clear_accumulator();
750  rmi.remote_call(0, &distributed_aggregator::rpc_key_merge,
751  key, acc);
752  }
753  else {
754  decrement_distributed_counter(key);
755  }
756  }
757  }
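 /* Illustrative sketch of the asynchronous protocol described above
  * (the engine loop and the `aggregator` object are hypothetical):
  *
  *   // any engine thread may poll the schedule
  *   std::string key = aggregator.tick_asynchronous();
  *   if (!key.empty()) {
  *     // the engine must then arrange for every one of its ncpus threads
  *     // to eventually run, each with its own cpuid:
  *     aggregator.tick_asynchronous_compute(cpuid, key);
  *   }
  */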
758 
759  /**
760  * RPC Call called by other machines with their accumulator for the key.
761  * This function will merge the accumulator and perform finalization
762  * when all accumulators are received
763  */
764  void rpc_key_merge(const std::string& key, any& acc) {
765  // acquire and check the async_aggregator_state
766  typename std::map<std::string, async_aggregator_state>::iterator iter =
767  async_state.find(key);
768  ASSERT_MSG(iter != async_state.end(), "Key %s not found", key.c_str());
769  iter->second.root_reducer->add_accumulator_any(acc);
770  decrement_distributed_counter(key);
771  }
772 
773  /**
774  * Called whenever one machine finishes all of its local accumulation.
775  * When the counter determines that all machines' accumulators have been
776  * received, this function performs finalization and prepares and
777  * broadcasts the next scheduled time for the key.
778  */
779  void decrement_distributed_counter(const std::string& key) {
780  // must be master machine
781  ASSERT_EQ(rmi.procid(), 0);
782  // acquire and check the async_aggregator_state
783  typename std::map<std::string, async_aggregator_state>::iterator iter =
784  async_state.find(key);
785  ASSERT_MSG(iter != async_state.end(), "Key %s not found", key.c_str());
786  int countdown_val = iter->second.distributed_count_down.dec();
787  logstream(LOG_INFO) << "Distributed Aggregation of " << key << ". "
788  << countdown_val << " remaining." << std::endl;
789 
790  ASSERT_LE(countdown_val, rmi.numprocs());
791  ASSERT_GE(countdown_val, 0);
792  if (countdown_val == 0) {
793  logstream(LOG_INFO) << "Aggregate completion of " << key << std::endl;
794  any acc_val = iter->second.root_reducer->get_accumulator();
795  // set distributed count down again for the second phase:
796  // waiting for everyone to finish finalization
797  iter->second.distributed_count_down = rmi.numprocs();
798  for (procid_t i = 1;i < rmi.numprocs(); ++i) {
799  rmi.remote_call(i, &distributed_aggregator::rpc_perform_finalize,
800  key, acc_val);
801  }
802  iter->second.root_reducer->finalize(*context);
803  iter->second.root_reducer->clear_accumulator();
804  decrement_finalize_counter(key);
805  }
806  }
807 
808  /**
809  * Called from the root machine to all machines to perform finalization
810  * on the key
811  */
812  void rpc_perform_finalize(const std::string& key, any& acc_val) {
813  ASSERT_NE(rmi.procid(), 0);
814  typename std::map<std::string, async_aggregator_state>::iterator iter =
815  async_state.find(key);
816  ASSERT_MSG(iter != async_state.end(), "Key %s not found", key.c_str());
817 
818  iter->second.root_reducer->set_accumulator_any(acc_val);
819  iter->second.root_reducer->finalize(*context);
820  iter->second.root_reducer->clear_accumulator();
821  // reply to the root machine
822  rmi.remote_call(0, &distributed_aggregator::decrement_finalize_counter,
823  key);
824  }
825 
826 
827  void decrement_finalize_counter(const std::string& key) {
828  typename std::map<std::string, async_aggregator_state>::iterator iter =
829  async_state.find(key);
830  ASSERT_MSG(iter != async_state.end(), "Key %s not found", key.c_str());
831  int countdown_val = iter->second.distributed_count_down.dec();
832  if (countdown_val == 0) {
833  // done! all finalization is complete.
834  // reset the counter
835  iter->second.distributed_count_down = rmi.numprocs();
836  // compute the next start time.
837  // time is stored as an offset from start_time
838  float next_time = timer::approx_time_seconds() +
839  aggregate_period[key] - start_time;
840  logstream(LOG_INFO) << rmi.procid() << ": Reschedule of " << key
841  << " at " << next_time << std::endl;
842  rpc_schedule_key(key, next_time);
843  for (procid_t i = 1;i < rmi.numprocs(); ++i) {
844  rmi.remote_call(i, &distributed_aggregator::rpc_schedule_key,
845  key, next_time);
846  }
847  }
848  }
849 
850  /**
851  * Called to schedule the next trigger time for the key
852  */
853  void rpc_schedule_key(const std::string& key, float next_time) {
854  schedule_lock.lock();
855  schedule.push(key, -next_time);
856  schedule_lock.unlock();
857  }
858 
859 
860  /**
861  * If synchronous aggregation is desired, this function is
862  * to be called simultaneously by one thread on each machine.
863  * This polls the schedule to see if there
864  * is an aggregator which needs to be activated. If there is an aggregator
865  * to be started, this function will perform aggregation.
866  */
867  void tick_synchronous() {
868  // if timer has exceeded our top key
869  float curtime = timer::approx_time_seconds() - start_time;
870  rmi.broadcast(curtime, rmi.procid() == 0);
871  // note that we do not call approx_time_seconds every time;
872  // this ensures that each key will be run at most once
873  // each time tick_synchronous() is called.
874  std::vector<std::pair<std::string, float> > next_schedule;
875  while(!schedule.empty() && -schedule.top().second <= curtime) {
876  std::string key = schedule.top().first;
877  aggregate_now(key);
878  schedule.pop();
879  // compute the next start time.
880  // time is stored as an offset from start_time
881  float next_time = (timer::approx_time_seconds() +
882  aggregate_period[key] - start_time);
883  rmi.broadcast(next_time, rmi.procid() == 0);
884  next_schedule.push_back(std::make_pair(key, -next_time));
885  }
886 
887  for (size_t i = 0;i < next_schedule.size(); ++i) {
888  schedule.push(next_schedule[i].first, next_schedule[i].second);
889  }
890  }
891 
892  /**
893  * Must be called on engine stop. Clears the internal scheduler
894  * and resets all incomplete states.
895  */
896  void stop() {
897  schedule.clear();
898  // clear the aggregators
899  {
900  typename std::map<std::string, imap_reduce_base*>::iterator iter =
901  aggregators.begin();
902  while (iter != aggregators.end()) {
903  iter->second->clear_accumulator();
904  ++iter;
905  }
906  }
907  // clear the asynchronous state
908  {
909  typename std::map<std::string, async_aggregator_state>::iterator
910  iter = async_state.begin();
911  while (iter != async_state.end()) {
912  delete iter->second.root_reducer;
913  for (size_t i = 0;
914  i < iter->second.per_thread_aggregation.size();
915  ++i) {
916  delete iter->second.per_thread_aggregation[i];
917  }
918  iter->second.per_thread_aggregation.clear();
919  ++iter;
920  }
921  async_state.clear();
922  }
923  }
924 
925 
926  std::set<std::string> get_all_periodic_keys() const {
927  typename std::map<std::string, float>::const_iterator iter =
928  aggregate_period.begin();
929  std::set<std::string> ret;
930  while (iter != aggregate_period.end()) {
931  ret.insert(iter->first);
932  ++iter;
933  }
934  return ret;
935  }
936 
937 
938 
939 
940  template <typename ResultType, typename MapFunctionType>
941  ResultType map_reduce_vertices(MapFunctionType mapfunction) {
942  ASSERT_MSG(graph.is_finalized(), "Graph must be finalized");
943 
944  if (rmi.procid() == 0) {
945  // do a runtime type check
946  test_vertex_mapper_type<ResultType, MapFunctionType>();
947  }
948 
949  rmi.barrier();
950  bool global_result_set = false;
951  ResultType global_result = ResultType();
952 #ifdef _OPENMP
953 #pragma omp parallel
954 #endif
955  {
956  bool result_set = false;
957  ResultType result = ResultType();
958 #ifdef _OPENMP
959  #pragma omp for
960 #endif
961  for (int i = 0; i < (int)graph.num_local_vertices(); ++i) {
962  if (graph.l_vertex(i).owner() == rmi.procid()) {
963  if (!result_set) {
964  vertex_type vtx(graph.l_vertex(i));
965  result = mapfunction(*context, vtx);
966  result_set = true;
967  }
968  else if (result_set){
969  vertex_type vtx(graph.l_vertex(i));
970  result += mapfunction(*context, vtx);
971  }
972  }
973  }
974 #ifdef _OPENMP
975  #pragma omp critical
976 #endif
977  {
978  if (result_set) {
979  if (!global_result_set) {
980  global_result = result;
981  global_result_set = true;
982  }
983  else {
984  global_result += result;
985  }
986  }
987  }
988  }
989  conditional_addition_wrapper<ResultType> wrapper(global_result, global_result_set);
990  rmi.all_reduce(wrapper);
991  return wrapper.value;
992  }
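 /* Illustrative usage sketch (hypothetical map function): a one-off global
  * reduction over vertices without registering a named aggregator. This is
  * a collective call and must be invoked on all machines.
  *
  *   size_t count_map(icontext_type& ctx, const vertex_type& v) {
  *     return 1;  // or any value derived from v.data()
  *   }
  *
  *   size_t num_vertices = aggregator.map_reduce_vertices<size_t>(count_map);
  *   // num_vertices holds the same globally reduced value on every machine
  */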
993 
994 
995 
996  template <typename ResultType, typename MapFunctionType>
997  ResultType map_reduce_edges(MapFunctionType mapfunction) {
998  ASSERT_MSG(graph.is_finalized(), "Graph must be finalized");
999 
1000  if (rmi.procid() == 0) {
1001  // do a runtime type check
1002  test_edge_mapper_type<ResultType, MapFunctionType>();
1003  }
1004 
1005  rmi.barrier();
1006  bool global_result_set = false;
1007  ResultType global_result = ResultType();
1008 #ifdef _OPENMP
1009 #pragma omp parallel
1010 #endif
1011  {
1012  bool result_set = false;
1013  ResultType result = ResultType();
1014 #ifdef _OPENMP
1015  #pragma omp for
1016 #endif
1017  for (int i = 0; i < (int)graph.num_local_vertices(); ++i) {
1018  foreach(const local_edge_type& e, graph.l_vertex(i).in_edges()) {
1019  if (!result_set) {
1020  edge_type edge(e);
1021  result = mapfunction(*context, edge);
1022  result_set = true;
1023  }
1024  else if (result_set){
1025  edge_type edge(e);
1026  result += mapfunction(*context, edge);
1027  }
1028  }
1029  }
1030 #ifdef _OPENMP
1031  #pragma omp critical
1032 #endif
1033  {
1034  if (result_set) {
1035  if (!global_result_set) {
1036  global_result = result;
1037  global_result_set = true;
1038  }
1039  else {
1040  global_result += result;
1041  }
1042  }
1043  }
1044  }
1045 
1046  conditional_addition_wrapper<ResultType> wrapper(global_result, global_result_set);
1047  rmi.all_reduce(wrapper);
1048  return wrapper.value;
1049  }
1050 
1051  template <typename TransformType>
1052  void transform_vertices(TransformType transform_functor) {
1053  ASSERT_MSG(graph.is_finalized(), "Graph must be finalized");
1054  rmi.barrier();
1055 #ifdef _OPENMP
1056  #pragma omp parallel for
1057 #endif
1058  for (int i = 0; i < (int)graph.num_local_vertices(); ++i) {
1059  if (graph.l_vertex(i).owner() == rmi.procid()) {
1060  vertex_type vtx(graph.l_vertex(i));
1061  transform_functor(*context, vtx);
1062  }
1063  }
1064  rmi.barrier();
1065  graph.synchronize();
1066  }
1067 
1068 
1069  template <typename TransformType>
1070  void transform_edges(TransformType transform_functor) {
1071  ASSERT_MSG(graph.is_finalized(), "Graph must be finalized");
1072  rmi.barrier();
1073 #ifdef _OPENMP
1074  #pragma omp parallel for
1075 #endif
1076  for (int i = 0; i < (int)graph.num_local_vertices(); ++i) {
1077  foreach(const local_edge_type& e, graph.l_vertex(i).in_edges()) {
1078  edge_type edge(e);
1079  transform_functor(*context, edge);
1080  }
1081  }
1082  rmi.barrier();
1083  }
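 /* Illustrative usage sketch (hypothetical functor): transform_vertices()
  * and transform_edges() apply a mutating functor to every owned vertex or
  * every local edge; both are collective calls.
  *
  *   void halve_edge(icontext_type& ctx, edge_type& e) {
  *     e.data() *= 0.5;  // assumes the edge data type supports operator*=
  *   }
  *
  *   aggregator.transform_edges(halve_edge);
  */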
1084 
1085 
1086 
1087 
1088 
1089  ~distributed_aggregator() {
1090  delete context;
1091  }
1092  };
1093 
1094 
1095 }; // end of graphlab namespace
1096 #include <graphlab/macros_undef.hpp>
1097 
1098 #endif