GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
iengine.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 
25 /**
26  * Also contains code that is Copyright 2011 Yahoo! Inc. All rights
27  * reserved.
28  *
29  * Contributed under the iCLA for:
30  * Joseph Gonzalez ([email protected])
31  *
32  */
33 
34 #ifndef GRAPHLAB_IENGINE_HPP
35 #define GRAPHLAB_IENGINE_HPP
36 
37 #include <boost/bind.hpp>
38 #include <boost/functional.hpp>
39 
40 #include <graphlab/vertex_program/icontext.hpp>
41 #include <graphlab/engine/execution_status.hpp>
42 #include <graphlab/options/graphlab_options.hpp>
43 #include <graphlab/serialization/serialization_includes.hpp>
44 #include <graphlab/aggregation/distributed_aggregator.hpp>
45 #include <graphlab/vertex_program/op_plus_eq_concept.hpp>
46 #include <graphlab/graph/vertex_set.hpp>
47 
48 
49 #if defined(__cplusplus) && __cplusplus >= 201103L
50 // Boost concept checking is reported as broken when compiling as C++11 or later.
51 // Temporary workaround (TOFIX): redefine the macro so that it expands to nothing, which means every BOOST_CONCEPT_ASSERT(...) below this point is a no-op in such builds.
52 #undef BOOST_CONCEPT_ASSERT
53 #define BOOST_CONCEPT_ASSERT(unused)
54 #endif
55 
56 namespace graphlab {
57 
58 
59  /**
60  * \ingroup engine
61  *
62  * \brief The abstract interface of a GraphLab engine.
63  *
64  * A GraphLab engine is responsible for executing vertex programs in
65  * parallel on one or more machines. GraphLab has a collection of
66  * different engines with different guarantees on how
67  * vertex-programs are executed. However each engine must implement
68  * the iengine interface to allow them to be used "interchangeably."
69  *
70  * In addition to executing vertex programs GraphLab engines also
71  * expose a synchronous aggregation framework. This allows users to
72  * attach "map-reduce" style jobs that are run periodically on all
73  * edges or vertices while GraphLab programs are actively running.
74  *
75  * Example Usage
76  * =================
77  *
78  * One can use the iengine interface to select between different
79  * engines at runtime:
80  *
81  * \code
82  * iengine<pagerank>* engine_ptr = NULL;
83  * if(cmdline_arg == "synchronous") {
84  * engine_ptr = new synchronous_engine<pagerank>(dc, graph, cmdopts);
85  * } else {
86  * engine_ptr = new async_consistent_engine<pagerank>(dc, graph, cmdopts);
87  * }
88  * // Attach an aggregator
89  * engine_ptr->add_edge_aggregator<float>("edge_map",
90  * edge_map_fun, finalize_fun);
91  * // Make it run every 3 seconds
92  * engine_ptr->aggregate_periodic("edge_map", 3);
93  * // Signal all vertices
94  * engine_ptr->signal_all();
95  * // Run the engine
96  * engine_ptr->start();
97  * // do something interesting
98  * delete engine_ptr; engine_ptr = NULL;
99  * \endcode
100  *
101  * @tparam VertexProgram The user defined vertex program which should extend the
102  * \ref ivertex_program interface.
103  */
104  template<typename VertexProgram>
105  class iengine {
106  public:
107  /**
108  * \brief The user defined vertex program type which should extend
109  * ivertex_program.
110  */
111  typedef VertexProgram vertex_program_type;
112 
113  /**
114  * \cond GRAPHLAB_INTERNAL
115  * \brief GraphLab Requires that vertex programs be default
116  * constructible.
117  *
118  * \code
119  * class vertex_program {
120  * public:
121  * vertex_program() { }
122  * };
123  * \endcode
124  */
125  BOOST_CONCEPT_ASSERT((boost::DefaultConstructible<vertex_program_type>));
126  /// \endcond
127 
128 
129 
130  /**
131  * \cond GRAPHLAB_INTERNAL
132  *
133  * \brief GraphLab requires that the vertex program type be
134  * Serializable. See \ref sec_serializable for details.
135  */
136  BOOST_CONCEPT_ASSERT((graphlab::Serializable<vertex_program_type>));
137  /// \endcond
138 
139 
140  /**
141  * \brief The user defined message type which is defined in
142  * ivertex_program::message_type.
143  *
144  */
145  typedef typename vertex_program_type::message_type message_type;
146 
147  /**
148  * \brief The graph type which is defined in
149  * ivertex_program::graph_type and will typically be
150  * \ref distributed_graph.
151  */
152  typedef typename vertex_program_type::graph_type graph_type;
153 
154  /**
155  * \brief The vertex identifier type defined in
156  * \ref graphlab::vertex_id_type.
157  */
158  typedef typename graph_type::vertex_id_type vertex_id_type;
159 
160  /**
161  * \brief the vertex object type which contains a reference to the
162  * vertex data and is defined in the iengine::graph_type
163  * (see for example \ref distributed_graph::vertex_type).
164  */
165  typedef typename graph_type::vertex_type vertex_type;
166 
167  /**
168  * \brief the edge object type which contains a reference to the
169  * edge data and is defined in the iengine::graph_type (see for
170  * example \ref distributed_graph::edge_type).
171  */
172  typedef typename graph_type::edge_type edge_type;
173 
174  /**
175  * \brief The context type which is passed into vertex programs as
176  * a callback to the engine.
177  *
178  * Most engines use the \ref graphlab::context implementation.
179  */
180  typedef typename vertex_program_type::icontext_type icontext_type;
181 
182  /**
183  * \brief The type of the distributed aggregator used by each engine to
184  * implement distributed aggregation.
185  */
186  typedef distributed_aggregator<graph_type, icontext_type> aggregator_type;
187 
188 
189  /**
190  * \internal
191  * \brief Virtual destructor required for inheritance
192  */
193  virtual ~iengine() {}; // empty body; virtual so that an engine can be deleted through an iengine pointer, as the class-level example does
194 
195  /**
196  * \brief Start the engine execution.
197  *
198  * Behavior details depend on the engine implementation. See the
199  * implementation documentation for specifics.
200  *
201  * @return the reason for termination
202  */
203  virtual execution_status::status_enum start() = 0; // pure virtual: each concrete engine supplies its own implementation
204 
205  /**
206  * \brief Compute the total number of updates (calls to apply)
207  * executed since start was last invoked.
208  *
209  * \return Total number of updates
210  */
211  virtual size_t num_updates() const = 0; // pure virtual, const query
212 
213  /**
214  * \brief Get the elapsed time in seconds since start was last
215  * called.
216  *
217  * \return elapsed time in seconds
218  */
219  virtual float elapsed_seconds() const = 0; // pure virtual, const query
220 
221  /**
222  * \brief get the current iteration number. This is not defined
223  * for all engines in which case -1 is returned.
224  *
225  * \return the current iteration or -1 if not supported.
226  */
227  virtual int iteration() const { return -1; } // default implementation: -1 means "not supported"; engines that track iterations override this
228 
229 
230  /**
231  * \brief Signals a single vertex with an optional message.
232  *
233  * This function sends a message to particular vertex which will
234  * receive that message on start. The signal function must be
235  * invoked on all machines simultaneously. For example:
236  *
237  * \code
238  * graphlab::synchronous_engine<vprog> engine(dc, graph, opts);
239  * engine.signal(0); // signal vertex zero
240  * \endcode
241  *
242  * and _not_:
243  *
244  * \code
245  * graphlab::synchronous_engine<vprog> engine(dc, graph, opts);
246  * if(dc.procid() == 0) engine.signal(0); // signal vertex zero
247  * \endcode
248  *
249  * Since signal is executed synchronously on all machines it
250  * should only be used to schedule a small set of vertices. The
251  * preferred method to signal a large set of vertices (e.g., all
252  * vertices that are a certain type) is to use either the vertex
253  * program init function or the aggregation framework. For
254  * example to signal all vertices that have a particular value one
255  * could write:
256  *
257  * \code
258  * struct bipartite_opt :
259  * public graphlab::ivertex_program<graph_type, gather_type> {
260  * // The user defined init function
261  * void init(icontext_type& context, vertex_type& vertex) {
262  * // Signal myself if I am a certain type
263  * if(vertex.data().on_left) context.signal(vertex);
264  * }
265  * // other vastly more interesting code
266  * };
267  * \endcode
268  *
269  * @param [in] vertex the id of the vertex to signal
270  * @param [in] message the message to send to that vertex. The
271  * default message is sent if no message is provided.
272  * (See ivertex_program::message_type for details about the
273  * message_type).
274  */
275  virtual void signal(vertex_id_type vertex, // id of the vertex to signal
276  const message_type& message = message_type()) = 0; // omitted message => default-constructed message_type; pure virtual
277 
278  /**
279  * \brief Signal all vertices with a particular message.
280  *
281  * This function sends the same message to all vertices which will
282  * receive that message on start. The signal_all function must be
283  * invoked on all machines simultaneously. For example:
284  *
285  * \code
286  * graphlab::synchronous_engine<vprog> engine(dc, graph, opts);
287  * engine.signal_all(); // signal all vertices
288  * \endcode
289  *
290  * and _not_:
291  *
292  * \code
293  * graphlab::synchronous_engine<vprog> engine(dc, graph, opts);
294  * if(dc.procid() == 0) engine.signal_all(); // signal all vertices
295  * \endcode
296  *
297  * The signal_all function is the most common way to send messages
298  * to the engine. For example in the pagerank application we want
299  * all vertices to be active on the first round. Therefore we
300  * would write:
301  *
302  * \code
303  * graphlab::synchronous_engine<pagerank> engine(dc, graph, opts);
304  * engine.signal_all();
305  * engine.start();
306  * \endcode
307  *
308  * @param [in] message the message to send to all vertices. The
309  * default message is sent if no message is provided
310  * (See ivertex_program::message_type for details about the
311  * message_type).
312  */
313  virtual void signal_all(const message_type& message = message_type(), // omitted message => default-constructed message_type
314  const std::string& order = "shuffle") = 0; // NOTE(review): 'order' is not described in the comment above; it defaults to "shuffle" -- confirm the accepted values against the engine implementations
315 
316  /**
317  * \brief Signal a set of vertices with a particular message.
318  *
319  * This function sends the same message to a set of vertices which will
320  * receive that message on start. The signal_vset function must be
321  * invoked on all machines simultaneously. For example:
322  *
323  * \code
324  * graphlab::synchronous_engine<vprog> engine(dc, graph, opts);
325  * engine.signal_vset(vset); // signal a subset of vertices
326  * \endcode
327  *
328  * signal_all() is conceptually equivalent to:
329  *
330  * \code
331  * engine.signal_vset(graph.complete_set());
332  * \endcode
333  *
334  * @param [in] vset The set of vertices to signal
335  * @param [in] message the message to send to all vertices. The
336  * default message is sent if no message is provided
337  * (See ivertex_program::message_type for details about the
338  * message_type).
339  */
340  virtual void signal_vset(const vertex_set& vset, // the vertices to signal
341  const message_type& message = message_type(), // omitted message => default-constructed message_type
342  const std::string& order = "shuffle") = 0; // NOTE(review): 'order' is not described in the comment above; it defaults to "shuffle" -- confirm the accepted values against the engine implementations
343 
344 
345  /**
346  * \brief Creates a vertex aggregator. Returns true on success.
347  * Returns false if an aggregator of the same name already
348  * exists.
349  *
350  * Creates a vertex aggregator associated to a particular key.
351  * The map_function is called over every vertex in the graph, and the
352  * return value of the map is summed. The finalize_function is then called
353  * on the result of the reduction. The finalize_function is called on
354  * all machines. The map_function should only read the graph data,
355  * and should not make any modifications.
356  *
357  * ### Basic Usage
358  * For instance, if the graph has float vertex data, and float edge data:
359  * \code
360  * typedef graphlab::distributed_graph<float, float> graph_type;
361  * \endcode
362  *
363  * An aggregator can be constructed to compute the absolute sum of all the
364  * vertex data. To do this, we define two functions.
365  * \code
366  * float absolute_vertex_data(engine_type::icontext_type& context,
367  * graph_type::vertex_type vertex) {
368  * return std::fabs(vertex.data());
369  * }
370  *
371  * void print_finalize(engine_type::icontext_type& context,
372  * float total) {
373  * std::cout << total << "\n";
374  * }
375  * \endcode
376  *
377  * Next, we define the aggregator in the engine by calling
378  * add_vertex_aggregator(). We must assign it a unique
379  * name which will be used to reference this particular aggregate
380  * operation. We shall call it "absolute_vertex_sum".
381  * \code
382  * engine.add_vertex_aggregator<float>("absolute_vertex_sum",
383  * absolute_vertex_data,
384  * print_finalize);
385  * \endcode
386  *
387  * When executed, the engine executes <code>absolute_vertex_data()</code>
388  * on each vertex in the graph. <code>absolute_vertex_data()</code>
389  * reads the vertex data, and returns its absolute value. All return
390  * values are then summed together using the float's += operator.
391  * The final result is then passed to the <code>print_finalize</code>
392  * function. The template argument <code><float></code> is necessary to
393  * provide information about the return type of
394  * <code>absolute_vertex_data</code>.
395  *
396  *
397  * This aggregator can be run immediately by calling
398  * aggregate_now() with the name of the aggregator.
399  * \code
400  * engine.aggregate_now("absolute_vertex_sum");
401  * \endcode
402  *
403  * Or can be arranged to run periodically together with the engine
404  * execution (in this example, every 1.5 seconds).
405  * \code
406  * engine.aggregate_periodic("absolute_vertex_sum", 1.5);
407  * \endcode
408  *
409  * Note that since finalize is called on <b>all machines</b>, multiple
410  * copies of the total will be printed. If only one copy is desired,
411  * see \ref graphlab::icontext::cout() "context.cout()" or to get
412  * the actual process ID using
413  * \ref graphlab::icontext::procid() "context.procid()"
414  *
415  * In practice, the reduction type can be any arbitrary user-defined type
416  * as long as a += operator is defined. This permits great flexibility
417  * in the type of operations the aggregator can perform.
418  *
419  * ### Details
420  * The add_vertex_aggregator() function is also templatized over both
421  * function types and there is no strong enforcement of the exact argument
422  * types of the map function and the reduce function. For instance, in the
423  * above example, the following print_finalize() variants may also be
424  * accepted.
425  *
426  * \code
427  * void print_finalize(engine_type::icontext_type& context, double total) {
428  * std::cout << total << "\n";
429  * }
430  *
431  * void print_finalize(engine_type::icontext_type& context, float& total) {
432  * std::cout << total << "\n";
433  * }
434  *
435  * void print_finalize(engine_type::icontext_type& context, const float& total) {
436  * std::cout << total << "\n";
437  * }
438  * \endcode
439  * In particular, the last variation may be useful for performance reasons
440  * if the reduction type is large.
441  *
442  * ### Distributed Behavior
443  * To obtain consistent distributed behavior in the distributed setting,
444  * we designed the aggregator to minimize the amount of asymmetry among
445  * the machines. In particular, the finalize operation is guaranteed to be
446  * called on all machines. This therefore permits global variables to be
447  * modified on finalize since all machines are ensured to be eventually
448  * consistent.
449  *
450  * For instance, in the above example, print_finalize could
451  * store the result in a global variable:
452  * \code
453  * void print_finalize(engine_type::icontext_type& context, float total) {
454  * GLOBAL_TOTAL = total;
455  * }
456  * \endcode
457  * which will make it accessible to all other running update functions.
458  *
459  * \tparam ReductionType The output of the map function. Must have
460  * operator+= defined, and must be \ref sec_serializable.
461  * \tparam VertexMapType The type of the map function.
462  * Not generally needed.
463  * Can be inferred by the compiler.
464  * \tparam FinalizerType The type of the finalize function.
465  * Not generally needed.
466  * Can be inferred by the compiler.
467  *
468  * \param [in] key The name of this aggregator. Must be unique.
469  * \param [in] map_function The Map function to use. Must take an
470  * \ref icontext_type& as its first argument, and
471  * a \ref vertex_type, or a reference to a
472  * \ref vertex_type as its second argument.
473  * Returns a ReductionType which must be summable
474  * and \ref sec_serializable .
475  * \param [in] finalize_function The Finalize function to use. Must take
476  * an \ref icontext_type& as its first
477  * argument and a ReductionType, or a
478  * reference to a ReductionType as its second
479  * argument.
480  */
481  template <typename ReductionType, // must be supplied explicitly by the caller: it does not appear in the parameter list, so it cannot be deduced
482  typename VertexMapType,
483  typename FinalizerType>
484  bool add_vertex_aggregator(const std::string& key,
485  VertexMapType map_function,
486  FinalizerType finalize_function) {
487  BOOST_CONCEPT_ASSERT((graphlab::Serializable<ReductionType>)); // compile-time requirement: ReductionType is serializable
488  BOOST_CONCEPT_ASSERT((graphlab::OpPlusEq<ReductionType>)); // compile-time requirement: ReductionType has operator+=
489 
490  aggregator_type* aggregator = get_aggregator(); // get_aggregator() is declared outside this excerpt
491  if(aggregator == NULL) { // NULL is treated as "this engine has no aggregation support"
492  logstream(LOG_FATAL) << "Aggregation not supported by this engine!"
493  << std::endl;
494  return false; // original note: "does not return" -- presumably LOG_FATAL terminates first; confirm in the logger
495  }
496  return aggregator->template add_vertex_aggregator<ReductionType>(key, map_function, // forward all arguments unchanged; the aggregator's result is returned as-is
497  finalize_function);
498  } // end of add vertex aggregator
499 
500 #if defined(__cplusplus) && __cplusplus >= 201103L
501  /**
502  * \brief An overload of add_vertex_aggregator for C++11 which does not
503  * require the user to provide the reduction type.
504  *
505  * This function is available only if the compiler has C++11 support.
506  * Specifically, it uses C++11's decltype operation to infer the
507  * reduction type, thus eliminating the need for the function
508  * call to be templatized over the reduction type. For instance,
509  * in the add_vertex_aggregator() example, it allows the following
510  * code to be written:
511  * \code
512  * engine.add_vertex_aggregator("absolute_vertex_sum",
513  * absolute_vertex_data,
514  * print_finalize);
515  * \endcode
516  *
517  * \tparam VertexMapType The type of the map function.
518  * Not generally needed.
519  * Can be inferred by the compiler.
520  * \tparam FinalizerType The type of the finalize function.
521  * Not generally needed.
522  * Can be inferred by the compiler.
523  *
524  * \param [in] key The name of this aggregator. Must be unique.
525  * \param [in] map_function The Map function to use. Must take an
526  * \ref icontext_type& as its first argument, and
527  * a \ref vertex_type, or a reference to a
528  * \ref vertex_type as its second argument.
529  * Returns a ReductionType which must be summable
530  * and \ref sec_serializable .
531  * \param [in] finalize_function The Finalize function to use. Must take
532  * an \ref icontext_type& as its first
533  * argument and a ReductionType, or a
534  * reference to a ReductionType as its second
535  * argument.
536  */
537  template <typename VertexMapType, // both template parameters are deducible from the call arguments
538  typename FinalizerType>
539  bool add_vertex_aggregator(const std::string& key,
540  VertexMapType map_function,
541  FinalizerType finalize_function) {
542  aggregator_type* aggregator = get_aggregator(); // get_aggregator() is declared outside this excerpt
543  if(aggregator == NULL) { // NULL is treated as "this engine has no aggregation support"
544  logstream(LOG_FATAL) << "Aggregation not supported by this engine!"
545  << std::endl;
546  return false; // original note: "does not return" -- presumably LOG_FATAL terminates first; confirm in the logger
547  }
548  return aggregator->add_vertex_aggregator(key, map_function, // no explicit reduction type is passed here; the aggregator overload is left to work it out
549  finalize_function);
550  } // end of add vertex aggregator (C++11 overload)
551 
552 #endif
553 
554 
555  /**
556  * \brief Creates an edge aggregator. Returns true on success.
557  * Returns false if an aggregator of the same name already
558  * exists.
559  *
560  * Creates a edge aggregator associated to a particular key.
561  * The map_function is called over every edge in the graph, and the
562  * return value of the map is summed. The finalize_function is then called
563  * on the result of the reduction. The finalize_function is called on
564  * all machines. The map_function should only read the graph data,
565  * and should not make any modifications.
566 
567  *
568  * ### Basic Usage
569  * For instance, if the graph has float vertex data, and float edge data:
570  * \code
571  * typedef graphlab::distributed_graph<float, float> graph_type;
572  * \endcode
573  *
574  * An aggregator can be constructed to compute the absolute sum of all the
575  * edge data. To do this, we define two functions.
576  * \code
577  * float absolute_edge_data(engine_type::icontext_type& context,
578  * graph_type::edge_type edge) {
579  * return std::fabs(edge.data());
580  * }
581  *
582  * void print_finalize(engine_type::icontext_type& context, float total) {
583  * std::cout << total << "\n";
584  * }
585  * \endcode
586  *
587  * Next, we define the aggregator in the engine by calling
588  * add_edge_aggregator(). We must assign it a unique
589  * name which will be used to reference this particular aggregate
590  * operation. We shall call it "absolute_edge_sum".
591  * \code
592  * engine.add_edge_aggregator<float>("absolute_edge_sum",
593  * absolute_edge_data,
594  * print_finalize);
595  * \endcode
596  *
597  *
598  * When executed, the engine executes <code>absolute_edge_data()</code>
599  * on each edge in the graph. <code>absolute_edge_data()</code>
600  * reads the edge data, and returns its absolute value. All return
601  * values are then summed together using the float's += operator.
602  * The final result is then passed to the <code>print_finalize</code>
603  * function. The template argument <code><float></code> is necessary to
604  * provide information about the return type of
605  * <code>absolute_edge_data</code>.
606  *
607  *
608  * This aggregator can be run immediately by calling
609  * aggregate_now() with the name of the aggregator.
610  * \code
611  * engine.aggregate_now("absolute_edge_sum");
612  * \endcode
613  *
614  * Or can be arranged to run periodically together with the engine
615  * execution (in this example, every 1.5 seconds).
616  * \code
617  * engine.aggregate_periodic("absolute_edge_sum", 1.5);
618  * \endcode
619  *
620  * Note that since finalize is called on <b>all machines</b>, multiple
621  * copies of the total will be printed. If only one copy is desired,
622  * see \ref graphlab::icontext::cout() "context.cout()" or to get
623  * the actual process ID using
624  * \ref graphlab::icontext::procid() "context.procid()"
625  *
626  * ### Details
627  * The add_edge_aggregator() function is also templatized over both
628  * function types and there is no strong enforcement of the exact argument
629  * types of the map function and the reduce function. For instance, in the
630  * above example, the following print_finalize() variants may also be
631  * accepted.
632  *
633  * \code
634  * void print_finalize(engine_type::icontext_type& context, double total) {
635  * std::cout << total << "\n";
636  * }
637  *
638  * void print_finalize(engine_type::icontext_type& context, float& total) {
639  * std::cout << total << "\n";
640  * }
641  *
642  * void print_finalize(engine_type::icontext_type& context, const float& total) {
643  * std::cout << total << "\n";
644  * }
645  * \endcode
646  * In particular, the last variation may be useful for performance reasons
647  * if the reduction type is large.
648  *
649  * ### Distributed Behavior
650  * To obtain consistent distributed behavior in the distributed setting,
651  * we designed the aggregator to minimize the amount of asymmetry among
652  * the machines. In particular, the finalize operation is guaranteed to be
653  * called on all machines. This therefore permits global variables to be
654  * modified on finalize since all machines are ensured to be eventually
655  * consistent.
656  *
657  * For instance, in the above example, print_finalize could
658  * store the result in a global variable:
659  * \code
660  * void print_finalize(engine_type::icontext_type& context, float total) {
661  * GLOBAL_TOTAL = total;
662  * }
663  * \endcode
664  * which will make it accessible to all other running update functions.
665  *
666  * \tparam ReductionType The output of the map function. Must have
667  * operator+= defined, and must be \ref sec_serializable.
668  * \tparam EdgeMapType The type of the map function.
669  * Not generally needed.
670  * Can be inferred by the compiler.
671  * \tparam FinalizerType The type of the finalize function.
672  * Not generally needed.
673  * Can be inferred by the compiler.
674  *
675  * \param [in] key The name of this aggregator. Must be unique.
676  * \param [in] map_function The Map function to use. Must take an
677  * \ref icontext_type& as its first argument, and
678  * a \ref edge_type, or a reference to a
679  * \ref edge_type as its second argument.
680  * Returns a ReductionType which must be summable
681  * and \ref sec_serializable .
682  * \param [in] finalize_function The Finalize function to use. Must take
683  * an \ref icontext_type& as its first
684  * argument and a ReductionType, or a
685  * reference to a ReductionType as its second
686  * argument.
687  */
688 
689  template <typename ReductionType, // must be supplied explicitly by the caller: it does not appear in the parameter list, so it cannot be deduced
690  typename EdgeMapType,
691  typename FinalizerType>
692  bool add_edge_aggregator(const std::string& key,
693  EdgeMapType map_function,
694  FinalizerType finalize_function) {
695  BOOST_CONCEPT_ASSERT((graphlab::Serializable<ReductionType>)); // compile-time requirement: ReductionType is serializable
696  BOOST_CONCEPT_ASSERT((graphlab::OpPlusEq<ReductionType>)); // compile-time requirement: ReductionType has operator+=
697  aggregator_type* aggregator = get_aggregator(); // get_aggregator() is declared outside this excerpt
698  if(aggregator == NULL) { // NULL is treated as "this engine has no aggregation support"
699  logstream(LOG_FATAL) << "Aggregation not supported by this engine!"
700  << std::endl;
701  return false; // original note: "does not return" -- presumably LOG_FATAL terminates first; confirm in the logger
702  }
703  return aggregator->template add_edge_aggregator<ReductionType> // forward all arguments unchanged; the aggregator's result is returned as-is
704  (key, map_function, finalize_function);
705  } // end of add edge aggregator
706 
707 
708 #if defined(__cplusplus) && __cplusplus >= 201103L
709 
710  /**
711  * \brief An overload of add_edge_aggregator for C++11 which does not
712  * require the user to provide the reduction type.
713  *
714  * This function is available only if the compiler has C++11 support.
715  * Specifically, it uses C++11's decltype operation to infer the
716  * reduction type, thus eliminating the need for the function
717  * call to be templatized over the reduction type. For instance,
718  * in the add_edge_aggregator() example, it allows the following
719  * code to be written:
720  * \code
721  * engine.add_edge_aggregator("absolute_edge_sum",
722  * absolute_edge_data,
723  * print_finalize);
724  * \endcode
725  *
726  * \tparam EdgeMapType The type of the map function.
727  * Not generally needed.
728  * Can be inferred by the compiler.
729  * \tparam FinalizerType The type of the finalize function.
730  * Not generally needed.
731  * Can be inferred by the compiler.
732  *
733  * \param [in] key The name of this aggregator. Must be unique.
734  * \param [in] map_function The Map function to use. Must take an
735  * \ref icontext_type& as its first argument, and
736  * a \ref edge_type, or a reference to a
737  * \ref edge_type as its second argument.
738  * Returns a ReductionType which must be summable
739  * and \ref sec_serializable .
740  * \param [in] finalize_function The Finalize function to use. Must take
741  * an \ref icontext_type& as its first
742  * argument and a ReductionType, or a
743  * reference to a ReductionType as its second
744  * argument.
745  */
746  template <typename EdgeMapType, // both template parameters are deducible from the call arguments
747  typename FinalizerType>
748  bool add_edge_aggregator(const std::string& key,
749  EdgeMapType map_function,
750  FinalizerType finalize_function) {
751  aggregator_type* aggregator = get_aggregator(); // get_aggregator() is declared outside this excerpt
752  if(aggregator == NULL) { // NULL is treated as "this engine has no aggregation support"
753  logstream(LOG_FATAL) << "Aggregation not supported by this engine!"
754  << std::endl;
755  return false; // original note: "does not return" -- presumably LOG_FATAL terminates first; confirm in the logger
756  }
757  return aggregator->add_edge_aggregator(key, map_function, // no explicit reduction type is passed here; the aggregator overload is left to work it out
758  finalize_function);
759  } // end of add edge aggregator (C++11 overload)
760 #endif
761 
762  /**
763  * \brief Performs an immediate aggregation on a key
764  *
765  * Performs an immediate aggregation on a key. All machines must
766  * call this simultaneously. If the key is not found,
767  * false is returned. Otherwise returns true on success.
768  *
769  * For instance, the following code will run the aggregator
770  * with the name "absolute_vertex_sum" immediately.
771  * \code
772  * engine.aggregate_now("absolute_vertex_sum");
773  * \endcode
774  *
775  * \param[in] key Key to aggregate now. Must be a key
776  * previously created by add_vertex_aggregator()
777  * or add_edge_aggregator().
778  * \return False if key not found, True on success.
779  */
780  bool aggregate_now(const std::string& key) { // non-virtual wrapper: delegates to the engine's aggregator
781  aggregator_type* aggregator = get_aggregator(); // get_aggregator() is declared outside this excerpt
782  if(aggregator == NULL) { // NULL is treated as "this engine has no aggregation support"
783  logstream(LOG_FATAL) << "Aggregation not supported by this engine!"
784  << std::endl;
785  return false; // original note: "does not return" -- presumably LOG_FATAL terminates first; confirm in the logger
786  }
787  return aggregator->aggregate_now(key); // the aggregator's result is returned as-is
788  } // end of aggregate_now
789 
790 
791  /**
792  * \brief Performs a map-reduce operation on each vertex in the
793  * graph returning the result.
794  *
795  * Given a map function, map_reduce_vertices() call the map function on all
796  * vertices in the graph. The return values are then summed together and the
797  * final result returned. The map function should only read the vertex data
798  * and should not make any modifications. map_reduce_vertices() must be
799  * called on all machines simultaneously.
800  *
801  * ### Basic Usage
802  * For instance, if the graph has float vertex data, and float edge data:
803  * \code
804  * typedef graphlab::distributed_graph<float, float> graph_type;
805  * \endcode
806  *
807  * To compute an absolute sum over all the vertex data, we would write
808  * a function which reads in each vertex, and returns the absolute
809  * value of the data on the vertex.
810  * \code
811  * float absolute_vertex_data(engine_type::icontext_type& context,
812  * graph_type::vertex_type vertex) {
813  * return std::fabs(vertex.data());
814  * }
815  * \endcode
816  * After which calling:
817  * \code
818  * float sum = engine.map_reduce_vertices<float>(absolute_vertex_data);
819  * \endcode
820  * will call the <code>absolute_vertex_data()</code> function
821  * on each vertex in the graph. <code>absolute_vertex_data()</code>
822  * reads the value of the vertex and returns the absolute result.
823  * These return values are then summed together and returned.
824  * All machines see the same result.
825  *
826  * The template argument <code><float></code> is needed to inform
827  * the compiler regarding the return type of the mapfunction.
828  *
829  * ### Signalling
830  * Another common use for the map_reduce_vertices() function is
831  * in signalling. Since the map function is passed a context, it
832  * can be used to perform signalling of vertices for execution
833  * during a later \ref start() "engine.start()" call.
834  *
835  * For instance, the following code will signal all vertices
836  * with value >= 1
837  * \code
838  * graphlab::empty signal_vertices(engine_type::icontext_type& context,
839  * graph_type::vertex_type vertex) {
840  * if (vertex.data() >= 1) context.signal(vertex);
841  * return graphlab::empty();
842  * }
843  * \endcode
844  * Note that in this case, we are not interested in a reduction
845  * operation, and thus we return a graphlab::empty object.
846  * Calling:
847  * \code
848  * engine.map_reduce_vertices<graphlab::empty>(signal_vertices);
849  * \endcode
850  * will run <code>signal_vertices()</code> on all vertices,
851  * signalling all vertices with value >= 1
852  *
853  * ### Relations
854  * The map function has the same structure as that in
855  * add_vertex_aggregator() and may be reused in an aggregator.
856  * This function is also very similar to
857  * graphlab::distributed_graph::map_reduce_vertices()
858  * with the difference that this takes a context and thus
859  * can be used to perform signalling.
860  * Finally transform_vertices() can be used to perform a similar
861  * operation, but may also make modifications to graph data.
862  *
863  * \tparam ReductionType The output of the map function. Must have
864  * operator+= defined, and must be \ref sec_serializable.
865  * \tparam VertexMapperType The type of the map function.
866  * Not generally needed.
867  * Can be inferred by the compiler.
868  * \param mapfunction The map function to use. Must take an
869  * \ref icontext_type& as its first argument, and
870  * a \ref vertex_type, or a reference to a
871  * \ref vertex_type as its second argument.
872  * Returns a ReductionType which must be summable
873  * and \ref sec_serializable .
874  */
875  template <typename ReductionType, typename VertexMapperType>
876  ReductionType map_reduce_vertices(VertexMapperType mapfunction) {
877  aggregator_type* aggregator = get_aggregator();
878  BOOST_CONCEPT_ASSERT((graphlab::Serializable<ReductionType>));
879  BOOST_CONCEPT_ASSERT((graphlab::OpPlusEq<ReductionType>));
880 
881  if(aggregator == NULL) {
882  logstream(LOG_FATAL) << "Aggregation not supported by this engine!"
883  << std::endl;
884  return ReductionType(); // does not return
885  }
886  return aggregator->template map_reduce_vertices<ReductionType>(mapfunction);
887  }
888 
889  /**
890  * \brief Performs a map-reduce operation on each edge in the
891  * graph returning the result.
892  *
893  * Given a map function, map_reduce_edges() calls the map function on all
894  * edges in the graph. The return values are then summed together and the
895  * final result returned. The map function should only read data
896  * and should not make any modifications. map_reduce_edges() must be
897  * called on all machines simultaneously.
898  *
899  * ### Basic Usage
900  * For instance, if the graph has float vertex data, and float edge data:
901  * \code
902  * typedef graphlab::distributed_graph<float, float> graph_type;
903  * \endcode
904  *
905  * To compute an absolute sum over all the edge data, we would write
906  * a function which reads in each edge, and returns the absolute
907  * value of the data on the edge.
908  * \code
909  * float absolute_edge_data(engine_type::icontext_type& context,
910  * graph_type::edge_type edge) {
911  * return std::fabs(edge.data());
912  * }
913  * \endcode
914  * After which calling:
915  * \code
916  * float sum = engine.map_reduce_edges<float>(absolute_edge_data);
917  * \endcode
918  * will call the <code>absolute_edge_data()</code> function
919  * on each edge in the graph. <code>absolute_edge_data()</code>
920  * reads the value of the edge and returns the absolute result.
921  * These return values are then summed together and returned.
922  * All machines see the same result.
923  *
924  * The template argument <code><float></code> is needed to inform
925  * the compiler regarding the return type of the mapfunction.
926  *
927  * ### Signalling
928  * Another common use for the map_reduce_edges() function is
929  * in signalling. Since the map function is passed a context, it
930  * can be used to perform signalling of vertices for execution
931  * during a later \ref start() "engine.start()" call.
932  *
933  * For instance, the following code will signal the source
934  * vertex of each edge.
935  * \code
936  * graphlab::empty signal_source(engine_type::icontext_type& context,
937  * graph_type::edge_type edge) {
938  * context.signal(edge.source());
939  * return graphlab::empty();
940  * }
941  * \endcode
942  * Note that in this case, we are not interested in a reduction
943  * operation, and thus we return a graphlab::empty object.
944  * Calling:
945  * \code
946  * engine.map_reduce_edges<graphlab::empty>(signal_source);
947  * \endcode
948  * will run <code>signal_source()</code> on all edges,
949  * signalling all source vertices.
950  *
951  * ### Relations
952  * The map function has the same structure as that in
953  * add_edge_aggregator() and may be reused in an aggregator.
954  * This function is also very similar to
955  * graphlab::distributed_graph::map_reduce_edges()
956  * with the difference that this takes a context and thus
957  * can be used to perform signalling.
958  * Finally transform_edges() can be used to perform a similar
959  * operation, but may also make modifications to graph data.
960  *
961  * \tparam ReductionType The output of the map function. Must have
962  * operator+= defined, and must be \ref sec_serializable.
963  * \tparam EdgeMapperType The type of the map function.
964  * Not generally needed.
965  * Can be inferred by the compiler.
966  * \param mapfunction The map function to use. Must take an
967  * \ref icontext_type& as its first argument, and
968  * a \ref edge_type, or a reference to a
969  * \ref edge_type as its second argument.
970  * Returns a ReductionType which must be summable
971  * and \ref sec_serializable .
972  */
973  template <typename ReductionType, typename EdgeMapperType>
974  ReductionType map_reduce_edges(EdgeMapperType mapfunction) {
975  aggregator_type* aggregator = get_aggregator();
976  BOOST_CONCEPT_ASSERT((graphlab::Serializable<ReductionType>));
977  BOOST_CONCEPT_ASSERT((graphlab::OpPlusEq<ReductionType>));
978  if(aggregator == NULL) {
979  logstream(LOG_FATAL) << "Aggregation not supported by this engine!"
980  << std::endl;
981  return ReductionType(); // does not return
982  }
983  return aggregator->template map_reduce_edges<ReductionType>(mapfunction);
984  }
985 
986 
987  /**
988  * \brief Performs a transformation operation on each vertex in the graph.
989  *
990  * Given a mapfunction, transform_vertices() calls mapfunction on
991  * every vertex in graph. The map function may make modifications
992  * to the data on the vertex. transform_vertices() must be called by all
993  * machines simultaneously.
994  *
995  * ### Basic Usage
996  * For instance, if the graph has integer vertex data, and integer edge
997  * data:
998  * \code
999  * typedef graphlab::distributed_graph<size_t, size_t> graph_type;
1000  * \endcode
1001  *
1002  * To set each vertex value to be the number of out-going edges,
1003  * we may write the following function:
1004  * \code
1005  * void set_vertex_value(engine_type::icontext_type& context,
1006  * graph_type::vertex_type vertex) {
1007  * vertex.data() = vertex.num_out_edges();
1008  * }
1009  * \endcode
1010  *
1011  * Calling transform_vertices():
1012  * \code
1013  * engine.transform_vertices(set_vertex_value);
1014  * \endcode
1015  * will run the <code>set_vertex_value()</code> function
1016  * on each vertex in the graph, setting its new value.
1017  *
1018  * ### Signalling
1019  * Since the mapfunction is provided with a context, the mapfunction
1020  * can also be used to perform signalling. For instance, the
1021  * <code>set_vertex_value</code> function above may be modified to set
1022  * the value of the vertex, but to also signal the vertex if
1023  * it has more than 5 outgoing edges.
1024  *
1025  * \code
1026  * void set_vertex_value(engine_type::icontext_type& context,
1027  * graph_type::vertex_type vertex) {
1028  * vertex.data() = vertex.num_out_edges();
1029  * if (vertex.num_out_edges() > 5) context.signal(vertex);
1030  * }
1031  * \endcode
1032  *
1033  * However, if the purpose of the function is to only signal
1034  * without making modifications, map_reduce_vertices() will be
1035  * more efficient as this function will additionally perform
1036  * distributed synchronization of modified data.
1037  *
1038  * ### Relations
1039  * map_reduce_vertices() provides similar signalling functionality,
1040  * but should not make modifications to graph data.
1041  * graphlab::distributed_graph::transform_vertices() provides
1042  * the same graph modification capabilities, but without a context
1043  * and thus cannot perform signalling.
1044  *
1045  * \tparam VertexMapperType The type of the map function.
1046  * Not generally needed.
1047  * Can be inferred by the compiler.
1048  * \param mapfunction The map function to use. Must take an
1049  * \ref icontext_type& as its first argument, and
1050  * a \ref vertex_type, or a reference to a
1051  * \ref vertex_type as its second argument.
1052  * Returns void.
1053  */
1054  template <typename VertexMapperType>
1055  void transform_vertices(VertexMapperType mapfunction) {
1056  aggregator_type* aggregator = get_aggregator();
1057  if(aggregator == NULL) {
1058  logstream(LOG_FATAL) << "Aggregation not supported by this engine!"
1059  << std::endl;
1060  return; // does not return
1061  }
1062  aggregator->transform_vertices(mapfunction);
1063  }
1064 
1065  /**
1066  * \brief Performs a transformation operation on each edge in the graph.
1067  *
1068  * Given a mapfunction, transform_edges() calls mapfunction on
1069  * every edge in graph. The map function may make modifications
1070  * to the data on the edge. transform_edges() must be called on
1071  * all machines simultaneously.
1072  *
1073  * ### Basic Usage
1074  * For instance, if the graph has integer vertex data, and integer edge
1075  * data:
1076  * \code
1077  * typedef graphlab::distributed_graph<size_t, size_t> graph_type;
1078  * \endcode
1079  *
1080  * To set each edge value to be the number of out-going edges
1081  * of the target vertex, we may write the following:
1082  * \code
1083  * void set_edge_value(engine_type::icontext_type& context,
1084  * graph_type::edge_type edge) {
1085  * edge.data() = edge.target().num_out_edges();
1086  * }
1087  * \endcode
1088  *
1089  * Calling transform_edges():
1090  * \code
1091  * engine.transform_edges(set_edge_value);
1092  * \endcode
1093  * will run the <code>set_edge_value()</code> function
1094  * on each edge in the graph, setting its new value.
1095  *
1096  * ### Signalling
1097  * Since the mapfunction is provided with a context, the mapfunction
1098  * can also be used to perform signalling. For instance, the
1099  * <code>set_edge_value</code> function above may be modified to set
1100  * the value of the edge, but to also signal the target vertex.
1101  *
1102  * \code
1103  * void set_edge_value(engine_type::icontext_type& context,
1104  * graph_type::edge_type edge) {
1105  * edge.data() = edge.target().num_out_edges();
1106  * context.signal(edge.target());
1107  * }
1108  * \endcode
1109  *
1110  * However, if the purpose of the function is to only signal
1111  * without making modifications, map_reduce_edges() will be
1112  * more efficient as this function will additionally perform
1113  * distributed synchronization of modified data.
1114  *
1115  * ### Relations
1116  * map_reduce_edges() provides similar signalling functionality,
1117  * but should not make modifications to graph data.
1118  * graphlab::distributed_graph::transform_edges() provides
1119  * the same graph modification capabilities, but without a context
1120  * and thus cannot perform signalling.
1121  *
1122  * \tparam EdgeMapperType The type of the map function.
1123  * Not generally needed.
1124  * Can be inferred by the compiler.
1125  * \param mapfunction The map function to use. Must take an
1126  * \ref icontext_type& as its first argument, and
1127  * a \ref edge_type, or a reference to a
1128  * \ref edge_type as its second argument.
1129  * Returns void.
1130  */
1131  template <typename EdgeMapperType>
1132  void transform_edges(EdgeMapperType mapfunction) {
1133  aggregator_type* aggregator = get_aggregator();
1134  if(aggregator == NULL) {
1135  logstream(LOG_FATAL) << "Aggregation not supported by this engine!"
1136  << std::endl;
1137  return; // does not return
1138  }
1139  aggregator->transform_edges(mapfunction);
1140  }
1141 
1142  /**
1143  * \brief Requests that a particular aggregation key
1144  * be recomputed periodically when the engine is running.
1145  *
1146  * Requests that the aggregator with a given key be aggregated
1147  * every certain number of seconds when the engine is running.
1148  * Note that the period is prescriptive: in practice the actual
1149  * period will be larger than the requested period.
1150  * Seconds must be >= 0;
1151  *
1152  * For instance, the following code will schedule the aggregator
1153  * with the name "absolute_vertex_sum" to run every 1.5 seconds.
1154  * \code
1155  * engine.aggregate_periodic("absolute_vertex_sum", 1.5);
1156  * \endcode
1157  *
1158  * \param [in] key Key to schedule. Must be a key
1159  * previously created by add_vertex_aggregator()
1160  * or add_edge_aggregator().
1161  * \param [in] seconds How frequently to schedule. Must be >=
1162  * 0. seconds == 0 will ensure that this key is continuously
1163  * recomputed.
1164  *
1165  * All machines must call simultaneously.
1166  * \return Returns true if key is found and seconds >= 0,
1167  * and false otherwise.
1168  */
1169  bool aggregate_periodic(const std::string& key, float seconds) {
1170  aggregator_type* aggregator = get_aggregator();
1171  if(aggregator == NULL) {
1172  logstream(LOG_FATAL) << "Aggregation not supported by this engine!"
1173  << std::endl;
1174  return false; // does not return
1175  }
1176  return aggregator->aggregate_periodic(key, seconds);
1177  } // end of aggregate_periodic
1178 
1179 
1180 
1181  /**
1182  * \cond GRAPHLAB_INTERNAL
1183  * \internal
1184  * \brief Hook used by iengine to obtain the
1185  * \ref distributed_aggregator from the derived class, so that the
1186  * templated aggregation wrappers in this interface can forward to it.
1187  *
1188  * \return a pointer to the distributed aggregator for that
1189  * engine. If no aggregator is available or aggregation is not
1190  * supported then return NULL (callers here log a fatal error on NULL).
1191  */
1192  virtual aggregator_type* get_aggregator() = 0;
1193  /// \endcond
1194  }; // end of iengine interface
1195 
1196 } // end of namespace graphlab
1197 
1198 #endif
1199