GraphLab: Distributed Graph-Parallel API 2.1
synchronous_engine.hpp
/**
 * Copyright (c) 2009 Carnegie Mellon University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS
 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language
 * governing permissions and limitations under the License.
 *
 * For more about this software visit:
 *
 * http://www.graphlab.ml.cmu.edu
 *
 */

#ifndef GRAPHLAB_SYNCHRONOUS_ENGINE_HPP
#define GRAPHLAB_SYNCHRONOUS_ENGINE_HPP

#include <deque>
#include <boost/bind.hpp>

#include <graphlab/engine/iengine.hpp>

#include <graphlab/vertex_program/ivertex_program.hpp>
#include <graphlab/vertex_program/icontext.hpp>
#include <graphlab/vertex_program/context.hpp>

#include <graphlab/engine/execution_status.hpp>
#include <graphlab/options/graphlab_options.hpp>

#include <graphlab/parallel/pthread_tools.hpp>
#include <graphlab/parallel/atomic_add_vector.hpp>
#include <graphlab/util/tracepoint.hpp>
#include <graphlab/util/memory_info.hpp>

#include <graphlab/rpc/dc_dist_object.hpp>
#include <graphlab/rpc/distributed_event_log.hpp>
#include <graphlab/rpc/buffered_exchange.hpp>

#include <graphlab/macros_def.hpp>

namespace graphlab {


  /**
   * \ingroup engines
   *
   * \brief The synchronous engine executes all active vertex programs
   * synchronously in a sequence of super-steps (iterations) in both the
   * shared and distributed memory settings.
   *
   * \tparam VertexProgram The user defined vertex program which
   * should implement the \ref graphlab::ivertex_program interface.
   *
   * ### Execution Semantics
   *
   * On start() the \ref graphlab::ivertex_program::init function is invoked
   * on all vertex programs in parallel to initialize the vertex program,
   * vertex data, and possibly signal vertices.
   * The engine then proceeds to execute a sequence of
   * super-steps (iterations), each of which is further decomposed into a
   * sequence of minor-steps which are also executed synchronously:
   * \li Receive all incoming messages (signals) by invoking the
   * \ref graphlab::ivertex_program::init function on all
   * vertex-programs that have incoming messages. If a
   * vertex-program does not have any incoming messages then it is
   * not active during this super-step.
   * \li Execute all gathers for active vertex programs by invoking
   * the user defined \ref graphlab::ivertex_program::gather function
   * on the edge direction returned by the
   * \ref graphlab::ivertex_program::gather_edges function. The gather
   * functions can modify edge data but cannot modify the vertex
   * program or vertex data and therefore can be executed on multiple
   * edges in parallel. The gather type is used to accumulate (sum)
   * the result of the gather function calls.
   * \li Execute all apply functions for active vertex-programs by
   * invoking the user defined \ref graphlab::ivertex_program::apply
   * function, passing the sum of the gather functions. If \ref
   * graphlab::ivertex_program::gather_edges returns no edges then
   * the default gather value is passed to apply. The apply function
   * can modify the vertex program and vertex data.
   * \li Execute all scatters for active vertex programs by invoking
   * the user defined \ref graphlab::ivertex_program::scatter function
   * on the edge direction returned by the
   * \ref graphlab::ivertex_program::scatter_edges function. The scatter
   * functions can modify edge data but cannot modify the vertex
   * program or vertex data and therefore can be executed on multiple
   * edges in parallel.
   *
   * ### Construction
   *
   * The synchronous engine is constructed by passing in a
   * \ref graphlab::distributed_control object which manages coordination
   * between engine threads and a \ref graphlab::distributed_graph object
   * which is the graph on which the engine should be run. The graph should
   * already be populated and cannot change after the engine is constructed.
   * In the distributed setting all program instances (running on each machine)
   * should construct an instance of the engine at the same time.
   *
   * Computation is initiated by signaling vertices using either
   * \ref graphlab::synchronous_engine::signal or
   * \ref graphlab::synchronous_engine::signal_all. In either case all
   * machines should invoke signal or signal_all at the same time. Finally,
   * computation is started by calling the
   * \ref graphlab::synchronous_engine::start function.
   *
   * ### Example Usage
   *
   * The following is a simple example demonstrating how to use the engine:
   * \code
   * #include <graphlab.hpp>
   *
   * struct vertex_data {
   *   // code
   * };
   * struct edge_data {
   *   // code
   * };
   * typedef graphlab::distributed_graph<vertex_data, edge_data> graph_type;
   * typedef float gather_type;
   * struct pagerank_vprog :
   *   public graphlab::ivertex_program<graph_type, gather_type> {
   *   // code
   * };
   *
   * int main(int argc, char** argv) {
   *   // Initialize the control plane using MPI
   *   graphlab::mpi_tools::init(argc, argv);
   *   graphlab::distributed_control dc;
   *   // Parse command line options
   *   graphlab::command_line_options clopts("PageRank algorithm.");
   *   std::string graph_dir;
   *   clopts.attach_option("graph", &graph_dir, graph_dir,
   *                        "The graph file.");
   *   if(!clopts.parse(argc, argv)) {
   *     std::cout << "Error in parsing arguments." << std::endl;
   *     return EXIT_FAILURE;
   *   }
   *   graph_type graph(dc, clopts);
   *   graph.load_structure(graph_dir, "tsv");
   *   graph.finalize();
   *   std::cout << "#vertices: " << graph.num_vertices()
   *             << " #edges:" << graph.num_edges() << std::endl;
   *   graphlab::synchronous_engine<pagerank_vprog> engine(dc, graph, clopts);
   *   engine.signal_all();
   *   engine.start();
   *   std::cout << "Runtime: " << engine.elapsed_seconds();
   *   graphlab::mpi_tools::finalize();
   * }
   * \endcode
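   *
   * For illustration only, the elided body of \c pagerank_vprog above might
   * be sketched as follows, assuming \c vertex_data carries a single
   * \c rank field (the damping constants are illustrative, not part of the
   * engine API):
   * \code
   * struct pagerank_vprog :
   *   public graphlab::ivertex_program<graph_type, gather_type> {
   *   // Sum the rank contributions of the in-neighbors (gather_edges
   *   // defaults to IN_EDGES in ivertex_program).
   *   float gather(icontext_type& context, const vertex_type& vertex,
   *                edge_type& edge) const {
   *     return edge.source().data().rank / edge.source().num_out_edges();
   *   }
   *   // Fold the gathered total into the new rank.
   *   void apply(icontext_type& context, vertex_type& vertex,
   *              const gather_type& total) {
   *     vertex.data().rank = 0.15f + 0.85f * total;
   *   }
   *   // Scatter along out-edges and signal the neighbors so they remain
   *   // active in the next super-step (this sketch runs until
   *   // max_iterations rather than testing for convergence).
   *   edge_dir_type scatter_edges(icontext_type& context,
   *                               const vertex_type& vertex) const {
   *     return graphlab::OUT_EDGES;
   *   }
   *   void scatter(icontext_type& context, const vertex_type& vertex,
   *                edge_type& edge) const {
   *     context.signal(edge.target());
   *   }
   * };
   * \endcode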
   *
   *
   * <a name=engineopts>Engine Options</a>
   * =====================
   * The synchronous engine supports several engine options which can
   * be set as command line arguments using \c --engine_opts :
   *
   * \li <b>max_iterations</b>: (default: infinity) The maximum number
   * of iterations (super-steps) to run.
   *
   * \li <b>timeout</b>: (default: infinity) The maximum time in
   * seconds that the engine may run. When the time runs out the
   * current iteration is completed and then the engine terminates.
   *
   * \li <b>use_cache</b>: (default: false) This is used to enable
   * caching. When caching is enabled the gather phase is skipped for
   * vertices that already have a cached value. To use caching the
   * vertex program must either clear (\ref icontext::clear_gather_cache)
   * or update (\ref icontext::post_delta) the cached values of
   * neighboring vertices during the scatter phase.
   *
   * \li <b>snapshot_interval</b>: (default: -1) If set to a positive value,
   * a snapshot is taken every snapshot_interval iterations. If set to 0, a
   * snapshot is taken before the first iteration. If set to a negative
   * value, no snapshots are taken. A snapshot is a binary dump of the graph.
   *
   * \li <b>snapshot_path</b>: If snapshot_interval is set to a value >= 0,
   * this option must be specified and should contain a target basename
   * for the snapshot: the path, including folder and file prefix, in
   * which the snapshots should be saved.
   *
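   * For example, a run with a bounded iteration count and gather caching
   * enabled might be launched as follows (binary name and graph path are
   * illustrative):
   * \code
   * mpiexec -n 4 ./pagerank --graph mygraph.tsv \
   *     --engine_opts "max_iterations=10,use_cache=true"
   * \endcode
   *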
   * \see graphlab::omni_engine
   * \see graphlab::async_consistent_engine
   * \see graphlab::semi_synchronous_engine
   */
  template<typename VertexProgram>
  class synchronous_engine :
    public iengine<VertexProgram> {

  public:

    /**
     * \brief The user defined vertex program type. Equivalent to the
     * VertexProgram template argument.
     *
     * The user defined vertex program type which should implement the
     * \ref graphlab::ivertex_program interface.
     */
    typedef VertexProgram vertex_program_type;

    /**
     * \brief The user defined type returned by the gather function.
     *
     * The gather type is defined in the \ref graphlab::ivertex_program
     * interface and is the value returned by the
     * \ref graphlab::ivertex_program::gather function. The
     * gather type must have an <code>operator+=(const gather_type&
     * other)</code> function and must be \ref sec_serializable.
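     *
     * For example, a minimal user-defined gather type might look like the
     * following sketch (the name \c my_gather_type and its \c value member
     * are illustrative):
     * \code
     * struct my_gather_type {
     *   double value;
     *   my_gather_type(double value = 0) : value(value) { }
     *   // Commutative-associative merge used by the engine to sum
     *   // gather results.
     *   my_gather_type& operator+=(const my_gather_type& other) {
     *     value += other.value;
     *     return *this;
     *   }
     *   // Serialization support required to move accumulators between
     *   // machines.
     *   void save(graphlab::oarchive& arc) const { arc << value; }
     *   void load(graphlab::iarchive& arc) { arc >> value; }
     * };
     * \endcode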
     */
    typedef typename VertexProgram::gather_type gather_type;

    /**
     * \brief The user defined message type used to signal neighboring
     * vertex programs.
     *
     * The message type is defined in the \ref graphlab::ivertex_program
     * interface and used in the call to \ref graphlab::icontext::signal.
     * The message type must have an
     * <code>operator+=(const message_type& other)</code> function and
     * must be \ref sec_serializable.
     */
    typedef typename VertexProgram::message_type message_type;

    /**
     * \brief The type of data associated with each vertex in the graph.
     *
     * The vertex data type must be \ref sec_serializable.
     */
    typedef typename VertexProgram::vertex_data_type vertex_data_type;

    /**
     * \brief The type of data associated with each edge in the graph.
     *
     * The edge data type must be \ref sec_serializable.
     */
    typedef typename VertexProgram::edge_data_type edge_data_type;

    /**
     * \brief The type of graph supported by this vertex program.
     *
     * See graphlab::distributed_graph
     */
    typedef typename VertexProgram::graph_type graph_type;

    /**
     * \brief The type used to represent a vertex in the graph.
     * See \ref graphlab::distributed_graph::vertex_type for details.
     *
     * The vertex type contains the function
     * \ref graphlab::distributed_graph::vertex_type::data which
     * returns a reference to the vertex data as well as other functions
     * like \ref graphlab::distributed_graph::vertex_type::num_in_edges
     * which returns the number of in edges.
     */
    typedef typename graph_type::vertex_type vertex_type;

    /**
     * \brief The type used to represent an edge in the graph.
     * See \ref graphlab::distributed_graph::edge_type for details.
     *
     * The edge type contains the function
     * \ref graphlab::distributed_graph::edge_type::data which returns a
     * reference to the edge data. In addition the edge type contains
     * the functions \ref graphlab::distributed_graph::edge_type::source and
     * \ref graphlab::distributed_graph::edge_type::target.
     */
    typedef typename graph_type::edge_type edge_type;

    /**
     * \brief The type of the callback interface passed by the engine to vertex
     * programs. See \ref graphlab::icontext for details.
     *
     * The context callback is passed to the vertex program functions and is
     * used to signal other vertices, get the current iteration, and access
     * information about the engine.
     */
    typedef icontext<graph_type, gather_type, message_type> icontext_type;

  private:

    /**
     * \brief Local vertex type used by the engine for fast indexing
     */
    typedef typename graph_type::local_vertex_type local_vertex_type;

    /**
     * \brief Local edge type used by the engine for fast indexing
     */
    typedef typename graph_type::local_edge_type local_edge_type;

    /**
     * \brief Local vertex id type used by the engine for fast indexing
     */
    typedef typename graph_type::lvid_type lvid_type;

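    /// \brief Wall-clock compute time accumulated by each worker thread
    /// during the gather and apply phases; reported as "Compute Balance"
    /// at the end of start().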
    std::vector<double> per_thread_compute_time;

    /**
     * \brief The actual instance of the context type used by this engine.
     */
    typedef context<synchronous_engine> context_type;
    friend class context<synchronous_engine>;

    /**
     * \brief The type of the distributed aggregator inherited from iengine
     */
    typedef typename iengine<VertexProgram>::aggregator_type aggregator_type;

    /**
     * \brief The object used to communicate with remote copies of the
     * synchronous engine.
     */
    dc_dist_object< synchronous_engine<VertexProgram> > rmi;

    /**
     * \brief A reference to the distributed graph on which this
     * synchronous engine is running.
     */
    graph_type& graph;

    /**
     * \brief The local worker threads used by this engine
     */
    thread_pool threads;

    /**
     * \brief A thread barrier that is used to control the threads in the
     * thread pool.
     */
    graphlab::barrier thread_barrier;

    /**
     * \brief The maximum number of super-steps (iterations) to run
     * before terminating. The engine also terminates early if
     * there are no messages remaining.
     */
    size_t max_iterations;

    /**
     * \brief A snapshot is taken every snapshot_interval iterations.
     * If snapshot_interval == 0, a snapshot is only taken before the first
     * iteration. If snapshot_interval < 0, no snapshots are taken.
     */
    int snapshot_interval;

    /// \brief The target base name the snapshot is saved in.
    std::string snapshot_path;

    /**
     * \brief A counter that tracks the current iteration number since
     * start was last invoked.
     */
    size_t iteration_counter;

    /**
     * \brief The time in seconds at which the engine started.
     */
    float start_time;

    /**
     * \brief The timeout time in seconds
     */
    float timeout;

    /**
     * \brief Schedules all vertices every iteration
     */
    bool sched_allv;

    /**
     * \brief Used to stop the engine prematurely
     */
    bool force_abort;

    /**
     * \brief The vertex locks protect access to vertex specific
     * data-structures including
     * \ref graphlab::synchronous_engine::gather_accum
     * and \ref graphlab::synchronous_engine::messages.
     */
    std::vector<simple_spinlock> vlocks;

    /**
     * \brief The elocks protect individual edges during gather and
     * scatter. Technically there is a potential race since gather
     * and scatter can modify edge values and can overlap. The edge
     * lock ensures that only one gather or scatter occurs on an edge
     * at a time.
     */
    std::vector<simple_spinlock> elocks;

    /**
     * \brief The vertex programs associated with each vertex on this
     * machine.
     */
    std::vector<vertex_program_type> vertex_programs;

    /**
     * \brief Vector of messages associated with each vertex.
     */
    std::vector<message_type> messages;

    /**
     * \brief Bit indicating whether a message is present for each vertex.
     */
    dense_bitset has_message;

    /**
     * \brief Gather accumulator used for each master vertex to merge
     * the result of all the machine specific accumulators (or
     * caches).
     *
     * The gather accumulator can be accessed by multiple threads at
     * once and therefore must be guarded by the vertex locks in
     * \ref graphlab::synchronous_engine::vlocks
     */
    std::vector<gather_type> gather_accum;

    /**
     * \brief Bit indicating whether the gather accumulator contains any
     * values.
     *
     * While dense bitsets are thread safe, the value of this bit must
     * change in concert with the
     * \ref graphlab::synchronous_engine::gather_accum and therefore is
     * set while holding the lock in
     * \ref graphlab::synchronous_engine::vlocks.
     */
    dense_bitset has_gather_accum;

    /**
     * \brief This optional vector contains caches of previous gather
     * contributions for each machine.
     *
     * Caching is done locally and therefore a high-degree vertex may
     * have multiple caches (one per machine).
     */
    std::vector<gather_type> gather_cache;

    /**
     * \brief A bit indicating if the local gather for that vertex is
     * available.
     */
    dense_bitset has_cache;

    /**
     * \brief A bit (for master vertices) indicating if that vertex is active
     * (received a message on this iteration).
     */
    dense_bitset active_superstep;

    /**
     * \brief The number of local vertices (masters) that are active on this
     * iteration.
     */
    atomic<size_t> num_active_vertices;

    /**
     * \brief A bit indicating (for all vertices) whether to
     * participate in the current minor-step (gather or scatter).
     */
    dense_bitset active_minorstep;

    /**
     * \brief A counter measuring the number of applys that have been completed
     */
    atomic<size_t> completed_applys;

    /**
     * \brief The shared counter used to coordinate operations between
     * threads.
     */
    atomic<size_t> shared_lvid_counter;

    /**
     * \brief The pair type used to synchronize vertex programs across machines.
     */
    typedef std::pair<vertex_id_type, vertex_program_type> vid_prog_pair_type;

    /**
     * \brief The type of the exchange used to synchronize vertex programs
     */
    typedef buffered_exchange<vid_prog_pair_type> vprog_exchange_type;

    /**
     * \brief The distributed exchange used to synchronize changes to
     * vertex programs.
     */
    vprog_exchange_type vprog_exchange;

    /**
     * \brief The pair type used to synchronize vertex data across machines.
     */
    typedef std::pair<vertex_id_type, vertex_data_type> vid_vdata_pair_type;

    /**
     * \brief The type of the exchange used to synchronize vertex data
     */
    typedef buffered_exchange<vid_vdata_pair_type> vdata_exchange_type;

    /**
     * \brief The distributed exchange used to synchronize changes to
     * vertex data.
     */
    vdata_exchange_type vdata_exchange;

    /**
     * \brief The pair type used to synchronize the results of the gather phase
     */
    typedef std::pair<vertex_id_type, gather_type> vid_gather_pair_type;

    /**
     * \brief The type of the exchange used to synchronize gather
     * accumulators
     */
    typedef buffered_exchange<vid_gather_pair_type> gather_exchange_type;

    /**
     * \brief The distributed exchange used to synchronize gather
     * accumulators.
     */
    gather_exchange_type gather_exchange;

    /**
     * \brief The pair type used to synchronize messages
     */
    typedef std::pair<vertex_id_type, message_type> vid_message_pair_type;

    /**
     * \brief The type of the exchange used to synchronize messages
     */
    typedef buffered_exchange<vid_message_pair_type> message_exchange_type;

    /**
     * \brief The distributed exchange used to synchronize messages
     */
    message_exchange_type message_exchange;

    /**
     * \brief The distributed aggregator used to manage background
     * aggregation.
     */
    aggregator_type aggregator;

    DECLARE_EVENT(EVENT_APPLIES);
    DECLARE_EVENT(EVENT_GATHERS);
    DECLARE_EVENT(EVENT_SCATTERS);
    DECLARE_EVENT(EVENT_ACTIVE_CPUS);
  public:

    /**
     * \brief Construct a synchronous engine for a given graph and options.
     *
     * The synchronous engine should be constructed after the graph
     * has been loaded (e.g., \ref graphlab::distributed_graph::load)
     * and the graphlab options have been set
     * (e.g., \ref graphlab::command_line_options).
     *
     * In the distributed setting the synchronous engine must be constructed
     * on all machines at the same time (in the same order) passing
     * the \ref graphlab::distributed_control object. Upon
     * construction the synchronous engine allocates several
     * data-structures to store messages, gather accumulants, and
     * vertex programs and therefore may require considerable memory.
     *
     * The number of threads to create is read from
     * \ref graphlab_options::get_ncpus "opts.get_ncpus()".
     *
     * See the <a href="#engineopts">main class documentation</a>
     * for details on the available options.
     *
     * @param [in] dc Distributed controller to associate with
     * @param [in,out] graph A reference to the graph object that this
     * engine will modify. The graph must be fully constructed and
     * finalized.
     * @param [in] opts A graphlab::graphlab_options object specifying engine
     * parameters. This is typically constructed using
     * \ref graphlab::command_line_options.
     */
    synchronous_engine(distributed_control& dc, graph_type& graph,
                       const graphlab_options& opts = graphlab_options());

    /**
     * \brief Start execution of the synchronous engine.
     *
     * The start function begins computation and does not return until
     * there are no remaining messages or until max_iterations has
     * been reached.
     *
     * The start() function modifies the data graph through the vertex
     * programs and so upon return the data graph should contain the
     * result of the computation.
     *
     * @return The reason for termination
     */
    execution_status::status_enum start();

    // documentation inherited from iengine
    size_t num_updates() const;

    // documentation inherited from iengine
    void signal(vertex_id_type vid,
                const message_type& message = message_type());

    // documentation inherited from iengine
    void signal_all(const message_type& message = message_type(),
                    const std::string& order = "shuffle");

    void signal_vset(const vertex_set& vset,
                     const message_type& message = message_type(),
                     const std::string& order = "shuffle");

    // documentation inherited from iengine
    float elapsed_seconds() const;

    /**
     * \brief Get the current iteration number since start was last
     * invoked.
     *
     * \return the current iteration
     */
    int iteration() const;

    /**
     * \brief Compute the total memory used by the entire distributed
     * system.
     *
     * @return The total memory used in bytes.
     */
    size_t total_memory_usage() const;

    /**
     * \brief Get a pointer to the distributed aggregator object.
     *
     * This is currently used by the \ref graphlab::iengine interface to
     * implement the calls to aggregation.
     *
     * @return a pointer to the local aggregator.
     */
    aggregator_type* get_aggregator();

  private:

    /**
     * \brief This internal stop function is called by the \ref graphlab::context
     * to terminate execution of the engine.
     */
    void internal_stop();

    /**
     * \brief This function is called remotely via RPC to force the
     * engine to stop.
     */
    void rpc_stop();

    /**
     * \brief Signal a vertex.
     *
     * This function is called by the \ref graphlab::context.
     *
     * @param [in] vertex the vertex to signal
     * @param [in] message the message to send to that vertex.
     */
    void internal_signal(const vertex_type& vertex,
                         const message_type& message = message_type());

    /**
     * \brief Called by the context to signal an arbitrary vertex.
     * This must be done by finding the owner of that vertex.
     *
     * @param [in] gvid the global vertex id of the vertex to signal
     * @param [in] message the message to send to that vertex.
     */
    void internal_signal_broadcast(vertex_id_type gvid,
                                   const message_type& message = message_type());

    /**
     * \brief This function tests if this machine is the master of
     * gvid and signals if successful.
     */
    void internal_signal_rpc(vertex_id_type gvid,
                             const message_type& message = message_type());

    /**
     * \brief Post a delta to a previous gather for a given vertex.
     *
     * This function is called by the \ref graphlab::context.
     *
     * @param [in] vertex The vertex to which to post a change in the sum
     * @param [in] delta The change in that sum
     */
    void internal_post_delta(const vertex_type& vertex,
                             const gather_type& delta);

    /**
     * \brief Clear the cached gather for a vertex if one is
     * available.
     *
     * This function is called by the \ref graphlab::context.
     *
     * @param [in] vertex the vertex for which to clear the cache
     */
    void internal_clear_gather_cache(const vertex_type& vertex);

    // Program Steps ==========================================================

    void thread_launch_wrapped_event_counter(boost::function<void(void)> fn) {
      INCREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
      fn();
      DECREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
    }

    /**
     * \brief Executes ncpus copies of a member function each with a
     * unique consecutive id (thread id).
     *
     * This function is used by the main loop to execute each of the
     * stages in parallel.
     *
     * The member function must have the type:
     *
     * \code
     * void synchronous_engine::member_fun(size_t threadid);
     * \endcode
     *
     * This function runs an rmi barrier after termination.
     *
     * @tparam MemberFunction the type of the member function.
     * @param [in] member_fun the function to call.
     */
    template<typename MemberFunction>
    void run_synchronous(MemberFunction member_fun) {
      shared_lvid_counter = 0;
      if (threads.size() <= 1) {
        INCREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
        ( (this)->*(member_fun))(0);
        DECREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
      } else {
        // launch the initialization threads
        for(size_t i = 0; i < threads.size(); ++i) {
          boost::function<void(void)> invoke = boost::bind(member_fun, this, i);
          threads.launch(boost::bind(
              &synchronous_engine::thread_launch_wrapped_event_counter,
              this,
              invoke), i);
        }
      }
      // Wait for all threads to finish
      threads.join();
      rmi.barrier();
    } // end of run_synchronous

    // /**
    //  * \brief Initialize all vertex programs by invoking
    //  * \ref graphlab::ivertex_program::init on all vertices.
    //  *
    //  * @param thread_id the thread to run this as which determines
    //  * which vertices to process.
    //  */
    // void initialize_vertex_programs(size_t thread_id);

    /**
     * \brief Synchronize all message data.
     *
     * @param thread_id the thread to run this as which determines
     * which vertices to process.
     */
    void exchange_messages(size_t thread_id);

    /**
     * \brief Invoke the \ref graphlab::ivertex_program::init function
     * on all vertex programs that have inbound messages.
     *
     * @param thread_id the thread to run this as which determines
     * which vertices to process.
     */
    void receive_messages(size_t thread_id);

    /**
     * \brief Execute the \ref graphlab::ivertex_program::gather function on all
     * vertices that received messages for the edges specified by the
     * \ref graphlab::ivertex_program::gather_edges.
     *
     * @param thread_id the thread to run this as which determines
     * which vertices to process.
     */
    void execute_gathers(size_t thread_id);

    /**
     * \brief Execute the \ref graphlab::ivertex_program::apply function on
     * all vertices that received messages in this super-step (active).
     *
     * @param thread_id the thread to run this as which determines
     * which vertices to process.
     */
    void execute_applys(size_t thread_id);

    /**
     * \brief Execute the \ref graphlab::ivertex_program::scatter function on all
     * vertices that received messages for the edges specified by the
     * \ref graphlab::ivertex_program::scatter_edges.
     *
     * @param thread_id the thread to run this as which determines
     * which vertices to process.
     */
    void execute_scatters(size_t thread_id);

    // Data Synchronization ===================================================
    /**
     * \brief Send the vertex program for the local vertex id to all
     * of its mirrors.
     *
     * @param [in] lvid the vertex to sync. This machine must be the
     * master of that vertex.
     */
    void sync_vertex_program(lvid_type lvid, size_t thread_id);

    /**
     * \brief Receive all incoming vertex programs and update the
     * local mirrors.
     *
     * This function returns when there are no more incoming vertex
     * programs and should be called after a flush of the vertex
     * program exchange.
     */
    void recv_vertex_programs(const bool try_to_recv = false);

    /**
     * \brief Send the vertex data for the local vertex id to all of
     * its mirrors.
     *
     * @param [in] lvid the vertex to sync. This machine must be the master
     * of that vertex.
     */
    void sync_vertex_data(lvid_type lvid, size_t thread_id);

    /**
     * \brief Receive all incoming vertex data and update the local
     * mirrors.
     *
     * This function returns when there are no more incoming vertex
     * data and should be called after a flush of the vertex data
     * exchange.
     */
    void recv_vertex_data(const bool try_to_recv = false);

    /**
     * \brief Send the gather value for the vertex id to its master.
     *
     * @param [in] lvid the vertex whose gather value is sent
     * @param [in] accum the locally computed gather value.
     */
    void sync_gather(lvid_type lvid, const gather_type& accum,
                     size_t thread_id);

    /**
     * \brief Receive the gather values from the buffered exchange.
     *
     * This function returns when there is nothing left in the
     * buffered exchange and should be called after the buffered
     * exchange has been flushed.
     */
    void recv_gathers(const bool try_to_recv = false);

    /**
     * \brief Send the accumulated message for the local vertex to its
     * master.
     *
     * @param [in] lvid the vertex to send
     */
    void sync_message(lvid_type lvid, const size_t thread_id);

    /**
     * \brief Receive the messages from the buffered exchange.
     *
     * This function returns when there is nothing left in the
     * buffered exchange and should be called after the buffered
     * exchange has been flushed.
     */
    void recv_messages(const bool try_to_recv = false);

  }; // end of class synchronous engine
  /**
   * Constructs a synchronous distributed engine.
   * The number of threads to create is read from
   * graphlab_options::get_ncpus().
   *
   * Valid engine options (graphlab_options::get_engine_args()):
   * \arg \c max_iterations Sets the maximum number of iterations the
   * engine will run for.
   * \arg \c use_cache If set to true, partial gathers are cached.
   * See \ref gather_caching to understand the behavior of the
   * gather caching model and how it may be used to accelerate program
   * performance.
   *
   * \param dc Distributed controller to associate with
   * \param graph The graph to schedule over. The graph must be fully
   * constructed and finalized.
   * \param opts A graphlab_options object containing options and parameters
   * for the engine.
   */
  template<typename VertexProgram>
  synchronous_engine<VertexProgram>::
  synchronous_engine(distributed_control& dc,
                     graph_type& graph,
                     const graphlab_options& opts) :
    rmi(dc, this), graph(graph),
    threads(opts.get_ncpus()),
    thread_barrier(opts.get_ncpus()),
    max_iterations(-1), snapshot_interval(-1), iteration_counter(0),
    timeout(0), sched_allv(false),
    vprog_exchange(dc, opts.get_ncpus(), 64 * 1024),
    vdata_exchange(dc, opts.get_ncpus(), 64 * 1024),
    gather_exchange(dc, opts.get_ncpus(), 64 * 1024),
    message_exchange(dc, opts.get_ncpus(), 64 * 1024),
    aggregator(dc, graph, new context_type(*this, graph)) {
    // Process any additional options
    std::vector<std::string> keys = opts.get_engine_args().get_option_keys();
    per_thread_compute_time.resize(opts.get_ncpus());
    bool use_cache = false;
    foreach(std::string opt, keys) {
      if (opt == "max_iterations") {
        opts.get_engine_args().get_option("max_iterations", max_iterations);
        if (rmi.procid() == 0)
          logstream(LOG_EMPH) << "Engine Option: max_iterations = "
                              << max_iterations << std::endl;
      } else if (opt == "timeout") {
        opts.get_engine_args().get_option("timeout", timeout);
        if (rmi.procid() == 0)
          logstream(LOG_EMPH) << "Engine Option: timeout = "
                              << timeout << std::endl;
      } else if (opt == "use_cache") {
        opts.get_engine_args().get_option("use_cache", use_cache);
        if (rmi.procid() == 0)
          logstream(LOG_EMPH) << "Engine Option: use_cache = "
                              << use_cache << std::endl;
      } else if (opt == "snapshot_interval") {
        opts.get_engine_args().get_option("snapshot_interval", snapshot_interval);
        if (rmi.procid() == 0)
          logstream(LOG_EMPH) << "Engine Option: snapshot_interval = "
                              << snapshot_interval << std::endl;
      } else if (opt == "snapshot_path") {
        opts.get_engine_args().get_option("snapshot_path", snapshot_path);
        if (rmi.procid() == 0)
          logstream(LOG_EMPH) << "Engine Option: snapshot_path = "
                              << snapshot_path << std::endl;
      } else if (opt == "sched_allv") {
        opts.get_engine_args().get_option("sched_allv", sched_allv);
        if (rmi.procid() == 0)
          logstream(LOG_EMPH) << "Engine Option: sched_allv = "
                              << sched_allv << std::endl;
      } else {
        logstream(LOG_FATAL) << "Unexpected Engine Option: " << opt << std::endl;
      }
    }

    if (snapshot_interval >= 0 && snapshot_path.length() == 0) {
      logstream(LOG_FATAL)
        << "Snapshot interval specified, but no snapshot path" << std::endl;
    }
    INITIALIZE_EVENT_LOG(dc);
    ADD_CUMULATIVE_EVENT(EVENT_APPLIES, "Applies", "Calls");
    ADD_CUMULATIVE_EVENT(EVENT_GATHERS, "Gathers", "Calls");
    ADD_CUMULATIVE_EVENT(EVENT_SCATTERS, "Scatters", "Calls");
    ADD_INSTANTANEOUS_EVENT(EVENT_ACTIVE_CPUS, "Active Threads", "Threads");

    // Finalize the graph
    graph.finalize();
    memory_info::log_usage("Before Engine Initialization");
    // Allocate vertex locks and vertex programs
    vlocks.resize(graph.num_local_vertices());
    vertex_programs.resize(graph.num_local_vertices());
    // allocate the edge locks
    //elocks.resize(graph.num_local_edges());
    // Allocate messages and message bitset
    messages.resize(graph.num_local_vertices(), message_type());
    has_message.resize(graph.num_local_vertices());
    has_message.clear();
    // Allocate gather accumulators and accumulator bitset
    gather_accum.resize(graph.num_local_vertices(), gather_type());
    has_gather_accum.resize(graph.num_local_vertices());
    has_gather_accum.clear();
    // If caching is used then allocate cache data-structures
    if (use_cache) {
      gather_cache.resize(graph.num_local_vertices(), gather_type());
      has_cache.resize(graph.num_local_vertices());
      has_cache.clear();
    }
    // Allocate bitsets to track active vertices in each super-step
    // and minor-step
    active_superstep.resize(graph.num_local_vertices());
    active_superstep.clear();
    active_minorstep.resize(graph.num_local_vertices());
    active_minorstep.clear();
    // Print memory usage after initialization
    memory_info::log_usage("After Engine Initialization");
    rmi.barrier();
  } // end of synchronous engine

  template<typename VertexProgram>
  typename synchronous_engine<VertexProgram>::aggregator_type*
  synchronous_engine<VertexProgram>::get_aggregator() {
    return &aggregator;
  } // end of get_aggregator

  template<typename VertexProgram>
  void synchronous_engine<VertexProgram>::internal_stop() {
    for (size_t i = 0; i < rmi.numprocs(); ++i)
      rmi.remote_call(i, &synchronous_engine<VertexProgram>::rpc_stop);
  } // end of internal_stop

  template<typename VertexProgram>
  void synchronous_engine<VertexProgram>::rpc_stop() {
    force_abort = true;
  } // end of rpc_stop

  template<typename VertexProgram>
  void synchronous_engine<VertexProgram>::
  signal(vertex_id_type gvid, const message_type& message) {
    rmi.barrier();
    internal_signal_rpc(gvid, message);
    rmi.barrier();
  } // end of signal
1102 
1103 
1104  template<typename VertexProgram>
1105  void synchronous_engine<VertexProgram>::
1106  signal_all(const message_type& message, const std::string& order) {
1107  for(lvid_type lvid = 0; lvid < graph.num_local_vertices(); ++lvid) {
1108  if(graph.l_is_master(lvid)) {
1109  internal_signal(vertex_type(graph.l_vertex(lvid)), message);
1110  }
1111  }
1112  } // end of signal all
1113 
1114 
1115  template<typename VertexProgram>
1116  void synchronous_engine<VertexProgram>::
1117  signal_vset(const vertex_set& vset,
1118  const message_type& message, const std::string& order) {
1119  for(lvid_type lvid = 0; lvid < graph.num_local_vertices(); ++lvid) {
1120  if(graph.l_is_master(lvid) && vset.l_contains(lvid)) {
1121  internal_signal(vertex_type(graph.l_vertex(lvid)), message);
1122  }
1123  }
1124  } // end of signal all

  template<typename VertexProgram>
  void synchronous_engine<VertexProgram>::
  internal_signal(const vertex_type& vertex,
                  const message_type& message) {
    const lvid_type lvid = vertex.local_id();
    vlocks[lvid].lock();
    if( has_message.get(lvid) ) {
      messages[lvid] += message;
    } else {
      messages[lvid] = message;
      has_message.set_bit(lvid);
    }
    vlocks[lvid].unlock();
  } // end of internal_signal

  template<typename VertexProgram>
  void synchronous_engine<VertexProgram>::
  internal_signal_broadcast(vertex_id_type gvid, const message_type& message) {
    for (size_t i = 0; i < rmi.numprocs(); ++i) {
      if(i == rmi.procid()) internal_signal_rpc(gvid, message);
      else rmi.remote_call(i, &synchronous_engine<VertexProgram>::internal_signal_rpc,
                           gvid, message);
    }
  } // end of internal_signal_broadcast

  template<typename VertexProgram>
  void synchronous_engine<VertexProgram>::
  internal_signal_rpc(vertex_id_type gvid,
                      const message_type& message) {
    if (graph.is_master(gvid)) {
      internal_signal(graph.vertex(gvid), message);
    }
  } // end of internal_signal_rpc

  template<typename VertexProgram>
  void synchronous_engine<VertexProgram>::
  internal_post_delta(const vertex_type& vertex, const gather_type& delta) {
    const bool caching_enabled = !gather_cache.empty();
    if(caching_enabled) {
      const lvid_type lvid = vertex.local_id();
      vlocks[lvid].lock();
      if( has_cache.get(lvid) ) {
        gather_cache[lvid] += delta;
      } else {
        // You cannot add a delta to an empty cache. A complete
        // gather must have been run.
        // gather_cache[lvid] = delta;
        // has_cache.set_bit(lvid);
      }
      vlocks[lvid].unlock();
    }
  } // end of post_delta

  template<typename VertexProgram>
  void synchronous_engine<VertexProgram>::
  internal_clear_gather_cache(const vertex_type& vertex) {
    const bool caching_enabled = !gather_cache.empty();
    const lvid_type lvid = vertex.local_id();
    if(caching_enabled && has_cache.get(lvid)) {
      vlocks[lvid].lock();
      gather_cache[lvid] = gather_type();
      has_cache.clear_bit(lvid);
      vlocks[lvid].unlock();
    }
  } // end of clear_gather_cache

  template<typename VertexProgram>
  size_t synchronous_engine<VertexProgram>::
  num_updates() const { return completed_applys.value; }

  template<typename VertexProgram>
  float synchronous_engine<VertexProgram>::
  elapsed_seconds() const { return timer::approx_time_seconds() - start_time; }

  template<typename VertexProgram>
  int synchronous_engine<VertexProgram>::
  iteration() const { return iteration_counter; }

  template<typename VertexProgram>
  size_t synchronous_engine<VertexProgram>::total_memory_usage() const {
    size_t allocated_memory = memory_info::allocated_bytes();
    rmi.all_reduce(allocated_memory);
    return allocated_memory;
  } // compute the total memory usage of the GraphLab system

  template<typename VertexProgram> execution_status::status_enum
  synchronous_engine<VertexProgram>::start() {
    rmi.barrier();
    graph.finalize();
    // Initialization code ==================================================
    // Reset event log counters?
    // Start the timer
    graphlab::timer timer; timer.start();
    start_time = timer::approx_time_seconds();
    iteration_counter = 0;
    force_abort = false;
    execution_status::status_enum termination_reason =
      execution_status::UNSET;
    // if (perform_init_vtx_program) {
    //   // Initialize all vertex programs
    //   run_synchronous( &synchronous_engine::initialize_vertex_programs );
    // }
    aggregator.start();
    rmi.barrier();
    if (snapshot_interval == 0) {
      graph.save_binary(snapshot_path);
    }

    float last_print = -5;
    if (rmi.procid() == 0) {
      logstream(LOG_EMPH) << "Iteration counter will only output every 5 seconds."
                          << std::endl;
    }
    // Program Main loop ====================================================
    while(iteration_counter < max_iterations && !force_abort ) {

      // Check first to see if we are out of time
      if(timeout != 0 && timeout < elapsed_seconds()) {
        termination_reason = execution_status::TIMEOUT;
        break;
      }

      bool print_this_round = (elapsed_seconds() - last_print) >= 5;

      if(rmi.procid() == 0 && print_this_round) {
        logstream(LOG_EMPH)
          << rmi.procid() << ": Starting iteration: " << iteration_counter
          << std::endl;
        last_print = elapsed_seconds();
      }
      // Reset Active vertices ----------------------------------------------
      // Clear the active super-step and minor-step bits which will
      // be set upon receiving messages
      active_superstep.clear(); active_minorstep.clear();
      has_gather_accum.clear();
      rmi.barrier();

      // Exchange Messages --------------------------------------------------
      // Exchange any messages in the local message vectors
      // if (rmi.procid() == 0) std::cout << "Exchange messages..." << std::endl;
      run_synchronous( &synchronous_engine::exchange_messages );
      /**
       * Post conditions:
       *   1) only master vertices have messages
       */

      // Receive Messages ---------------------------------------------------
      // Receive messages to master vertices and then synchronize
      // vertex programs with mirrors if gather is required
      //
      // if (rmi.procid() == 0) std::cout << "Receive messages..." << std::endl;
      num_active_vertices = 0;
      run_synchronous( &synchronous_engine::receive_messages );
      if (sched_allv) {
        active_minorstep.fill();
      }
      has_message.clear();
      /**
       * Post conditions:
       *   1) there are no messages remaining
       *   2) All masters that received messages have their
       *      active_superstep bit set
       *   3) All masters and mirrors that are to participate in the
       *      next gather phases have their active_minorstep bit
       *      set.
       *   4) num_active_vertices is the number of vertices that
       *      received messages.
       */

      // Check termination condition ---------------------------------------
      size_t total_active_vertices = num_active_vertices;
      rmi.all_reduce(total_active_vertices);
      if (rmi.procid() == 0 && print_this_round)
        logstream(LOG_EMPH)
          << "\tActive vertices: " << total_active_vertices << std::endl;
      if(total_active_vertices == 0 ) {
        termination_reason = execution_status::TASK_DEPLETION;
        break;
      }

      // Execute gather operations-------------------------------------------
      // Execute the gather operation for all vertices that are active
      // in this minor-step (active-minorstep bit set).
      // if (rmi.procid() == 0) std::cout << "Gathering..." << std::endl;
      run_synchronous( &synchronous_engine::execute_gathers );
      // Clear the minor-step bits: only super-step active vertices
      // (and only master vertices) are required to participate in
      // the apply step
      active_minorstep.clear(); // rmi.barrier();
      /**
       * Post conditions:
       *   1) gather_accum for all master vertices contains the
       *      result of all the gathers (even if they are drawn from
       *      cache)
       *   2) No minor-step bits are set
       */

      // Execute Apply Operations -------------------------------------------
      // Run the apply function on all active vertices
      // if (rmi.procid() == 0) std::cout << "Applying..." << std::endl;
      run_synchronous( &synchronous_engine::execute_applys );
      /**
       * Post conditions:
       *   1) any changes to the vertex data have been synchronized
       *      with all mirrors.
       *   2) all gather accumulators have been cleared
       *   3) If a vertex program is participating in the scatter
       *      phase its minor-step bit has been set to active (both
       *      masters and mirrors) and the vertex program has been
       *      synchronized with the mirrors.
       */

      // Execute Scatter Operations -----------------------------------------
      // Execute each of the scatters on all minor-step active vertices.
      run_synchronous( &synchronous_engine::execute_scatters );
      /**
       * Post conditions:
       *   1) NONE
       */
      if(rmi.procid() == 0 && print_this_round)
        logstream(LOG_EMPH) << "\t Running Aggregators" << std::endl;
      // probe the aggregator
      aggregator.tick_synchronous();

      ++iteration_counter;

      if (snapshot_interval > 0 && iteration_counter % snapshot_interval == 0) {
        graph.save_binary(snapshot_path);
      }
    }

    if (rmi.procid() == 0) {
      logstream(LOG_EMPH) << iteration_counter
                          << " iterations completed." << std::endl;
    }
    // Final barrier to ensure that all engines terminate at the same time
    double total_compute_time = 0;
    for (size_t i = 0; i < per_thread_compute_time.size(); ++i) {
      total_compute_time += per_thread_compute_time[i];
    }
    std::vector<double> all_compute_time_vec(rmi.numprocs());
    all_compute_time_vec[rmi.procid()] = total_compute_time;
    rmi.all_gather(all_compute_time_vec);

    size_t global_completed = completed_applys;
    rmi.all_reduce(global_completed);
    completed_applys = global_completed;
    rmi.cout() << "Updates: " << completed_applys.value << "\n";
    if (rmi.procid() == 0) {
      logstream(LOG_INFO) << "Compute Balance: ";
      for (size_t i = 0; i < all_compute_time_vec.size(); ++i) {
        logstream(LOG_INFO) << all_compute_time_vec[i] << " ";
      }
      logstream(LOG_INFO) << std::endl;
    }
    rmi.full_barrier();
    // Stop the aggregator
    aggregator.stop();
    // return the final reason for termination
    return termination_reason;
  } // end of start

  template<typename VertexProgram>
  void synchronous_engine<VertexProgram>::
  exchange_messages(const size_t thread_id) {
    context_type context(*this, graph);
    const bool TRY_TO_RECV = true;
    const size_t TRY_RECV_MOD = 100;
    size_t vcount = 0;
    fixed_dense_bitset<sizeof(size_t)> local_bitset;
    // for(lvid_type lvid = thread_id; lvid < graph.num_local_vertices();
    //     lvid += threads.size()) {
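    // Note: rather than striping vertices statically across threads (the
    // commented-out loop above), each thread atomically claims the next
    // word-sized block of 8 * sizeof(size_t) local vertex ids from
    // shared_lvid_counter and then scans the matching word of the
    // has_message bitset. This dynamically balances the work across
    // threads when the set bits are unevenly distributed.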
    while (1) {
      // increment by a word at a time
      lvid_type lvid_block_start =
        shared_lvid_counter.inc_ret_last(8 * sizeof(size_t));
      if (lvid_block_start >= graph.num_local_vertices()) break;
      // get the bit field from has_message
      size_t lvid_bit_block = has_message.containing_word(lvid_block_start);
      if (lvid_bit_block == 0) continue;
      // initialize a word sized bitfield
      local_bitset.clear();
      local_bitset.initialize_from_mem(&lvid_bit_block, sizeof(size_t));
      foreach(size_t lvid_block_offset, local_bitset) {
        lvid_type lvid = lvid_block_start + lvid_block_offset;
        if (lvid >= graph.num_local_vertices()) break;
        // if the vertex is a mirror (not the master) and has a message,
        // send the message to the master and clear the bit
        if(!graph.l_is_master(lvid)) {
          sync_message(lvid, thread_id);
          has_message.clear_bit(lvid);
          // clear the message to save memory
          messages[lvid] = message_type();
        }
        if(++vcount % TRY_RECV_MOD == 0) recv_messages(TRY_TO_RECV);
      }
    } // end of loop over vertices to send messages
    message_exchange.partial_flush(thread_id);
    // Finish sending and receiving all messages
    thread_barrier.wait();
    if(thread_id == 0) message_exchange.flush();
    thread_barrier.wait();
    recv_messages();
  } // end of exchange_messages

  template<typename VertexProgram>
  void synchronous_engine<VertexProgram>::
  receive_messages(const size_t thread_id) {
    context_type context(*this, graph);
    const bool TRY_TO_RECV = true;
    const size_t TRY_RECV_MOD = 100;
    size_t vcount = 0;
    size_t nactive_inc = 0;
    fixed_dense_bitset<sizeof(size_t)> local_bitset;
    while (1) {
      // increment by a word at a time
      lvid_type lvid_block_start =
        shared_lvid_counter.inc_ret_last(8 * sizeof(size_t));
      if (lvid_block_start >= graph.num_local_vertices()) break;
      // get the bit field from has_message
      size_t lvid_bit_block = has_message.containing_word(lvid_block_start);
      if (lvid_bit_block == 0) continue;
      // initialize a word sized bitfield
      local_bitset.clear();
      local_bitset.initialize_from_mem(&lvid_bit_block, sizeof(size_t));

      foreach(size_t lvid_block_offset, local_bitset) {
        lvid_type lvid = lvid_block_start + lvid_block_offset;
        if (lvid >= graph.num_local_vertices()) break;

        // if this is the master of lvid and we have a message
        if(graph.l_is_master(lvid)) {
          // The vertex becomes active for this superstep
          active_superstep.set_bit(lvid);
          ++nactive_inc;
          // Pass the message to the vertex program
          vertex_type vertex = vertex_type(graph.l_vertex(lvid));
          vertex_programs[lvid].init(context, vertex, messages[lvid]);
          // clear the message to save memory
          messages[lvid] = message_type();
          if (sched_allv) continue;
          // Determine if the gather should be run
          const vertex_program_type& const_vprog = vertex_programs[lvid];
          const vertex_type const_vertex = vertex;
          if(const_vprog.gather_edges(context, const_vertex) !=
             graphlab::NO_EDGES) {
            active_minorstep.set_bit(lvid);
            sync_vertex_program(lvid, thread_id);
          }
        }
        if(++vcount % TRY_RECV_MOD == 0) recv_vertex_programs(TRY_TO_RECV);
      }
    }

    num_active_vertices += nactive_inc;
    vprog_exchange.partial_flush(thread_id);
    // Flush the buffer and finish receiving any remaining vertex
    // programs.
    thread_barrier.wait();
    if(thread_id == 0) {
      vprog_exchange.flush();
    }
    thread_barrier.wait();

    recv_vertex_programs();

  } // end of receive messages

  template<typename VertexProgram>
  void synchronous_engine<VertexProgram>::
  execute_gathers(const size_t thread_id) {
    context_type context(*this, graph);
    const bool TRY_TO_RECV = true;
    const size_t TRY_RECV_MOD = 1000;
    size_t vcount = 0;
    const bool caching_enabled = !gather_cache.empty();
    // for(lvid_type lvid = thread_id; lvid < graph.num_local_vertices();
    //     lvid += threads.size()) {
    timer ti;

    fixed_dense_bitset<sizeof(size_t)> local_bitset;
    while (1) {
      // increment by a word at a time
      lvid_type lvid_block_start =
        shared_lvid_counter.inc_ret_last(8 * sizeof(size_t));
      if (lvid_block_start >= graph.num_local_vertices()) break;
      // get the bit field from active_minorstep
      size_t lvid_bit_block = active_minorstep.containing_word(lvid_block_start);
      if (lvid_bit_block == 0) continue;
      // initialize a word sized bitfield
      local_bitset.clear();
      local_bitset.initialize_from_mem(&lvid_bit_block, sizeof(size_t));

      foreach(size_t lvid_block_offset, local_bitset) {
        lvid_type lvid = lvid_block_start + lvid_block_offset;
        if (lvid >= graph.num_local_vertices()) break;

        bool accum_is_set = false;
        gather_type accum = gather_type();
        // if caching is enabled and we have a cache entry then use
        // that as the accum
        if( caching_enabled && has_cache.get(lvid) ) {
          accum = gather_cache[lvid];
          accum_is_set = true;
        } else {
          // recompute the local contribution to the gather
          const vertex_program_type& vprog = vertex_programs[lvid];
          local_vertex_type local_vertex = graph.l_vertex(lvid);
          const vertex_type vertex(local_vertex);
          const edge_dir_type gather_dir = vprog.gather_edges(context, vertex);
          // Loop over in edges
          size_t edges_touched = 0;
          vprog.pre_local_gather(accum);
          if(gather_dir == IN_EDGES || gather_dir == ALL_EDGES) {
            foreach(local_edge_type local_edge, local_vertex.in_edges()) {
              edge_type edge(local_edge);
              // elocks[local_edge.id()].lock();
              if(accum_is_set) { // \todo hint likely
                accum += vprog.gather(context, vertex, edge);
              } else {
                accum = vprog.gather(context, vertex, edge);
                accum_is_set = true;
              }
              ++edges_touched;
              // elocks[local_edge.id()].unlock();
            }
          } // end of if in_edges/all_edges
          // Loop over out edges
          if(gather_dir == OUT_EDGES || gather_dir == ALL_EDGES) {
            foreach(local_edge_type local_edge, local_vertex.out_edges()) {
              edge_type edge(local_edge);
              // elocks[local_edge.id()].lock();
              if(accum_is_set) { // \todo hint likely
                accum += vprog.gather(context, vertex, edge);
              } else {
                accum = vprog.gather(context, vertex, edge);
                accum_is_set = true;
              }
              // elocks[local_edge.id()].unlock();
              ++edges_touched;
            }
          } // end of if out_edges/all_edges
          // count all touched edges (in and out) for the gather event
          INCREMENT_EVENT(EVENT_GATHERS, edges_touched);
          vprog.post_local_gather(accum);
          // If caching is enabled then save the accumulator to the
          // cache for future iterations. Note that it is possible
          // that the accumulator was never set in which case we are
          // effectively "zeroing out" the cache.
          if(caching_enabled && accum_is_set) {
            gather_cache[lvid] = accum; has_cache.set_bit(lvid);
          } // end of if caching enabled
        }
        // If the accum contains a value for the local gather we put
        // that estimate in the gather exchange.
        if(accum_is_set) sync_gather(lvid, accum, thread_id);
        if(!graph.l_is_master(lvid)) {
          // if this is not the master clear the vertex program
          vertex_programs[lvid] = vertex_program_type();
        }

        // try to recv gathers if there are any in the buffer
        if(++vcount % TRY_RECV_MOD == 0) recv_gathers(TRY_TO_RECV);
      }
    } // end of loop over vertices to compute gather accumulators
    per_thread_compute_time[thread_id] += ti.current_time();
    gather_exchange.partial_flush(thread_id);
    // Finish sending and receiving all gather operations
    thread_barrier.wait();
    if(thread_id == 0) gather_exchange.flush();
    thread_barrier.wait();
    recv_gathers();
  } // end of execute_gathers
1619 
1620 
1621  template<typename VertexProgram>
1622  void synchronous_engine<VertexProgram>::
1623  execute_applys(const size_t thread_id) {
1624  context_type context(*this, graph);
1625  const bool TRY_TO_RECV = true;
1626  const size_t TRY_RECV_MOD = 1000;
1627  size_t vcount = 0;
1628  //for(lvid_type lvid = thread_id; lvid < graph.num_local_vertices();
1629  // lvid += threads.size()) {
1630  timer ti;
1631 
1632  fixed_dense_bitset<sizeof(size_t)> local_bitset;
1633  while (1) {
1634  // increment by a word at a time
1635  lvid_type lvid_block_start =
1636  shared_lvid_counter.inc_ret_last(8 * sizeof(size_t));
1637  if (lvid_block_start >= graph.num_local_vertices()) break;
1638  // get the bit field from active_superstep
1639  size_t lvid_bit_block = active_superstep.containing_word(lvid_block_start);
1640  if (lvid_bit_block == 0) continue;
1641  // initialize a word sized bitfield
1642  local_bitset.clear();
1643  local_bitset.initialize_from_mem(&lvid_bit_block, sizeof(size_t));
1644  foreach(size_t lvid_block_offset, local_bitset) {
1645  lvid_type lvid = lvid_block_start + lvid_block_offset;
1646  if (lvid >= graph.num_local_vertices()) break;
1647 
1648  // Only master vertices can be active in a super-step
1649  ASSERT_TRUE(graph.l_is_master(lvid));
1650  vertex_type vertex(graph.l_vertex(lvid));
1651  // Get the local accumulator. Note that it is possible that
1652  // the gather_accum was not set during the gather.
1653  const gather_type& accum = gather_accum[lvid];
1654  INCREMENT_EVENT(EVENT_APPLIES, 1);
1655  vertex_programs[lvid].apply(context, vertex, accum);
1656  // record an apply as a completed task
1657  ++completed_applys;
1658  // Clear the accumulator to save some memory
1659  gather_accum[lvid] = gather_type();
1660  // synchronize the changed vertex data with all mirrors
1661  sync_vertex_data(lvid, thread_id);
1662  // determine if a scatter operation is needed
1663  const vertex_program_type& const_vprog = vertex_programs[lvid];
1664  const vertex_type const_vertex = vertex;
1665  if(const_vprog.scatter_edges(context, const_vertex) !=
1667  active_minorstep.set_bit(lvid);
1668  sync_vertex_program(lvid, thread_id);
1669  } else { // we are done so clear the vertex program
1670  vertex_programs[lvid] = vertex_program_type();
1671  }
1672  // try to receive vertex data
1673  if(++vcount % TRY_RECV_MOD == 0) {
1674  recv_vertex_programs(TRY_TO_RECV);
1675  recv_vertex_data(TRY_TO_RECV);
1676  }
1677  }
1678  } // end of loop over vertices to run apply
1679 
1680  per_thread_compute_time[thread_id] += ti.current_time();
1681  vprog_exchange.partial_flush(thread_id);
1682  vdata_exchange.partial_flush(thread_id);
1683  // Finish sending and receiving all changes due to apply operations
1684  thread_barrier.wait();
1685  if(thread_id == 0) { vprog_exchange.flush(); vdata_exchange.flush(); }
1686  thread_barrier.wait();
1687  recv_vertex_programs();
1688  recv_vertex_data();
1689 
1690  } // end of execute_applys
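
  /* The hand-off from apply to scatter above is controlled entirely by the
   * user program: if scatter_edges() returns graphlab::NO_EDGES the engine
   * clears the vertex program right after apply and never replicates it to
   * the mirrors. A minimal sketch in the style of the dynamic PageRank
   * example; the class name, the last_change field, and the 1e-3 tolerance
   * are illustrative assumptions, not part of this engine:
   *
   * \code
   * #include <cmath>
   * #include <graphlab.hpp>
   *
   * typedef graphlab::distributed_graph<double, graphlab::empty> graph_type;
   *
   * class pagerank :
   *   public graphlab::ivertex_program<graph_type, double>,
   *   public graphlab::IS_POD_TYPE {
   *   double last_change;
   * public:
   *   // Gather phase (execute_gathers): pull rank mass along in-edges.
   *   edge_dir_type gather_edges(icontext_type& context,
   *                              const vertex_type& vertex) const {
   *     return graphlab::IN_EDGES;
   *   }
   *   double gather(icontext_type& context, const vertex_type& vertex,
   *                 edge_type& edge) const {
   *     return edge.source().data() / edge.source().num_out_edges();
   *   }
   *   // Apply phase (execute_applys): runs once, on the master replica.
   *   void apply(icontext_type& context, vertex_type& vertex,
   *              const gather_type& total) {
   *     const double newval = 0.15 + 0.85 * total;
   *     last_change = std::fabs(newval - vertex.data());
   *     vertex.data() = newval;
   *   }
   *   // Returning NO_EDGES once converged lets the engine skip the scatter
   *   // minor-step for this vertex (the test at lines 1665-1671 above).
   *   edge_dir_type scatter_edges(icontext_type& context,
   *                               const vertex_type& vertex) const {
   *     return (last_change > 1e-3) ? graphlab::OUT_EDGES
   *                                 : graphlab::NO_EDGES;
   *   }
   *   void scatter(icontext_type& context, const vertex_type& vertex,
   *                edge_type& edge) const {
   *     context.signal(edge.target());
   *   }
   * };
   * \endcode
   */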
1691 
1692 
1693 
1694 
1695  template<typename VertexProgram>
1696  void synchronous_engine<VertexProgram>::
1697  execute_scatters(const size_t thread_id) {
1698  context_type context(*this, graph);
1699  // for(lvid_type lvid = thread_id; lvid < graph.num_local_vertices();
1700  // lvid += threads.size()) {
1701  timer ti;
1702  fixed_dense_bitset<sizeof(size_t)> local_bitset;
1703  while (1) {
1704  // increment by a word at a time
1705  lvid_type lvid_block_start =
1706  shared_lvid_counter.inc_ret_last(8 * sizeof(size_t));
1707  if (lvid_block_start >= graph.num_local_vertices()) break;
1708  // get the bit field from active_minorstep
1709  size_t lvid_bit_block = active_minorstep.containing_word(lvid_block_start);
1710  if (lvid_bit_block == 0) continue;
1711  // initialize a word sized bitfield
1712  local_bitset.clear();
1713  local_bitset.initialize_from_mem(&lvid_bit_block, sizeof(size_t));
1714  foreach(size_t lvid_block_offset, local_bitset) {
1715  lvid_type lvid = lvid_block_start + lvid_block_offset;
1716  if (lvid >= graph.num_local_vertices()) break;
1717 
1718  const vertex_program_type& vprog = vertex_programs[lvid];
1719  local_vertex_type local_vertex = graph.l_vertex(lvid);
1720  const vertex_type vertex(local_vertex);
1721  const edge_dir_type scatter_dir = vprog.scatter_edges(context, vertex);
1722  size_t edges_touched = 0;
1723  // Loop over in edges
1724  if(scatter_dir == IN_EDGES || scatter_dir == ALL_EDGES) {
1725  foreach(local_edge_type local_edge, local_vertex.in_edges()) {
1726  edge_type edge(local_edge);
1727  // elocks[local_edge.id()].lock();
1728  vprog.scatter(context, vertex, edge);
1729  // elocks[local_edge.id()].unlock();
1730  ++edges_touched;
1731  }
1732  } // end of if in_edges/all_edges
1733  // Loop over out edges
1734  if(scatter_dir == OUT_EDGES || scatter_dir == ALL_EDGES) {
1735  foreach(local_edge_type local_edge, local_vertex.out_edges()) {
1736  edge_type edge(local_edge);
1737  // elocks[local_edge.id()].lock();
1738  vprog.scatter(context, vertex, edge);
1739  // elocks[local_edge.id()].unlock();
1740  ++edges_touched;
1741  }
1742  } // end of if out_edges/all_edges
1743  INCREMENT_EVENT(EVENT_SCATTERS, edges_touched);
1744  // Clear the vertex program
1745  vertex_programs[lvid] = vertex_program_type();
1746  } // end of if active on this minor step
1747  } // end of loop over vertices to complete scatter operation
1748 
1749  per_thread_compute_time[thread_id] += ti.current_time();
1750  } // end of execute_scatters
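
  /* The three phases above (execute_gathers, execute_applys,
   * execute_scatters) are never called directly by user code; they are
   * driven once per super-step by start(). A minimal driver sketch using
   * the public API, assuming the pagerank program sketched earlier; the
   * input path "graph_dir" and the "tsv" format are placeholders:
   *
   * \code
   * #include <graphlab.hpp>
   * // ... graph_type and pagerank as sketched above ...
   *
   * int main(int argc, char** argv) {
   *   graphlab::mpi_tools::init(argc, argv);
   *   graphlab::distributed_control dc;
   *   graphlab::command_line_options clopts("Synchronous PageRank sketch");
   *
   *   graph_type graph(dc, clopts);
   *   graph.load_format("graph_dir", "tsv");  // placeholder input
   *   graph.finalize();
   *
   *   // "sync" selects this synchronous engine via the omni_engine facade.
   *   graphlab::omni_engine<pagerank> engine(dc, graph, "sync", clopts);
   *   engine.signal_all();  // activate every vertex for super-step 0
   *   engine.start();       // run gather/apply/scatter super-steps
   *
   *   graphlab::mpi_tools::finalize();
   *   return 0;
   * }
   * \endcode
   */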
1751 
1752 
1753 
1754  // Data Synchronization ===================================================
1755  template<typename VertexProgram>
1756  void synchronous_engine<VertexProgram>::
1757  sync_vertex_program(lvid_type lvid, const size_t thread_id) {
1758  ASSERT_TRUE(graph.l_is_master(lvid));
1759  const vertex_id_type vid = graph.global_vid(lvid);
1760  local_vertex_type vertex = graph.l_vertex(lvid);
1761  foreach(const procid_t& mirror, vertex.mirrors()) {
1762  vprog_exchange.send(mirror,
1763  std::make_pair(vid, vertex_programs[lvid]),
1764  thread_id);
1765  }
1766  } // end of sync_vertex_program
1767 
1768 
1769 
1770  template<typename VertexProgram>
1771  void synchronous_engine<VertexProgram>::
1772  recv_vertex_programs(const bool try_to_recv) {
1773  procid_t procid(-1);
1774  typename vprog_exchange_type::buffer_type buffer;
1775  while(vprog_exchange.recv(procid, buffer, try_to_recv)) {
1776  foreach(const vid_prog_pair_type& pair, buffer) {
1777  const lvid_type lvid = graph.local_vid(pair.first);
1778  // ASSERT_FALSE(graph.l_is_master(lvid));
1779  vertex_programs[lvid] = pair.second;
1780  active_minorstep.set_bit(lvid);
1781  }
1782  }
1783  } // end of recv vertex programs
1784 
1785 
1786  template<typename VertexProgram>
1787  void synchronous_engine<VertexProgram>::
1788  sync_vertex_data(lvid_type lvid, const size_t thread_id) {
1789  ASSERT_TRUE(graph.l_is_master(lvid));
1790  const vertex_id_type vid = graph.global_vid(lvid);
1791  local_vertex_type vertex = graph.l_vertex(lvid);
1792  foreach(const procid_t& mirror, vertex.mirrors()) {
1793  vdata_exchange.send(mirror, std::make_pair(vid, vertex.data()), thread_id);
1794  }
1795  } // end of sync_vertex_data
1796 
1797 
1798 
1799 
1800 
1801  template<typename VertexProgram>
1802  void synchronous_engine<VertexProgram>::
1803  recv_vertex_data(bool try_to_recv) {
1804  procid_t procid(-1);
1805  typename vdata_exchange_type::buffer_type buffer;
1806  while(vdata_exchange.recv(procid, buffer, try_to_recv)) {
1807  foreach(const vid_vdata_pair_type& pair, buffer) {
1808  const lvid_type lvid = graph.local_vid(pair.first);
1809  ASSERT_FALSE(graph.l_is_master(lvid));
1810  graph.l_vertex(lvid).data() = pair.second;
1811  }
1812  }
1813  } // end of recv vertex data
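
  /* Everything shipped through vdata_exchange (and the other exchanges) is
   * serialized, so the vertex data type must either be POD or provide
   * save()/load(). A minimal sketch of a non-POD vertex data type; the
   * struct below is illustrative, not part of this engine:
   *
   * \code
   * #include <vector>
   * #include <graphlab.hpp>
   *
   * // Non-POD vertex data must define save()/load() so the exchanges can
   * // (de)serialize it when masters push updates to their mirrors.
   * struct topic_vector {
   *   std::vector<double> weights;
   *   void save(graphlab::oarchive& oarc) const { oarc << weights; }
   *   void load(graphlab::iarchive& iarc) { iarc >> weights; }
   * };
   * \endcode
   */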
1814 
1815 
1816  template<typename VertexProgram>
1817  void synchronous_engine<VertexProgram>::
1818  sync_gather(lvid_type lvid, const gather_type& accum, const size_t thread_id) {
1819  if(graph.l_is_master(lvid)) {
1820  vlocks[lvid].lock();
1821  if(has_gather_accum.get(lvid)) {
1822  gather_accum[lvid] += accum;
1823  } else {
1824  gather_accum[lvid] = accum;
1825  has_gather_accum.set_bit(lvid);
1826  }
1827  vlocks[lvid].unlock();
1828  } else {
1829  const procid_t master = graph.l_master(lvid);
1830  const vertex_id_type vid = graph.global_vid(lvid);
1831  gather_exchange.send(master, std::make_pair(vid, accum), thread_id);
1832  }
1833  } // end of sync_gather
1834 
1835  template<typename VertexProgram>
1836  void synchronous_engine<VertexProgram>::
1837  recv_gathers(const bool try_to_recv) {
1838  procid_t procid(-1);
1839  typename gather_exchange_type::buffer_type buffer;
1840  while(gather_exchange.recv(procid, buffer, try_to_recv)) {
1841  foreach(const vid_gather_pair_type& pair, buffer) {
1842  const lvid_type lvid = graph.local_vid(pair.first);
1843  const gather_type& accum = pair.second;
1844  ASSERT_TRUE(graph.l_is_master(lvid));
1845  vlocks[lvid].lock();
1846  if( has_gather_accum.get(lvid) ) {
1847  gather_accum[lvid] += accum;
1848  } else {
1849  gather_accum[lvid] = accum;
1850  has_gather_accum.set_bit(lvid);
1851  }
1852  vlocks[lvid].unlock();
1853  }
1854  }
1855  } // end of recv_gathers
1856 
1857 
1858  template<typename VertexProgram>
1859  void synchronous_engine<VertexProgram>::
1860  sync_message(lvid_type lvid, const size_t thread_id) {
1861  ASSERT_FALSE(graph.l_is_master(lvid));
1862  const procid_t master = graph.l_master(lvid);
1863  const vertex_id_type vid = graph.global_vid(lvid);
1864  message_exchange.send(master, std::make_pair(vid, messages[lvid]), thread_id);
1865  } // end of sync_message
1866 
1867 
1868 
1869 
1870  template<typename VertexProgram>
1871  void synchronous_engine<VertexProgram>::
1872  recv_messages(const bool try_to_recv) {
1873  procid_t procid(-1);
1874  typename message_exchange_type::buffer_type buffer;
1875  while(message_exchange.recv(procid, buffer, try_to_recv)) {
1876  foreach(const vid_message_pair_type& pair, buffer) {
1877  const lvid_type lvid = graph.local_vid(pair.first);
1878  ASSERT_TRUE(graph.l_is_master(lvid));
1879  vlocks[lvid].lock();
1880  if( has_message.get(lvid) ) {
1881  messages[lvid] += pair.second;
1882  } else {
1883  messages[lvid] = pair.second;
1884  has_message.set_bit(lvid);
1885  }
1886  vlocks[lvid].unlock();
1887  }
1888  }
1889  } // end of recv_messages
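
  /* As the += on lines 1880-1885 shows, multiple signals arriving at one
   * master within a super-step are folded together with the message type's
   * operator+=. A minimal sketch of a combinable message type in the style
   * of the SSSP toolkit; the name and fields are illustrative assumptions:
   *
   * \code
   * #include <algorithm>
   * #include <limits>
   * #include <graphlab.hpp>
   *
   * // When several signals target the same vertex, the engine combines
   * // them with operator+=; here the combine keeps the smallest distance.
   * struct min_distance_message : public graphlab::IS_POD_TYPE {
   *   double dist;
   *   explicit min_distance_message(
   *       double d = std::numeric_limits<double>::max()) : dist(d) {}
   *   min_distance_message& operator+=(const min_distance_message& other) {
   *     dist = std::min(dist, other.dist);
   *     return *this;
   *   }
   * };
   * \endcode
   */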
1890 
1891 
1892 
1893 
1894 
1895 
1896 
1897 
1898 
1899 
1900 
1901 } // end of namespace graphlab
1902 
1903 
1904 #include <graphlab/macros_undef.hpp>
1905 
1906 #endif
1907