GraphLab: Distributed Graph-Parallel API  2.1
semi_synchronous_engine.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 
25 #ifndef GRAPHLAB_SEMI_SYNCHRONOUS_ENGINE_HPP
26 #define GRAPHLAB_SEMI_SYNCHRONOUS_ENGINE_HPP
27 
28 #include <deque>
29 #include <algorithm>
30 #include <boost/bind.hpp>
31 
32 #include <graphlab/engine/iengine.hpp>
33 
34 #include <graphlab/vertex_program/ivertex_program.hpp>
35 #include <graphlab/vertex_program/icontext.hpp>
36 #include <graphlab/vertex_program/context.hpp>
37 
38 #include <graphlab/engine/execution_status.hpp>
39 #include <graphlab/options/graphlab_options.hpp>
40 
41 
42 
43 
44 #include <graphlab/parallel/pthread_tools.hpp>
45 #include <graphlab/parallel/atomic_add_vector.hpp>
46 #include <graphlab/parallel/lockfree_push_back.hpp>
47 #include <graphlab/util/tracepoint.hpp>
48 #include <graphlab/util/memory_info.hpp>
49 
50 #include <graphlab/rpc/dc_dist_object.hpp>
51 #include <graphlab/rpc/distributed_event_log.hpp>
52 #include <graphlab/rpc/buffered_exchange.hpp>
53 
54 
55 
56 
57 
58 
59 #include <graphlab/macros_def.hpp>
60 
61 namespace graphlab {
62 
63 
64  /**
65  * \ingroup engines
66  *
67  * \brief The semi-synchronous engine executes a fraction of the active
68  * vertex programs synchronously in a sequence of super-steps (iterations),
69  * in both the shared and distributed memory settings.
70  *
71  * \tparam VertexProgram The user defined vertex program which
72  * should implement the \ref graphlab::ivertex_program interface.
73  *
74  *
75  * ### Execution Semantics
76  *
77  * The semi synchronous engine is functionally "in between" the synchronous
78  * and the asynchronous engines. It behaves like the synchronous engine and
79  * runs vertex programs in super-steps, but it only runs a subset of the
80  * active vertices each round, thus achieving asynchronous-like operation
81  * using a synchronous execution.
82  *
83  * On start() the \ref graphlab::ivertex_program::init function is invoked
84  * on the subset of active vertex programs to initialize the vertex
85  * program, vertex data, and possibly signal vertices.
86  * The engine then proceeds to execute a sequence of
87  * super-steps (iterations) each of which is further decomposed into a
88  * sequence of minor-steps which are also executed synchronously:
89  * \li Receive all incoming messages (signals) by invoking the
90  * \ref graphlab::ivertex_program::init function on all
91  * vertex-programs that have incoming messages. If a
92  * vertex-program does not have any incoming messages then it is
93  * not active during this super-step.
94  * \li Execute all gathers for active vertex programs by invoking
95  * the user defined \ref graphlab::ivertex_program::gather function
96  * on the edge direction returned by the
97  * \ref graphlab::ivertex_program::gather_edges function. The gather
98  * functions can modify edge data but cannot modify the vertex
99  * program or vertex data and therefore can be executed on multiple
100  * edges in parallel. The gather type is used to accumulate (sum)
101  * the result of the gather function calls.
102  * \li Execute all apply functions for active vertex-programs by
103  * invoking the user defined \ref graphlab::ivertex_program::apply
104  * function passing the sum of the gather functions. If \ref
105  * graphlab::ivertex_program::gather_edges returns no edges then
106  * the default gather value is passed to apply. The apply function
107  * can modify the vertex program and vertex data.
108  * \li Execute all scatters for active vertex programs by invoking
109  * the user defined \ref graphlab::ivertex_program::scatter function
110  * on the edge direction returned by the
111  * \ref graphlab::ivertex_program::scatter_edges function. The scatter
112  * functions can modify edge data but cannot modify the vertex
113  * program or vertex data and therefore can be executed on multiple
114  * edges in parallel.
115  *
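 * As a concrete illustration, one possible vertex program exercising these
 * minor-steps is sketched below. The sketch is hypothetical and not part of
 * this engine: it assumes the vertex data is simply the page's rank (a
 * float) and uses an illustrative reset probability of 0.15.
 *
 * \code
 * typedef graphlab::distributed_graph<float, graphlab::empty> graph_type;
 *
 * class pagerank_program :
 *   public graphlab::ivertex_program<graph_type, float>,
 *   public graphlab::IS_POD_TYPE {
 *  public:
 *   // gather over in-edges: collect rank mass from in-neighbors
 *   edge_dir_type gather_edges(icontext_type& context,
 *                              const vertex_type& vertex) const {
 *     return graphlab::IN_EDGES;
 *   }
 *   float gather(icontext_type& context, const vertex_type& vertex,
 *                edge_type& edge) const {
 *     return edge.source().data() / edge.source().num_out_edges();
 *   }
 *   // apply: fold the gathered sum into the new rank
 *   void apply(icontext_type& context, vertex_type& vertex,
 *              const gather_type& total) {
 *     vertex.data() = 0.15f + 0.85f * total;
 *   }
 *   // scatter over out-edges: keep out-neighbors active
 *   edge_dir_type scatter_edges(icontext_type& context,
 *                               const vertex_type& vertex) const {
 *     return graphlab::OUT_EDGES;
 *   }
 *   void scatter(icontext_type& context, const vertex_type& vertex,
 *                edge_type& edge) const {
 *     context.signal(edge.target());
 *   }
 * };
 * \endcode
 *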
116  * ### Construction
117  *
118  * The semi-synchronous engine is constructed by passing in a
119  * \ref graphlab::distributed_control object which manages coordination
120  * between engine threads and a \ref graphlab::distributed_graph object
121  * which is the graph on which the engine should be run. The graph should
122  * already be populated and cannot change after the engine is constructed.
123  * In the distributed setting all program instances (running on each machine)
124  * should construct an instance of the engine at the same time.
125  *
126  * Computation is initiated by signaling vertices using either
127  * \ref graphlab::semi_synchronous_engine::signal or
128  * \ref graphlab::semi_synchronous_engine::signal_all. In either case all
129  * machines should invoke signal or signal_all at the same time. Then
130  * computation is started by calling the
131  * \ref graphlab::semi_synchronous_engine::start function.
132  *
133  * ### Example Usage
134  *
135  * The following is a simple example demonstrating how to use the engine:
136  * \code
137  * #include <graphlab.hpp>
138  *
139  * struct vertex_data {
140  * // code
141  * };
142  * struct edge_data {
143  * // code
144  * };
145  * typedef graphlab::distributed_graph<vertex_data, edge_data> graph_type;
146  * typedef float gather_type;
147  * struct pagerank_vprog :
148  * public graphlab::ivertex_program<graph_type, gather_type> {
149  * // code
150  * };
151  *
152  * int main(int argc, char** argv) {
153  * // Initialize the control plane using MPI
154  * graphlab::mpi_tools::init(argc, argv);
155  * graphlab::distributed_control dc;
156  * // Parse command line options
157  * graphlab::command_line_options clopts("PageRank algorithm.");
158  * std::string graph_dir;
159  * clopts.attach_option("graph", &graph_dir, graph_dir,
160  * "The graph file.");
161  * if(!clopts.parse(argc, argv)) {
162  * std::cout << "Error in parsing arguments." << std::endl;
163  * return EXIT_FAILURE;
164  * }
165  * graph_type graph(dc, clopts);
166  * graph.load_structure(graph_dir, "tsv");
167  * graph.finalize();
168  * std::cout << "#vertices: " << graph.num_vertices()
169  * << " #edges:" << graph.num_edges() << std::endl;
170  * graphlab::semi_synchronous_engine<pagerank_vprog> engine(dc, graph, clopts);
171  * engine.signal_all();
172  * engine.start();
173  * std::cout << "Runtime: " << engine.elapsed_seconds();
174  * graphlab::mpi_tools::finalize();
175  * }
176  * \endcode
177  *
178  *
179  *
180  * <a name=engineopts>Engine Options</a>
181  * =====================
182  * The semi synchronous engine supports several engine options which can
183  * be set as command line arguments using \c --engine_opts :
184  *
185  * \li <b>max_iterations</b>: (default: infinity) The maximum number
186  * of iterations (super-steps) to run.
187  *
188  * \li <b>timeout</b>: (default: infinity) The maximum time in
189  * seconds that the engine may run. When the time runs out the
190  * current iteration is completed and then the engine terminates.
191  *
192  * \li <b>use_cache</b>: (default: false) This is used to enable
193  * caching. When caching is enabled the gather phase is skipped for
194  * vertices that already have a cached value. To use caching the
195  * vertex program must either clear (\ref icontext::clear_gather_cache)
196  * or update (\ref icontext::post_delta) the cache values of
197  * neighboring vertices during the scatter phase.
198  *
199  * \li \b snapshot_interval If set to a positive value, a snapshot
200  * is taken every \c snapshot_interval iterations. If set to 0, a snapshot
201  * is taken before the first iteration. If set to a negative value,
202  * no snapshots are taken. Defaults to -1. A snapshot is a binary
203  * dump of the graph.
204  *
205  * \li \b snapshot_path If snapshot_interval is set to a value >=0,
206  * this option must be specified and should contain a target basename
207  * for the snapshot. The path including folder and file prefix in
208  * which the snapshots should be saved.
209  *
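 * For example, assuming an application binary named \c pagerank that parses
 * its options as in the usage example above, an engine option can be passed
 * on the command line as follows (illustrative invocation and file name):
 * \code
 * ./pagerank --graph mygraph.tsv --engine_opts "max_iterations=10"
 * \endcode
 *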
210  * \see graphlab::omni_engine
211  * \see graphlab::async_consistent_engine
212  * \see graphlab::synchronous_engine
213  */
214  template<typename VertexProgram>
215  class semi_synchronous_engine :
216  public iengine<VertexProgram> {
217 
218  public:
219  /**
220  * \brief The user defined vertex program type. Equivalent to the
221  * VertexProgram template argument.
222  *
223  * The user defined vertex program type which should implement the
224  * \ref graphlab::ivertex_program interface.
225  */
226  typedef VertexProgram vertex_program_type;
227 
228  /**
229  * \brief The user defined type returned by the gather function.
230  *
231  * The gather type is defined in the \ref graphlab::ivertex_program
232  * interface and is the value returned by the
233  * \ref graphlab::ivertex_program::gather function. The
234  * gather type must have an <code>operator+=(const gather_type&
235  * other)</code> function and must be \ref sec_serializable.
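 *
 * For example, a hypothetical gather type that accumulates a sum and a
 * count (a sketch, not part of this engine) could be written as:
 * \code
 * struct rank_accum : public graphlab::IS_POD_TYPE {
 *   float sum; size_t count;
 *   rank_accum() : sum(0), count(0) { }
 *   rank_accum& operator+=(const rank_accum& other) {
 *     sum += other.sum; count += other.count; return *this;
 *   }
 * };
 * \endcode
 * Inheriting from graphlab::IS_POD_TYPE lets the serializer treat the type
 * as plain-old-data, satisfying the \ref sec_serializable requirement.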
236  */
237  typedef typename VertexProgram::gather_type gather_type;
238 
239 
240  /**
241  * \brief The user defined message type used to signal neighboring
242  * vertex programs.
243  *
244  * The message type is defined in the \ref graphlab::ivertex_program
245  * interface and used in the call to \ref graphlab::icontext::signal.
246  * The message type must have an
247  * <code>operator+=(const message_type& other)</code> function and
248  * must be \ref sec_serializable.
249  */
250  typedef typename VertexProgram::message_type message_type;
251 
252  /**
253  * \brief The type of data associated with each vertex in the graph
254  *
255  * The vertex data type must be \ref sec_serializable.
256  */
257  typedef typename VertexProgram::vertex_data_type vertex_data_type;
258 
259  /**
260  * \brief The type of data associated with each edge in the graph
261  *
262  * The edge data type must be \ref sec_serializable.
263  */
264  typedef typename VertexProgram::edge_data_type edge_data_type;
265 
266  /**
267  * \brief The type of graph supported by this vertex program
268  *
269  * See graphlab::distributed_graph
270  */
271  typedef typename VertexProgram::graph_type graph_type;
272 
273  /**
274  * \brief The type used to represent a vertex in the graph.
275  * See \ref graphlab::distributed_graph::vertex_type for details
276  *
277  * The vertex type contains the function
278  * \ref graphlab::distributed_graph::vertex_type::data which
279  * returns a reference to the vertex data as well as other functions
280  * like \ref graphlab::distributed_graph::vertex_type::num_in_edges
281  * which returns the number of in edges.
282  *
283  */
284  typedef typename graph_type::vertex_type vertex_type;
285 
286  /**
287  * \brief The type used to represent an edge in the graph.
288  * See \ref graphlab::distributed_graph::edge_type for details.
289  *
290  * The edge type contains the function
291  * \ref graphlab::distributed_graph::edge_type::data which returns a
292  * reference to the edge data. In addition the edge type contains
293  * the function \ref graphlab::distributed_graph::edge_type::source and
294  * \ref graphlab::distributed_graph::edge_type::target.
295  *
296  */
297  typedef typename graph_type::edge_type edge_type;
298 
299  /**
300  * \brief The type of the callback interface passed by the engine to vertex
301  * programs. See \ref graphlab::icontext for details.
302  *
303  * The context callback is passed to the vertex program functions and is
304  * used to signal other vertices, get the current iteration, and access
305  * information about the engine.
306  */
307  typedef icontext<graph_type, gather_type, message_type> icontext_type;
308 
309  private:
310 
311  /**
312  * \brief Local vertex type used by the engine for fast indexing
313  */
314  typedef typename graph_type::local_vertex_type local_vertex_type;
315 
316  /**
317  * \brief Local edge type used by the engine for fast indexing
318  */
319  typedef typename graph_type::local_edge_type local_edge_type;
320 
321  /**
322  * \brief Local vertex id type used by the engine for fast indexing
323  */
324  typedef typename graph_type::lvid_type lvid_type;
325 
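 /**
  * \brief Per-thread accumulators of compute time (in seconds) spent in the
  * gather and apply phases; used at the end of start() to report the
  * compute balance across machines.
  */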
326  std::vector<double> per_thread_compute_time;
327  /**
328  * \brief The concrete context type used internally by this engine.
329  */
330  typedef context<semi_synchronous_engine> context_type;
331  friend class context<semi_synchronous_engine>;
332 
333 
334  /**
335  * \brief The type of the distributed aggregator inherited from iengine
336  */
337  typedef typename iengine<VertexProgram>::aggregator_type aggregator_type;
338 
339  /**
340  * \brief The object used to communicate with remote copies of the
341  * semi-synchronous engine.
342  */
343  dc_dist_object< semi_synchronous_engine<VertexProgram> > rmi;
344 
345  /**
346  * \brief A reference to the distributed graph on which this
347  * semi-synchronous engine is running.
348  */
349  graph_type& graph;
350 
351  /**
352  * \brief The local worker threads used by this engine
353  */
354  thread_pool threads;
355 
356  /**
357  * \brief A thread barrier that is used to control the threads in the
358  * thread pool.
359  */
360  graphlab::barrier thread_barrier;
361 
362  /**
363  * \brief The maximum number of super-steps (iterations) to run
364  * before terminating. If the max iterations is reached the
365  * engine will terminate if there are no messages remaining.
366  */
367  size_t max_iterations;
368 
369  /**
370  * \brief A snapshot is taken every this number of iterations.
371  * If snapshot_interval == 0, a snapshot is only taken before the first
372  * iteration. If snapshot_interval < 0, no snapshots are taken.
373  */
374  int snapshot_interval;
375 
376  /// \brief The target base name the snapshot is saved in.
377  std::string snapshot_path;
378 
379  /**
380  * \brief A counter that tracks the current iteration number since
381  * start was last invoked.
382  */
383  size_t iteration_counter;
384 
385  /**
386  * \brief set to true if engine is running
387  */
388  bool started;
389 
390  /**
391  * \brief The time in seconds at which the engine started.
392  */
393  float start_time;
394 
395  /**
396  * \brief The timeout time in seconds
397  */
398  float timeout;
399 
400  /**
401  * \brief Used to stop the engine prematurely
402  */
403  bool force_abort;
404 
405 
406  /**
407  * \brief The maximum number of vertices to activate in each round.
408  */
409  size_t max_active_vertices;
410 
411  /**
412  * \brief The vertex locks protect access to vertex specific
413  * data-structures including
414  * \ref graphlab::semi_synchronous_engine::gather_accum
415  * and \ref graphlab::semi_synchronous_engine::messages.
416  */
417  std::vector<simple_spinlock> vlocks;
418 
419 
420  /**
421  * \brief The elocks protect individual edges during gather and
422  * scatter. Technically there is a potential race since gather
423  * and scatter can modify edge values and can overlap. The edge
424  * lock ensures that only one gather or scatter occurs on an edge
425  * at a time.
426  */
427  std::vector<simple_spinlock> elocks;
428 
429 
430 
431  /**
432  * \brief The vertex programs associated with each vertex on this
433  * machine.
434  */
435  std::vector<vertex_program_type> vertex_programs;
436 
437 
438  /// A pointer to the scheduler object
439  ischeduler<message_type>* scheduler_ptr;
440 
441 
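 /**
  * \brief Bit set for each local vertex that is a mirror and has received a
  * message locally; such messages are forwarded to the master during
  * exchange_messages().
  */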
442  dense_bitset has_remote_message;
443 
444  /** \brief used by transfer_scheduler_to_active. The number of vertices
445  * to use in the next superstep.
446  */
447  atomic<int> num_to_activate;
448 
449  /**
450  * \brief Gather accumulator used for each master vertex to merge
451  * the result of all the machine specific accumulators (or
452  * caches).
453  *
454  * The gather accumulator can be accessed by multiple threads at
455  * once and therefore must be guarded by the vertex locks in
456  * \ref graphlab::semi_synchronous_engine::vlocks
457  */
458  std::vector<gather_type> gather_accum;
459 
460  /**
461  * \brief Bit indicating whether the gather accumulator contains a
462  * value.
463  *
464  * While dense bitsets are thread safe the value of this bit must
465  * change concurrently with the
466  * \ref graphlab::semi_synchronous_engine::gather_accum and therefore is
467  * set while holding the lock in
468  * \ref graphlab::semi_synchronous_engine::vlocks.
469  */
470  dense_bitset has_gather_accum;
471 
472 
473  /**
474  * \brief This optional vector contains caches of previous gather
475  * contributions for each machine.
476  *
477  * Caching is done locally and therefore a high-degree vertex may
478  * have multiple caches (one per machine).
479  */
480  std::vector<gather_type> gather_cache;
481 
482  /**
483  * \brief A bit indicating if the local gather for that vertex is
484  * available.
485  */
486  dense_bitset has_cache;
487 
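 /**
  * \brief The list of local vertices (masters) that are active in the
  * current super-step, filled through the lock-free push-back below.
  */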
488  std::vector<lvid_type> active_superstep;
489  lockfree_push_back<std::vector<lvid_type> > active_superstep_pushback;
490 
491  /**
492  * \brief The number of local vertices (masters) that are active on this
493  * iteration.
494  */
495  atomic<size_t> num_active_vertices;
496 
497 
498  /**
499  * \brief The list of vertices (masters and mirrors) that participate
500  * in the current minor-step (gather or scatter).
501  */
502  std::vector<lvid_type> active_minorstep;
503  lockfree_push_back<std::vector<lvid_type> > active_minorstep_pushback;
504 
505  /**
506  * \brief A counter measuring the number of applys that have been completed
507  */
508  atomic<size_t> completed_applys;
509 
510 
511  /**
512  * \brief The shared counter used to coordinate operations between
513  * threads.
514  */
515  atomic<size_t> shared_lvid_counter;
516 
517 
518  /**
519  * \brief The pair type used to synchronize vertex programs across machines.
520  */
521  typedef std::pair<vertex_id_type, vertex_program_type> vid_prog_pair_type;
522 
523  /**
524  * \brief The type of the exchange used to synchronize vertex programs
525  */
526  typedef buffered_exchange<vid_prog_pair_type> vprog_exchange_type;
527 
528  /**
529  * \brief The distributed exchange used to synchronize changes to
530  * vertex programs.
531  */
532  vprog_exchange_type vprog_exchange;
533 
534  /**
535  * \brief The pair type used to synchronize vertex data across machines.
536  */
537  typedef std::pair<vertex_id_type, vertex_data_type> vid_vdata_pair_type;
538 
539  /**
540  * \brief The type of the exchange used to synchronize vertex data
541  */
542  typedef buffered_exchange<vid_vdata_pair_type> vdata_exchange_type;
543 
544  /**
545  * \brief The distributed exchange used to synchronize changes to
546  * vertex data.
547  */
548  vdata_exchange_type vdata_exchange;
549 
550  /**
551  * \brief The pair type used to synchronize the results of the gather phase
552  */
553  typedef std::pair<vertex_id_type, gather_type> vid_gather_pair_type;
554 
555  /**
556  * \brief The type of the exchange used to synchronize gather
557  * accumulators
558  */
559  typedef buffered_exchange<vid_gather_pair_type> gather_exchange_type;
560 
561  /**
562  * \brief The distributed exchange used to synchronize gather
563  * accumulators.
564  */
565  gather_exchange_type gather_exchange;
566 
567  /**
568  * \brief The pair type used to synchronize messages
569  */
570  typedef std::pair<vertex_id_type, message_type> vid_message_pair_type;
571 
572  /**
573  * \brief The type of the exchange used to synchronize messages
574  */
575  typedef buffered_exchange<vid_message_pair_type> message_exchange_type;
576 
577  /**
578  * \brief The distributed exchange used to synchronize messages
579  */
580  message_exchange_type message_exchange;
581 
582 
583  /**
584  * \brief The distributed aggregator used to manage background
585  * aggregation.
586  */
587  aggregator_type aggregator;
588 
589  DECLARE_EVENT(EVENT_APPLIES);
590  DECLARE_EVENT(EVENT_GATHERS);
591  DECLARE_EVENT(EVENT_SCATTERS);
592  DECLARE_EVENT(EVENT_ACTIVE_CPUS);
593  public:
594 
595  /**
596  * \brief Construct a semi synchronous engine for a given graph and options.
597  *
598  * The semi synchronous engine should be constructed after the graph
599  * has been loaded (e.g., \ref graphlab::distributed_graph::load)
600  * and the graphlab options have been set
601  * (e.g., \ref graphlab::command_line_options).
602  *
603  * In the distributed setting the semi synchronous engine must be constructed
604  * on all machines at the same time (in the same order), passing
605  * the \ref graphlab::distributed_control object. Upon
606  * construction the semi synchronous engine allocates several
607  * data-structures to store messages, gather accumulants, and
608  * vertex programs and therefore may require considerable memory.
609  *
610  * The number of threads to create is read from
611  * \ref graphlab_options::get_ncpus "opts.get_ncpus()".
612  *
613  * See the <a href="#engineopts">main class documentation</a>
614  * for details on the available options.
615  *
616  * @param [in] dc Distributed controller to associate with
617  * @param [in,out] graph A reference to the graph object that this
618  * engine will modify. The graph must be fully constructed and
619  * finalized.
620  * @param [in] opts A graphlab::graphlab_options object specifying engine
621  * parameters. This is typically constructed using
622  * \ref graphlab::command_line_options.
623  */
624  semi_synchronous_engine(distributed_control& dc, graph_type& graph,
625  const graphlab_options& opts = graphlab_options());
626 
627 
628  ~semi_synchronous_engine() {
629  delete scheduler_ptr;
630  }
631 
632  /**
633  * \brief Start execution of the semi synchronous engine.
634  *
635  * The start function begins computation and does not return until
636  * there are no remaining messages or until max_iterations has
637  * been reached.
638  *
639  * The start() function modifies the data graph through the vertex
640  * programs and so upon return the data graph should contain the
641  * result of the computation.
642  *
643  * @return The reason for termination
644  */
645  execution_status::status_enum start();
646 
647  // documentation inherited from iengine
648  size_t num_updates() const;
649 
650  // documentation inherited from iengine
651  void signal(vertex_id_type vid,
652  const message_type& message = message_type());
653 
654  // documentation inherited from iengine
655  void signal_all(const message_type& message = message_type(),
656  const std::string& order = "shuffle");
657 
658  void signal_vset(const vertex_set& vset,
659  const message_type& message = message_type(),
660  const std::string& order = "shuffle");
661 
662 
663  // documentation inherited from iengine
664  float elapsed_seconds() const;
665 
666  /**
667  * \brief Get the current iteration number since start was last
668  * invoked.
669  *
670  * \return the current iteration
671  */
672  int iteration() const;
673 
674 
675  /**
676  * \brief Compute the total memory used by the entire distributed
677  * system.
678  *
679  * @return The total memory used in bytes.
680  */
681  size_t total_memory_usage() const;
682 
683  /**
684  * \brief Get a pointer to the distributed aggregator object.
685  *
686  * This is currently used by the \ref graphlab::iengine interface to
687  * implement the calls to aggregation.
688  *
689  * @return a pointer to the local aggregator.
690  */
691  aggregator_type* get_aggregator();
692 
693  private:
694 
695  /**
696  * \brief This internal stop function is called by the \ref graphlab::context to
697  * terminate execution of the engine.
698  */
699  void internal_stop();
700 
701  /**
702  * \brief This function is called remotely via RPC to force the
703  * engine to stop.
704  */
705  void rpc_stop();
706 
707  /**
708  * \brief Signal a vertex.
709  *
710  * This function is called by the \ref graphlab::context.
711  *
712  * @param [in] vertex the vertex to signal
713  * @param [in] message the message to send to that vertex.
714  */
715  void internal_signal(const vertex_type& vertex,
716  const message_type& message = message_type());
717 
718  /**
719  * \brief Called by the context to signal an arbitrary vertex.
720  * This must be done by finding the owner of that vertex.
721  *
722  * @param [in] gvid the global vertex id of the vertex to signal
723  * @param [in] message the message to send to that vertex.
724  */
725  void internal_signal_broadcast(vertex_id_type gvid,
726  const message_type& message = message_type());
727 
728  /**
729  * \brief This function tests if this machine is the master of
730  * gvid and signals if successful.
731  */
732  void internal_signal_rpc(vertex_id_type gvid,
733  const message_type& message = message_type());
734 
735 
736  /**
737  * \brief Post a delta to a previous gather for a given vertex.
738  *
739  * This function is called by the \ref graphlab::context.
740  *
741  * @param [in] vertex The vertex to which to post a change in the sum
742  * @param [in] delta The change in that sum
743  */
744  void internal_post_delta(const vertex_type& vertex,
745  const gather_type& delta);
746 
747  /**
748  * \brief Clear the cached gather for a vertex if one is
749  * available.
750  *
751  * This function is called by the \ref graphlab::context.
752  *
753  * @param [in] vertex the vertex for which to clear the cache
754  */
755  void internal_clear_gather_cache(const vertex_type& vertex);
756 
757 
758  // Program Steps ==========================================================
759 
760 
761  void thread_launch_wrapped_event_counter(size_t thread_id,
762  boost::function<void(void)> fn) {
763  INCREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
764  rmi.dc().stop_handler_threads(thread_id, threads.size());
765  fn();
766  rmi.dc().start_handler_threads(thread_id, threads.size());
767  DECREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
768  }
769 
770  /**
771  * \brief Executes ncpus copies of a member function each with a
772  * unique consecutive id (thread id).
773  *
774  * This function is used by the main loop to execute each of the
775  * stages in parallel.
776  *
777  * The member function must have the type:
778  *
779  * \code
780  * void semi_synchronous_engine::member_fun(size_t threadid);
781  * \endcode
782  *
783  * This function runs an rmi barrier after termination
784  *
785  * @tparam the type of the member function.
786  * @tparam MemberFunction the type of the member function.
787  * @param [in] member_fun the function to call. */
788  template<typename MemberFunction>
789  void run_synchronous(MemberFunction member_fun) {
790  shared_lvid_counter = 0;
791  // launch the initialization threads
792  for(size_t i = 0; i < threads.size(); ++i) {
793  boost::function<void(void)> invoke = boost::bind(member_fun, this, i);
794  threads.launch(boost::bind(
795  &semi_synchronous_engine::thread_launch_wrapped_event_counter,
796  this,
797  i,
798  invoke), i);
799  }
800  // Wait for all threads to finish
801  threads.join();
802  rmi.barrier();
803  } // end of run_synchronous
804 
805  // /**
806  // * \brief Initialize all vertex programs by invoking
807  // * \ref graphlab::ivertex_program::init on all vertices.
808  // *
809  // * @param thread_id the thread to run this as which determines
810  // * which vertices to process.
811  // */
812  // void initialize_vertex_programs(size_t thread_id);
813 
814  /**
815  * \brief Synchronize all message data.
816  *
817  * @param thread_id the thread to run this as which determines
818  * which vertices to process.
819  */
820  void exchange_messages(size_t thread_id);
821 
822  /**
823  * \brief Synchronize some message data and prepare some vertices for
824  * execution.
825  *
826  * @param thread_id the thread to run this as which determines
827  * which vertices to process.
828  */
829  void transfer_scheduler_to_active(size_t thread_id);
830 
831 
832  /**
833  * \brief Execute the \ref graphlab::ivertex_program::gather function on all
834  * vertices that received messages for the edges specified by the
835  * \ref graphlab::ivertex_program::gather_edges.
836  *
837  * @param thread_id the thread to run this as which determines
838  * which vertices to process.
839  */
840  void execute_gathers(size_t thread_id);
841 
842 
843 
844 
845  /**
846  * \brief Execute the \ref graphlab::ivertex_program::apply function on all
847  * vertices that received messages in this super-step (active).
848  *
849  * @param thread_id the thread to run this as which determines
850  * which vertices to process.
851  */
852  void execute_applys(size_t thread_id);
853 
854  /**
855  * \brief Execute the \ref graphlab::ivertex_program::scatter function on all
856  * vertices that received messages for the edges specified by the
857  * \ref graphlab::ivertex_program::scatter_edges.
858  *
859  * @param thread_id the thread to run this as which determines
860  * which vertices to process.
861  */
862  void execute_scatters(size_t thread_id);
863 
864  // Data Synchronization ===================================================
865  /**
866  * \brief Send the vertex program for the local vertex id to all
867  * of its mirrors.
868  *
869  * @param [in] lvid the vertex to sync. This machine must be the
870  * master of that vertex.
871  */
872  void sync_vertex_program(lvid_type lvid, size_t thread_id);
873 
874  /**
875  * \brief Receive all incoming vertex programs and update the
876  * local mirrors.
877  *
878  * This function returns when there are no more incoming vertex
879  * programs and should be called after a flush of the vertex
880  * program exchange.
881  */
882  void recv_vertex_programs(size_t threadid, const bool try_to_recv = false);
883 
884  /**
885  * \brief Send the vertex data for the local vertex id to all of
886  * its mirrors.
887  *
888  * @param [in] lvid the vertex to sync. This machine must be the master
889  * of that vertex.
890  */
891  void sync_vertex_data(lvid_type lvid, size_t thread_id);
892 
893  /**
894  * \brief Receive all incoming vertex data and update the local
895  * mirrors.
896  *
897  * This function returns when there are no more incoming vertex
898  * data and should be called after a flush of the vertex data
899  * exchange.
900  */
901  void recv_vertex_data(size_t threadid, const bool try_to_recv = false);
902 
903  /**
904  * \brief Send the gather value for the vertex id to its master.
905  *
906  * @param [in] lvid the vertex whose local gather value is sent to its master
907  * @param [in] accum the locally computed gather value.
908  */
909  void sync_gather(lvid_type lvid, const gather_type& accum,
910  size_t thread_id);
911 
912 
913  /**
914  * \brief Receive the gather values from the buffered exchange.
915  *
916  * This function returns when there is nothing left in the
917  * buffered exchange and should be called after the buffered
918  * exchange has been flushed
919  */
920  void recv_gathers(size_t threadid, const bool try_to_recv = false);
921 
922  /**
923  * \brief Send the accumulated message for the local vertex to its
924  * master.
925  *
926  * @param [in] lvid the vertex to send
927  */
928  void sync_message(lvid_type lvid,
929  const size_t thread_id,
930  const message_type& msg);
931 
932  /**
933  * \brief Receive the messages from the buffered exchange.
934  *
935  * This function returns when there is nothing left in the
936  * buffered exchange and should be called after the buffered
937  * exchange has been flushed
938  */
939  void recv_messages(size_t threadid, const bool try_to_recv = false);
940 
941 
942  }; // end of class semi synchronous engine
943 
944 
945 
946 
947 
948 
949 
950 
951 
952 
953 
954 
955 
956 
957 
958 
959 
960 
961 
962 
963 
964 
965 
966 
967 
968  /**
969  * Constructs a semi synchronous distributed engine.
970  * The number of threads to create is read from
971  * opts.get_ncpus().
972  *
973  * Valid engine options (graphlab_options::get_engine_args()):
974  * \arg \c max_iterations Sets the maximum number of iterations the
975  * engine will run for.
976  * \arg \c use_cache If set to true, partial gathers are cached.
977  * See \ref gather_caching to understand the behavior of the
978  * gather caching model and how it may be used to accelerate program
979  * performance.
980  *
981  * \param dc Distributed controller to associate with
982  * \param graph The graph to schedule over. The graph must be fully
983  * constructed and finalized.
984  * \param opts A graphlab_options object containing options and parameters
985  * for the engine.
986  */
987  template<typename VertexProgram>
988  semi_synchronous_engine<VertexProgram>::
989  semi_synchronous_engine(distributed_control& dc,
990  graph_type& graph,
991  const graphlab_options& opts) :
992  rmi(dc, this), graph(graph),
993  threads(opts.get_ncpus()),
994  thread_barrier(opts.get_ncpus()),
995  max_iterations(-1), snapshot_interval(-1), iteration_counter(0),
996  started(false), timeout(0),
997  max_active_vertices(1000),
998  scheduler_ptr(NULL),
999  active_superstep(128),
1000  active_superstep_pushback(active_superstep, 0),
1001  active_minorstep(128),
1002  active_minorstep_pushback(active_minorstep, 0),
1003  vprog_exchange(dc, opts.get_ncpus(), 65536),
1004  vdata_exchange(dc, opts.get_ncpus(), 65536),
1005  gather_exchange(dc, opts.get_ncpus(), 65536),
1006  message_exchange(dc, opts.get_ncpus(), 65536),
1007  aggregator(dc, graph, new context_type(*this, graph)) {
1008  // Process any additional options
1009  std::vector<std::string> keys = opts.get_engine_args().get_option_keys();
1010  per_thread_compute_time.resize(opts.get_ncpus());
1011  bool use_cache = false;
1012 
1013  graph.finalize();
1014 
1015  max_active_vertices = graph.num_local_vertices() * 0.1;
1016  max_active_vertices = std::max<size_t>(max_active_vertices, 1000);
1017 
1018  foreach(std::string opt, keys) {
1019  if (opt == "max_iterations") {
1020  opts.get_engine_args().get_option("max_iterations", max_iterations);
1021  if (rmi.procid() == 0)
1022  logstream(LOG_EMPH) << "Engine Option: max_iterations = "
1023  << max_iterations << std::endl;
1024  } else if (opt == "timeout") {
1025  opts.get_engine_args().get_option("timeout", timeout);
1026  if (rmi.procid() == 0)
1027  logstream(LOG_EMPH) << "Engine Option: timeout = "
1028  << timeout << std::endl;
1029  } else if (opt == "use_cache") {
1030  opts.get_engine_args().get_option("use_cache", use_cache);
1031  if (rmi.procid() == 0)
1032  logstream(LOG_EMPH) << "Engine Option: use_cache = "
1033  << use_cache << std::endl;
1034  } else if (opt == "snapshot_interval") {
1035  opts.get_engine_args().get_option("snapshot_interval", snapshot_interval);
1036  if (rmi.procid() == 0)
1037  logstream(LOG_EMPH) << "Engine Option: snapshot_interval = "
1038  << snapshot_interval << std::endl;
1039  } else if (opt == "snapshot_path") {
1040  opts.get_engine_args().get_option("snapshot_path", snapshot_path);
1041  if (rmi.procid() == 0)
1042  logstream(LOG_EMPH) << "Engine Option: snapshot_path = "
1043  << snapshot_path << std::endl;
1044  } else if (opt == "max_active_vertices") {
1045  opts.get_engine_args().get_option("max_active_vertices", max_active_vertices);
1046  if (rmi.procid() == 0)
1047  logstream(LOG_EMPH) << "Engine Option: max_active_vertices = "
1048  << max_active_vertices << std::endl;
1049  } else if (opt == "max_active_fraction") {
1050  float max_active_fraction = 0.1;
1051  opts.get_engine_args().get_option("max_active_fraction", max_active_fraction);
1052  if (max_active_fraction > 0) {
1053  max_active_vertices = graph.num_local_vertices() * max_active_fraction;
1054  max_active_vertices += (max_active_vertices == 0);
1055  }
1056  else {
1057  max_active_vertices = (size_t)(-1);
1058  }
1059  if (rmi.procid() == 0) {
1060  logstream(LOG_EMPH) << "Engine Option: max_active_fraction = "
1061  << max_active_fraction << std::endl;
1062  }
1063  } else {
1064  logstream(LOG_FATAL) << "Unexpected Engine Option: " << opt << std::endl;
1065  }
1066  }
1067 
1068  if (snapshot_interval >= 0 && snapshot_path.length() == 0) {
1069  logstream(LOG_FATAL)
1070  << "Snapshot interval specified, but no snapshot path" << std::endl;
1071  }
1072  INITIALIZE_EVENT_LOG(dc);
1073  ADD_CUMULATIVE_EVENT(EVENT_APPLIES, "Applies", "Calls");
1074  ADD_CUMULATIVE_EVENT(EVENT_GATHERS , "Gathers", "Calls");
1075  ADD_CUMULATIVE_EVENT(EVENT_SCATTERS , "Scatters", "Calls");
1076  ADD_INSTANTANEOUS_EVENT(EVENT_ACTIVE_CPUS, "Active Threads", "Threads");
1077 
1078  // Finalize the graph
1079  memory_info::log_usage("Before Engine Initialization");
1080  // Allocate vertex locks and vertex programs
1081  active_superstep.resize(2 * max_active_vertices);
1082  active_minorstep.resize(2 * max_active_vertices);
1083  vlocks.resize(graph.num_local_vertices());
1084  vertex_programs.resize(graph.num_local_vertices());
1085  has_remote_message.resize(graph.num_local_vertices());
1086  has_remote_message.clear();
1087  // allocate the edge locks
1088  //elocks.resize(graph.num_local_edges());
1089  // Allocate gather accumulators and accumulator bitset
1090  gather_accum.resize(graph.num_local_vertices(), gather_type());
1091  has_gather_accum.resize(graph.num_local_vertices());
1092  has_gather_accum.clear();
1093  // If caching is used then allocate cache data-structures
1094  if (use_cache) {
1095  gather_cache.resize(graph.num_local_vertices(), gather_type());
1096  has_cache.resize(graph.num_local_vertices());
1097  has_cache.clear();
1098  }
1099  // Allocate bitset to track active vertices on each bitset.
1100  active_superstep.resize(opts.get_ncpus());
1101  // Print memory usage after initialization
1102  memory_info::log_usage("After Engine Initialization");
1103 
1104  graphlab_options opts_copy = opts;
1105 
1106  // set a default scheduler if none
1107  if (opts_copy.get_scheduler_type() == "") {
1108  opts_copy.set_scheduler_type("queued_fifo");
1109  }
1110 
1111  // construct scheduler
1112  scheduler_ptr = scheduler_factory<message_type>::
1113  new_scheduler(graph.num_local_vertices(),
1114  opts_copy);
1115 
1116  rmi.barrier();
1117  } // end of semi synchronous engine constructor
1118 
1119 
1120 
1121 
1122 
1123 
1124 
1125  template<typename VertexProgram>
1126  typename semi_synchronous_engine<VertexProgram>::aggregator_type*
1127  semi_synchronous_engine<VertexProgram>::get_aggregator() {
1128  return &aggregator;
1129  } // end of get_aggregator
1130 
1131 
1132 
1133  template<typename VertexProgram>
1134  void semi_synchronous_engine<VertexProgram>::internal_stop() {
1135  for (size_t i = 0; i < rmi.numprocs(); ++i)
1136  rmi.remote_call(i, &semi_synchronous_engine<VertexProgram>::rpc_stop);
1137  } // end of internal_stop
1138 
1139  template<typename VertexProgram>
1140  void semi_synchronous_engine<VertexProgram>::rpc_stop() {
1141  force_abort = true;
1142  } // end of rpc_stop
1143 
1144 
1145  template<typename VertexProgram>
1146  void semi_synchronous_engine<VertexProgram>::
1147  signal(vertex_id_type gvid, const message_type& message) {
1148  rmi.barrier();
1149  internal_signal_rpc(gvid, message);
1150  rmi.barrier();
1151  } // end of signal
1152 
1153 
1154 
1155  template<typename VertexProgram>
1156  void semi_synchronous_engine<VertexProgram>::
1157  signal_all(const message_type& message, const std::string& order) {
1158  for(lvid_type lvid = 0; lvid < graph.num_local_vertices(); ++lvid) {
1159  if(graph.l_is_master(lvid)) {
1160  internal_signal(vertex_type(graph.l_vertex(lvid)), message);
1161  }
1162  }
1163  } // end of signal all
1164 
1165 
1166  template<typename VertexProgram>
1167  void semi_synchronous_engine<VertexProgram>::
1168  signal_vset(const vertex_set& vset,
1169  const message_type& message, const std::string& order) {
1170  for(lvid_type lvid = 0; lvid < graph.num_local_vertices(); ++lvid) {
1171  if(graph.l_is_master(lvid) && vset.l_contains(lvid)) {
1172  internal_signal(vertex_type(graph.l_vertex(lvid)), message);
1173  }
1174  }
1175  } // end of signal_vset
1176 
1177 
1178  template<typename VertexProgram>
1179  void semi_synchronous_engine<VertexProgram>::
1180  internal_signal(const vertex_type& vertex,
1181  const message_type& message) {
1182  const lvid_type lvid = vertex.local_id();
1183  if (!graph.l_is_master(lvid)) {
1184  scheduler_ptr->place(lvid, message);
1185  has_remote_message.set_bit(lvid);
1186  }
1187  else {
1188  if (started) {
1189  scheduler_ptr->schedule_from_execution_thread(thread::thread_id(),
1190  lvid, message);
1191  } else {
1192  scheduler_ptr->schedule(lvid, message);
1193  }
1194  }
1195  } // end of internal_signal
1196 
1197 
1198  template<typename VertexProgram>
1199  void semi_synchronous_engine<VertexProgram>::
1200  internal_signal_broadcast(vertex_id_type gvid, const message_type& message) {
1201  for (size_t i = 0; i < rmi.numprocs(); ++i) {
1202  if(i == rmi.procid()) internal_signal_rpc(gvid, message);
1203  else rmi.remote_call(i, &semi_synchronous_engine<VertexProgram>::internal_signal_rpc,
1204  gvid, message);
1205  }
1206  } // end of internal_signal_broadcast
1207 
1208  template<typename VertexProgram>
1209  void semi_synchronous_engine<VertexProgram>::
1210  internal_signal_rpc(vertex_id_type gvid,
1211  const message_type& message) {
1212  if (graph.is_master(gvid)) {
1213  internal_signal(graph.vertex(gvid), message);
1214  }
1215  } // end of internal_signal_rpc
1216 
1217 
1218 
1219 
1220 
1221  template<typename VertexProgram>
1222  void semi_synchronous_engine<VertexProgram>::
1223  internal_post_delta(const vertex_type& vertex, const gather_type& delta) {
1224  const bool caching_enabled = !gather_cache.empty();
1225  if(caching_enabled) {
1226  const lvid_type lvid = vertex.local_id();
1227  vlocks[lvid].lock();
1228  if( has_cache.get(lvid) ) {
1229  gather_cache[lvid] += delta;
1230  } else {
1231  // You cannot add a delta to an empty cache. A complete
1232  // gather must have been run.
1233  // gather_cache[lvid] = delta;
1234  // has_cache.set_bit(lvid);
1235  }
1236  vlocks[lvid].unlock();
1237  }
1238  } // end of post_delta
1239 
1240 
1241  template<typename VertexProgram>
1242  void semi_synchronous_engine<VertexProgram>::
1243  internal_clear_gather_cache(const vertex_type& vertex) {
1244  const bool caching_enabled = !gather_cache.empty();
1245  const lvid_type lvid = vertex.local_id();
1246  if(caching_enabled && has_cache.get(lvid)) {
1247  vlocks[lvid].lock();
1248  gather_cache[lvid] = gather_type();
1249  has_cache.clear_bit(lvid);
1250  vlocks[lvid].unlock();
1251  }
1252  } // end of clear_gather_cache
1253 
1254 
1255 
1256 
1257  template<typename VertexProgram>
1258  size_t semi_synchronous_engine<VertexProgram>::
1259  num_updates() const { return completed_applys.value; }
1260 
1261  template<typename VertexProgram>
1262  float semi_synchronous_engine<VertexProgram>::
1263  elapsed_seconds() const { return timer::approx_time_seconds() - start_time; }
1264 
1265  template<typename VertexProgram>
1266  int semi_synchronous_engine<VertexProgram>::
1267  iteration() const { return iteration_counter; }
1268 
1269 
1270 
1271  template<typename VertexProgram>
1272  size_t semi_synchronous_engine<VertexProgram>::total_memory_usage() const {
1273  size_t allocated_memory = memory_info::allocated_bytes();
1274  rmi.all_reduce(allocated_memory);
1275  return allocated_memory;
1276  } // compute the total memory usage of the GraphLab system
1277 
1278 
1279  template<typename VertexProgram> execution_status::status_enum
1280  semi_synchronous_engine<VertexProgram>::start() {
1281  rmi.barrier();
1282  graph.finalize();
1283  // Initialization code ==================================================
1284  // Reset event log counters?
1285  // Start the timer
1286  graphlab::timer timer; timer.start();
1287  start_time = timer::approx_time_seconds();
1288  iteration_counter = 0;
1289  force_abort = false;
1290  execution_status::status_enum termination_reason = execution_status::UNSET;
1291  aggregator.start();
1292  scheduler_ptr->start();
1293  started = true;
1294 
1295  rmi.barrier();
1296  if (snapshot_interval == 0) {
1297  graph.save_binary(snapshot_path);
1298  }
1299 
1300  // set to -5 to force it to print the first round
1301  float last_print = -5;
1302  if (rmi.procid() == 0) {
1303  logstream(LOG_EMPH) << "Iteration counter will only output every 5 seconds."
1304  << std::endl;
1305  }
1306 
1307 
1308  // Program Main loop ====================================================
1309  while(iteration_counter < max_iterations && !force_abort ) {
1310 
1311  // Check first to see if we are out of time
1312  if(timeout != 0 && timeout < elapsed_seconds()) {
1313  termination_reason = execution_status::TIMEOUT;
1314  break;
1315  }
1316 
1317  bool print_this_round = (elapsed_seconds() - last_print) >= 5;
1318 
1319  if(rmi.procid() == 0 && print_this_round) {
1320  logstream(LOG_EMPH)
1321  << rmi.procid() << ": Starting iteration: " << iteration_counter
1322  << std::endl;
1323  last_print = elapsed_seconds();
1324  }
1325 
1326  run_synchronous( &semi_synchronous_engine::exchange_messages);
1327  has_remote_message.clear();
1328 
1329  // Reset Active vertices ----------------------------------------------
1330  // Clear the active super-step and minor-step lists, which will
1331  // be filled as messages are received
1332  active_superstep_pushback.set_size(0);
1333  active_minorstep_pushback.set_size(0);
1334  has_gather_accum.clear();
1335 
1336 
1337  // how many to activate?
1338  num_to_activate = max_active_vertices;
1339  num_active_vertices = 0;
1340 
1341  rmi.barrier();
1342  // Activate vertices ----------------------------------------------------
1343  // Move scheduled messages into the active super-step and minor-step sets
1344  // if (rmi.procid() == 0) std::cout << "Exchange messages..." << std::endl;
1345  run_synchronous( &semi_synchronous_engine::transfer_scheduler_to_active);
1346 
1347  /**
1348  * Post conditions:
1349  * 1) only master vertices have messages
1350  */
1351 
1352  /**
1353  * Post conditions:
1354  * 1) there are no messages remaining
1355  * 2) All masters that received messages have their
1356  * active_superstep bit set
1357  * 3) All masters and mirrors that are to participate in the
1358  * next gather phases have their active_minorstep bit
1359  * set.
1360  * 4) num_active_vertices is the number of vertices that
1361  * received messages.
1362  */
1363 
1364  // Check termination condition ---------------------------------------
1365  // TODO: optimize by joining the reduces
1366  size_t total_active_vertices = num_active_vertices;
1367  rmi.all_reduce(total_active_vertices);
1368  if (rmi.procid() == 0 && print_this_round)
1369  logstream(LOG_EMPH)
1370  << "\tActive vertices: " << total_active_vertices << std::endl;
1371  if(total_active_vertices == 0) {
1372  termination_reason = execution_status::TASK_DEPLETION;
1373  break;
1374  }
1375 
1376  // Execute gather operations-------------------------------------------
1377  // Execute the gather operation for all vertices that are active
1378  // in this minor-step (active-minorstep bit set).
1379  // if (rmi.procid() == 0) std::cout << "Gathering..." << std::endl;
1380  run_synchronous( &semi_synchronous_engine::execute_gathers );
1381  // Clear the minor-step list since only super-step vertices
1382  // (i.e., master vertices) are required to participate in the
1383  // apply step
1384  active_minorstep_pushback.set_size(0);
1385  rmi.barrier();
1386 
1387  /**
1388  * Post conditions:
1389  * 1) gather_accum for all master vertices contains the
1390  * result of all the gathers (even if they are drawn from
1391  * cache)
1392  * 2) No minor-step bits are set
1393  */
1394 
1395  // Execute Apply Operations -------------------------------------------
1396  // Run the apply function on all active vertices
1397  // if (rmi.procid() == 0) std::cout << "Applying..." << std::endl;
1398  run_synchronous( &semi_synchronous_engine::execute_applys );
1399  /**
1400  * Post conditions:
1401  * 1) any changes to the vertex data have been synchronized
1402  * with all mirrors.
1403  * 2) all gather accumulators have been cleared
1404  * 3) If a vertex program is participating in the scatter
1405  * phase its minor-step bit has been set to active (both
1406  * masters and mirrors) and the vertex program has been
1407  * synchronized with the mirrors.
1408  */
1409 
1410 
1411  // Execute Scatter Operations -----------------------------------------
1412  // Execute each of the scatters on all minor-step active vertices.
1413 
1414 
1415 
1416  run_synchronous( &semi_synchronous_engine::execute_scatters );
1417 
1418  /**
1419  * Post conditions:
1420  * 1) NONE
1421  */
1422  if(rmi.procid() == 0 && print_this_round)
1423  logstream(LOG_EMPH) << "\t Running Aggregators" << std::endl;
1424  // probe the aggregator
1425  aggregator.tick_synchronous();
1426 
1427  ++iteration_counter;
1428 
1429  if (snapshot_interval > 0 && iteration_counter % snapshot_interval == 0) {
1430  graph.save_binary(snapshot_path);
1431  }
1432  }
1433 
1434  if (rmi.procid() == 0) {
1435  logstream(LOG_EMPH) << iteration_counter
1436  << " iterations completed." << std::endl;
1437  }
1438  // Final barrier to ensure that all engines terminate at the same time
1439  double total_compute_time = 0;
1440  for (size_t i = 0;i < per_thread_compute_time.size(); ++i) {
1441  total_compute_time += per_thread_compute_time[i];
1442  }
1443  std::vector<double> all_compute_time_vec(rmi.numprocs());
1444  all_compute_time_vec[rmi.procid()] = total_compute_time;
1445  rmi.all_gather(all_compute_time_vec);
1446 
1447  size_t global_completed = completed_applys;
1448  rmi.all_reduce(global_completed);
1449  completed_applys = global_completed;
1450  rmi.cout() << "Updates: " << completed_applys.value << "\n";
1451  if (rmi.procid() == 0) {
1452  logstream(LOG_INFO) << "Compute Balance: ";
1453  for (size_t i = 0;i < all_compute_time_vec.size(); ++i) {
1454  logstream(LOG_INFO) << all_compute_time_vec[i] << " ";
1455  }
1456  logstream(LOG_INFO) << std::endl;
1457  }
1458  rmi.full_barrier();
1459  // Stop the aggregator
1460  aggregator.stop();
1461  started = false;
1462  // return the final reason for termination
1463  return termination_reason;
1464  } // end of start
1465 
1466 
1467 
1468  template<typename VertexProgram>
1469  void semi_synchronous_engine<VertexProgram>::
1470  exchange_messages(const size_t thread_id) {
1471  context_type context(*this, graph);
1472  const bool TRY_TO_RECV = true;
1473  const size_t TRY_RECV_MOD = 100;
1474  size_t vcount = 0;
1475  fixed_dense_bitset<8 * sizeof(size_t)> local_bitset; // a word-sized bitfield
1476  // for(lvid_type lvid = thread_id; lvid < graph.num_local_vertices();
1477  // lvid += threads.size()) {
1478  while (1) {
1479  // increment by a word at a time
1480  lvid_type lvid_block_start =
1481  shared_lvid_counter.inc_ret_last(8 * sizeof(size_t));
1482  if (lvid_block_start >= graph.num_local_vertices()) break;
1483  // get the bit field from has_message
1484  size_t lvid_bit_block = has_remote_message.containing_word(lvid_block_start);
1485  if (lvid_bit_block == 0) continue;
1486  // initialize a word sized bitfield
1487  local_bitset.clear();
1488  local_bitset.initialize_from_mem(&lvid_bit_block, sizeof(size_t));
1489  foreach(size_t lvid_block_offset, local_bitset) {
1490  lvid_type lvid = lvid_block_start + lvid_block_offset;
1491  if (lvid >= graph.num_local_vertices()) break;
1492  // this vertex is a mirror; if it has a locally queued message,
1493  // forward that message to its master
1494  message_type msg;
1495  ASSERT_FALSE(graph.l_is_master(lvid));
1496  if(scheduler_ptr->get_specific(lvid, msg) != sched_status::EMPTY) {
1497  sync_message(lvid, thread_id, msg);
1498  }
1499  if(++vcount % TRY_RECV_MOD == 0) recv_messages(thread_id, TRY_TO_RECV);
1500  }
1501  } // end of loop over vertices to send messages
1502  message_exchange.partial_flush(thread_id);
1503  // Finish sending and receiving all messages
1504  rmi.dc().start_handler_threads(thread_id, threads.size());
1505  thread_barrier.wait();
1506  if(thread_id == 0) message_exchange.flush();
1507  thread_barrier.wait();
1508  rmi.dc().stop_handler_threads(thread_id, threads.size());
1509  recv_messages(thread_id);
1510  } // end of exchange_messages
1511 
1512 
1513 
1514 
1515  template<typename VertexProgram>
1516  void semi_synchronous_engine<VertexProgram>::
1517  transfer_scheduler_to_active(const size_t thread_id) {
1518  context_type context(*this, graph);
1519  const bool TRY_TO_RECV = true;
1520  const size_t TRY_RECV_MOD = 100;
1521  size_t vcount = 0;
1522  size_t curthread_num_to_activate = num_to_activate / threads.size();
1523  curthread_num_to_activate += (curthread_num_to_activate == 0);
1524  size_t nactive_inc = 0;
1525  while (nactive_inc < curthread_num_to_activate) {
1526  lvid_type lvid;
1527  message_type msg;
1528  sched_status::status_enum stat =
1529  scheduler_ptr->get_next(thread_id, lvid, msg);
1530  bool has_sched_msg = stat != sched_status::EMPTY;
1531  if (has_sched_msg) {
1532  // the scheduler returned a message for a master vertex;
1533  // activate it for this super-step
1534  ASSERT_TRUE(graph.l_is_master(lvid));
1535  nactive_inc++;
1536  // The vertex becomes active for this superstep
1537  // Pass the message to the vertex program
1538  active_superstep_pushback.push_back(lvid);
1539  vertex_type vertex = vertex_type(graph.l_vertex(lvid));
1540  vertex_programs[lvid].init(context, vertex, msg);
1541  // Determine if the gather should be run
1542  const vertex_program_type& const_vprog = vertex_programs[lvid];
1543  const vertex_type const_vertex = vertex;
1544  if(const_vprog.gather_edges(context, const_vertex) !=
1545  graphlab::NO_EDGES) {
1546  active_minorstep_pushback.push_back(lvid);
1547  sync_vertex_program(lvid, thread_id);
1548  }
1549  if (++vcount % TRY_RECV_MOD == 0) {
1550  // to avoid popping the same task multiple times, it is
1551  // of critical importance that we do not recv_message here.
1552  // but only recv_messages at the end of the phase
1553  // recv_messages(TRY_TO_RECV);
1554  recv_vertex_programs(thread_id, TRY_TO_RECV);
1555  }
1556  } else {
1557  break;
1558  }
1559  } // end of loop over vertices to send messages
1560  vprog_exchange.partial_flush(thread_id);
1561  num_active_vertices.inc(nactive_inc);
1562  // Finish sending and receiving all messages
1563  rmi.dc().start_handler_threads(thread_id, threads.size());
1564  thread_barrier.wait();
1565  if(thread_id == 0) {
1566  vprog_exchange.flush();
1567  }
1568  thread_barrier.wait();
1569  rmi.dc().stop_handler_threads(thread_id, threads.size());
1570  recv_vertex_programs(thread_id);
1571  } // end of transfer_scheduler_to_active
1572 
1573 
1574  template<typename VertexProgram>
1575  void semi_synchronous_engine<VertexProgram>::
1576  execute_gathers(const size_t thread_id) {
1577  context_type context(*this, graph);
1578  const bool TRY_TO_RECV = true;
1579  const size_t TRY_RECV_MOD = 1000;
1580  size_t vcount = 0;
1581  const bool caching_enabled = !gather_cache.empty();
1582  timer ti;
1583 
1584  size_t numminorstep = active_minorstep_pushback.size();
1585  while(1) {
1586  size_t i = shared_lvid_counter.inc_ret_last();
1587  if (i >= numminorstep) break;
1588  lvid_type lvid = active_minorstep[i];
1589 
1590  bool accum_is_set = false;
1591  gather_type accum = gather_type();
1592  // if caching is enabled and we have a cache entry then use
1593  // that as the accum
1594  if( caching_enabled && has_cache.get(lvid) ) {
1595  accum = gather_cache[lvid];
1596  accum_is_set = true;
1597  } else {
1598  // recompute the local contribution to the gather
1599  const vertex_program_type& vprog = vertex_programs[lvid];
1600  local_vertex_type local_vertex = graph.l_vertex(lvid);
1601  const vertex_type vertex(local_vertex);
1602  const edge_dir_type gather_dir = vprog.gather_edges(context, vertex);
1603  // Loop over in edges
1604  size_t edges_touched = 0;
1605  vprog.pre_local_gather(accum);
1606  if(gather_dir == IN_EDGES || gather_dir == ALL_EDGES) {
1607  foreach(local_edge_type local_edge, local_vertex.in_edges()) {
1608  edge_type edge(local_edge);
1609  // elocks[local_edge.id()].lock();
1610  if(accum_is_set) { // \todo hint likely
1611  accum += vprog.gather(context, vertex, edge);
1612  } else {
1613  accum = vprog.gather(context, vertex, edge);
1614  accum_is_set = true;
1615  }
1616  ++edges_touched;
1617  // elocks[local_edge.id()].unlock();
1618  }
1619  } // end of if in_edges/all_edges
1620  // Loop over out edges
1621  if(gather_dir == OUT_EDGES || gather_dir == ALL_EDGES) {
1622  foreach(local_edge_type local_edge, local_vertex.out_edges()) {
1623  edge_type edge(local_edge);
1624  // elocks[local_edge.id()].lock();
1625  if(accum_is_set) { // \todo hint likely
1626  accum += vprog.gather(context, vertex, edge);
1627  } else {
1628  accum = vprog.gather(context, vertex, edge);
1629  accum_is_set = true;
1630  }
1631  // elocks[local_edge.id()].unlock();
1632  ++edges_touched;
1633  }
1634  INCREMENT_EVENT(EVENT_GATHERS, edges_touched);
1635  } // end of if out_edges/all_edges
1636  vprog.post_local_gather(accum);
1637  // If caching is enabled then save the accumulator to the
1638  // cache for future iterations. Note that it is possible
1639  // that the accumulator was never set in which case we are
1640  // effectively "zeroing out" the cache.
1641  if(caching_enabled && accum_is_set) {
1642  gather_cache[lvid] = accum; has_cache.set_bit(lvid);
1643  } // end of if caching enabled
1644  }
1645  // If the accum contains a value for the local gather we put
1646  // that estimate in the gather exchange.
1647  if(accum_is_set) sync_gather(lvid, accum, thread_id);
1648  if(!graph.l_is_master(lvid)) {
1649  // if this is not the master clear the vertex program
1650  vertex_programs[lvid] = vertex_program_type();
1651  }
1652 
1653  // try to recv gathers if there are any in the buffer
1654  if(++vcount % TRY_RECV_MOD == 0) recv_gathers(thread_id, TRY_TO_RECV);
1655  } // end of loop over vertices to compute gather accumulators
1656  per_thread_compute_time[thread_id] += ti.current_time();
1657  gather_exchange.partial_flush(thread_id);
1658  // Finish sending and receiving all gather operations
1659  rmi.dc().start_handler_threads(thread_id, threads.size());
1660  thread_barrier.wait();
1661  if(thread_id == 0) gather_exchange.flush();
1662  thread_barrier.wait();
1663  rmi.dc().stop_handler_threads(thread_id, threads.size());
1664  recv_gathers(thread_id);
1665  } // end of execute_gathers
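  /* Two techniques in execute_gathers are worth noting.  First, work is
   * shared dynamically: every thread repeatedly claims the next index from
   * the shared atomic counter (shared_lvid_counter.inc_ret_last()) until the
   * active list is exhausted, so a thread that draws cheap vertices simply
   * claims more of them instead of waiting behind a static partition.
   * Second, when gather caching is enabled a cache hit replaces the whole
   * edge loop for that vertex, and a recomputed accumulator is written back
   * on a miss.  A minimal sketch of the claiming loop (illustrative only;
   * process() is a stand-in for the per-vertex gather body above):
   *
   * \code
   *   const size_t num_items = active_minorstep_pushback.size();
   *   while (true) {
   *     const size_t i = shared_lvid_counter.inc_ret_last(); // atomic fetch-and-add
   *     if (i >= num_items) break;                           // all work claimed
   *     process(active_minorstep[i]);                        // per-vertex gather
   *   }
   * \endcode
   */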
1666 
1667 
1668  template<typename VertexProgram>
1669  void semi_synchronous_engine<VertexProgram>::
1670  execute_applys(const size_t thread_id) {
1671  context_type context(*this, graph);
1672  const bool TRY_TO_RECV = true;
1673  const size_t TRY_RECV_MOD = 1000;
1674  size_t vcount = 0;
1675  timer ti;
1676 
1677  size_t numactive = active_superstep_pushback.size();
1678  while(1) {
1679  size_t i = shared_lvid_counter.inc_ret_last();
1680  if (i >= numactive) break;
1681  lvid_type lvid = active_superstep[i];
1682  // Only master vertices can be active in a super-step
1683  ASSERT_TRUE(graph.l_is_master(lvid));
1684  vertex_type vertex(graph.l_vertex(lvid));
1685  // Get the local accumulator. Note that it is possible that
1686  // the gather_accum was not set during the gather.
1687  const gather_type& accum = gather_accum[lvid];
1688  INCREMENT_EVENT(EVENT_APPLIES, 1);
1689  vertex_programs[lvid].apply(context, vertex, accum);
1690  // record an apply as a completed task
1691  ++completed_applys;
1692  // Clear the accumulator to save some memory
1693  gather_accum[lvid] = gather_type();
1694  // synchronize the changed vertex data with all mirrors
1695  sync_vertex_data(lvid, thread_id);
1696  // determine if a scatter operation is needed
1697  const vertex_program_type& const_vprog = vertex_programs[lvid];
1698  const vertex_type const_vertex = vertex;
1699  if(const_vprog.scatter_edges(context, const_vertex) !=
1700  graphlab::NO_EDGES) {
1701  active_minorstep_pushback.push_back(lvid);
1702  sync_vertex_program(lvid, thread_id);
1703  } else { // we are done so clear the vertex program
1704  vertex_programs[lvid] = vertex_program_type();
1705  }
1706  // try to receive vertex data
1707  if(++vcount % TRY_RECV_MOD == 0) {
1708  recv_vertex_programs(thread_id, TRY_TO_RECV);
1709  recv_vertex_data(thread_id, TRY_TO_RECV);
1710  }
1711  } // end of loop over vertices to run apply
1712 
1713  per_thread_compute_time[thread_id] += ti.current_time();
1714  vprog_exchange.partial_flush(thread_id);
1715  vdata_exchange.partial_flush(thread_id);
1716  // Finish sending and receiving all changes due to apply operations
1717  rmi.dc().start_handler_threads(thread_id, threads.size());
1718  thread_barrier.wait();
1719  if(thread_id == 0) {
1720  vprog_exchange.flush(); vdata_exchange.flush();
1721  }
1722  thread_barrier.wait();
1723  rmi.dc().stop_handler_threads(thread_id, threads.size());
1724  recv_vertex_programs(thread_id);
1725  recv_vertex_data(thread_id);
1726 
1727  } // end of execute_applys
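  /* execute_applys runs only on master replicas (note the ASSERT_TRUE on
   * l_is_master): each master consumes the accumulator merged during the
   * gather phase, invokes the user's apply(), clears the accumulator, and
   * broadcasts the new vertex value to its mirrors through sync_vertex_data.
   * Whether the vertex takes part in the scatter minor-step is decided by
   * scatter_edges(): any value other than graphlab::NO_EDGES keeps the
   * vertex program alive and replicates it to the mirrors via
   * sync_vertex_program; otherwise the program object is dropped immediately
   * to reclaim memory.
   */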
1728 
1729 
1730 
1731 
1732  template<typename VertexProgram>
1733  void semi_synchronous_engine<VertexProgram>::
1734  execute_scatters(const size_t thread_id) {
1735  context_type context(*this, graph);
1736  timer ti;
1737 
1738  size_t numminorstep = active_minorstep_pushback.size();
1739  while(1) {
1740  size_t i = shared_lvid_counter.inc_ret_last();
1741  if (i >= numminorstep) break;
1742  lvid_type lvid = active_minorstep[i];
1743 
1744  const vertex_program_type& vprog = vertex_programs[lvid];
1745  local_vertex_type local_vertex = graph.l_vertex(lvid);
1746  const vertex_type vertex(local_vertex);
1747  const edge_dir_type scatter_dir = vprog.scatter_edges(context, vertex);
1748  size_t edges_touched = 0;
1749  // Loop over in edges
1750  if(scatter_dir == IN_EDGES || scatter_dir == ALL_EDGES) {
1751  foreach(local_edge_type local_edge, local_vertex.in_edges()) {
1752  edge_type edge(local_edge);
1753  // elocks[local_edge.id()].lock();
1754  vprog.scatter(context, vertex, edge);
1755  // elocks[local_edge.id()].unlock();
1756  ++edges_touched;
1757  }
1758  } // end of if in_edges/all_edges
1759  // Loop over out edges
1760  if(scatter_dir == OUT_EDGES || scatter_dir == ALL_EDGES) {
1761  foreach(local_edge_type local_edge, local_vertex.out_edges()) {
1762  edge_type edge(local_edge);
1763  // elocks[local_edge.id()].lock();
1764  vprog.scatter(context, vertex, edge);
1765  // elocks[local_edge.id()].unlock();
1766  ++edges_touched;
1767  }
1768  } // end of if out_edges/all_edges
1769  INCREMENT_EVENT(EVENT_SCATTERS, edges_touched);
1770  // Clear the vertex program
1771  vertex_programs[lvid] = vertex_program_type();
1772  } // end of loop over vertices to complete scatter operation
1773 
1774  per_thread_compute_time[thread_id] += ti.current_time();
1775  } // end of execute_scatters
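  /* Taken together, execute_gathers / execute_applys / execute_scatters
   * implement the three minor-steps a user's vertex program is written
   * against.  A minimal sketch of such a program (illustrative only -- a
   * PageRank-like update; graph_type, the constants, and the exact typedefs
   * are assumptions, not part of this engine):
   *
   * \code
   *   class pagerank :
   *     public graphlab::ivertex_program<graph_type, double>,
   *     public graphlab::IS_POD_TYPE {
   *   public:
   *     // gather phase: the engine calls gather() on the edges selected here
   *     edge_dir_type gather_edges(icontext_type& context,
   *                                const vertex_type& vertex) const {
   *       return graphlab::IN_EDGES;
   *     }
   *     double gather(icontext_type& context, const vertex_type& vertex,
   *                   edge_type& edge) const {
   *       return edge.source().data() / edge.source().num_out_edges();
   *     }
   *     // apply phase: runs once per active master with the merged gather
   *     void apply(icontext_type& context, vertex_type& vertex,
   *                const gather_type& total) {
   *       vertex.data() = 0.15 + 0.85 * total;
   *     }
   *     // scatter phase: returning NO_EDGES here skips the scatter entirely
   *     edge_dir_type scatter_edges(icontext_type& context,
   *                                 const vertex_type& vertex) const {
   *       return graphlab::OUT_EDGES;
   *     }
   *     void scatter(icontext_type& context, const vertex_type& vertex,
   *                  edge_type& edge) const {
   *       context.signal(edge.target()); // becomes a message for a later super-step
   *     }
   *   };
   * \endcode
   */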
1776 
1777 
1778 
1779  // Data Synchronization ===================================================
1780  template<typename VertexProgram>
1781  void semi_synchronous_engine<VertexProgram>::
1782  sync_vertex_program(lvid_type lvid, const size_t thread_id) {
1783  ASSERT_TRUE(graph.l_is_master(lvid));
1784  const vertex_id_type vid = graph.global_vid(lvid);
1785  local_vertex_type vertex = graph.l_vertex(lvid);
1786  foreach(const procid_t& mirror, vertex.mirrors()) {
1787  vprog_exchange.send(mirror,
1788  std::make_pair(vid, vertex_programs[lvid]),
1789  thread_id);
1790  }
1791  } // end of sync_vertex_program
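  /* sync_vertex_program copies a master's vertex-program instance to every
   * mirror (keyed by global vertex id) so the mirrors can run the upcoming
   * scatter with the same program state; execute_applys above invokes it
   * when scatter_edges() asked for a scatter.
   */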
1792 
1793 
1794 
1795  template<typename VertexProgram>
1796  void semi_synchronous_engine<VertexProgram>::
1797  recv_vertex_programs(size_t threadid, const bool try_to_recv) {
1798  rmi.dc().handle_incoming_calls(threadid, threads.size());
1799  procid_t procid(-1);
1800  typename vprog_exchange_type::buffer_type buffer;
1801  while(vprog_exchange.recv(procid, buffer, try_to_recv)) {
1802  foreach(const vid_prog_pair_type& pair, buffer) {
1803  const lvid_type lvid = graph.local_vid(pair.first);
1804  // ASSERT_FALSE(graph.l_is_master(lvid));
1805  vertex_programs[lvid] = pair.second;
1806  active_minorstep_pushback.push_back(lvid);
1807  }
1808  }
1809  } // end of recv vertex programs
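  /* On the receiving side the mirror stores the program copy and pushes the
   * local vertex onto active_minorstep_pushback: receipt of a vertex program
   * is what activates a mirror for the next minor-step.
   */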
1810 
1811 
1812  template<typename VertexProgram>
1813  void semi_synchronous_engine<VertexProgram>::
1814  sync_vertex_data(lvid_type lvid, const size_t thread_id) {
1815  ASSERT_TRUE(graph.l_is_master(lvid));
1816  const vertex_id_type vid = graph.global_vid(lvid);
1817  local_vertex_type vertex = graph.l_vertex(lvid);
1818  foreach(const procid_t& mirror, vertex.mirrors()) {
1819  vdata_exchange.send(mirror, std::make_pair(vid, vertex.data()), thread_id);
1820  }
1821  } // end of sync_vertex_data
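  /* After apply, the master's (possibly updated) vertex value is shipped to
   * each mirror through vdata_exchange so every replica sees the same data
   * in the subsequent scatter.
   */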
1822 
1823 
1824 
1825 
1826 
1827  template<typename VertexProgram>
1828  void semi_synchronous_engine<VertexProgram>::
1829  recv_vertex_data(size_t threadid, bool try_to_recv) {
1830  rmi.dc().handle_incoming_calls(threadid, threads.size());
1831  procid_t procid(-1);
1832  typename vdata_exchange_type::buffer_type buffer;
1833  while(vdata_exchange.recv(procid, buffer, try_to_recv)) {
1834  foreach(const vid_vdata_pair_type& pair, buffer) {
1835  const lvid_type lvid = graph.local_vid(pair.first);
1836  ASSERT_FALSE(graph.l_is_master(lvid));
1837  graph.l_vertex(lvid).data() = pair.second;
1838  }
1839  }
1840  } // end of recv vertex data
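  /* The mirror simply overwrites its local copy with the value received from
   * the master; the ASSERT_FALSE(l_is_master) documents that vertex data
   * only flows from masters to mirrors, never the other way around.
   */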
1841 
1842 
1843  template<typename VertexProgram>
1844  void semi_synchronous_engine<VertexProgram>::
1845  sync_gather(lvid_type lvid, const gather_type& accum, const size_t thread_id) {
1846  if(graph.l_is_master(lvid)) {
1847  vlocks[lvid].lock();
1848  if(has_gather_accum.get(lvid)) {
1849  gather_accum[lvid] += accum;
1850  } else {
1851  gather_accum[lvid] = accum;
1852  has_gather_accum.set_bit(lvid);
1853  }
1854  vlocks[lvid].unlock();
1855  } else {
1856  const procid_t master = graph.l_master(lvid);
1857  const vertex_id_type vid = graph.global_vid(lvid);
1858  gather_exchange.send(master, std::make_pair(vid, accum), thread_id);
1859  }
1860  } // end of sync_gather
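  /* sync_gather and recv_gathers together merge the per-replica partial
   * gathers on the master: a master accumulates locally under vlocks[lvid],
   * while a mirror ships its partial result to the master through
   * gather_exchange.  This places three requirements on gather_type: it must
   * be default constructible, it must combine with operator+= (the merge
   * order across replicas is not deterministic, so += should be commutative
   * and associative), and it must be serializable so it can travel through
   * the exchange.  A minimal sketch of a conforming user type (hypothetical,
   * not part of this engine):
   *
   * \code
   *   struct avg_gather {
   *     double sum;
   *     size_t count;
   *     avg_gather() : sum(0.0), count(0) { }
   *     avg_gather& operator+=(const avg_gather& other) {
   *       sum   += other.sum;
   *       count += other.count;
   *       return *this;
   *     }
   *     void save(graphlab::oarchive& oarc) const { oarc << sum << count; }
   *     void load(graphlab::iarchive& iarc)       { iarc >> sum >> count; }
   *   };
   * \endcode
   */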
1861 
1862  template<typename VertexProgram>
1863  void semi_synchronous_engine<VertexProgram>::
1864  recv_gathers(size_t threadid, const bool try_to_recv) {
1865  rmi.dc().handle_incoming_calls(threadid, threads.size());
1866  procid_t procid(-1);
1867  typename gather_exchange_type::buffer_type buffer;
1868  while(gather_exchange.recv(procid, buffer, try_to_recv)) {
1869  foreach(const vid_gather_pair_type& pair, buffer) {
1870  const lvid_type lvid = graph.local_vid(pair.first);
1871  const gather_type& accum = pair.second;
1872  ASSERT_TRUE(graph.l_is_master(lvid));
1873  vlocks[lvid].lock();
1874  if( has_gather_accum.get(lvid) ) {
1875  gather_accum[lvid] += accum;
1876  } else {
1877  gather_accum[lvid] = accum;
1878  has_gather_accum.set_bit(lvid);
1879  }
1880  vlocks[lvid].unlock();
1881  }
1882  }
1883  } // end of recv_gathers
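  /* recv_gathers is the master-side counterpart of sync_gather: partial
   * accumulators arriving from mirrors are merged under the same per-vertex
   * lock and has_gather_accum bit, so remote contributions combine exactly
   * like local ones.
   */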
1884 
1885 
1886  template<typename VertexProgram>
1887  void semi_synchronous_engine<VertexProgram>::
1888  sync_message(lvid_type lvid, const size_t thread_id, const message_type& msg) {
1889  ASSERT_FALSE(graph.l_is_master(lvid));
1890  const procid_t master = graph.l_master(lvid);
1891  const vertex_id_type vid = graph.global_vid(lvid);
1892  message_exchange.send(master, std::make_pair(vid, msg), thread_id);
1893  } // end of sync_message
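  /* Signals raised at a mirror are always forwarded to the owning master
   * (hence the ASSERT_FALSE on l_is_master), so a vertex's incoming signals
   * are delivered to a single replica.
   */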
1894 
1895 
1896 
1897 
1898  template<typename VertexProgram>
1899  void semi_synchronous_engine<VertexProgram>::
1900  recv_messages(size_t threadid, const bool try_to_recv) {
1901  rmi.dc().handle_incoming_calls(threadid, threads.size());
1902  procid_t procid(-1);
1903  typename message_exchange_type::buffer_type buffer;
1904  while(message_exchange.recv(procid, buffer, try_to_recv)) {
1905  foreach(const vid_message_pair_type& pair, buffer) {
1906  internal_signal(graph.vertex(pair.first), pair.second);
1907  }
1908  }
1909  } // end of recv_messages
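  /* Remote signals are re-injected through internal_signal on the owning
   * process, just as if a local scatter had signalled the vertex.  In a user
   * program such signals typically originate from a call like the following
   * inside scatter() (illustrative; msg is a hypothetical message value):
   *
   * \code
   *   context.signal(edge.target(), msg);  // queue a message for the target vertex
   * \endcode
   */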
1910 
1911 
1912 
1913 
1914 
1915 
1916 
1917 
1918 
1919 
1920 
1921 } // namespace graphlab
1922 
1923 
1924 #include <graphlab/macros_undef.hpp>
1925 
1926 #endif
1927