GraphLab: Distributed Graph-Parallel API  2.1
async_consistent_engine.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 
25 
26 #ifndef GRAPHLAB_ASYNC_CONSISTENT_ENGINE
27 #define GRAPHLAB_ASYNC_CONSISTENT_ENGINE
28 
29 #include <deque>
30 #include <boost/bind.hpp>
31 
32 #include <graphlab/scheduler/ischeduler.hpp>
33 #include <graphlab/scheduler/scheduler_factory.hpp>
34 #include <graphlab/vertex_program/ivertex_program.hpp>
35 #include <graphlab/vertex_program/icontext.hpp>
36 #include <graphlab/vertex_program/context.hpp>
37 #include <graphlab/engine/iengine.hpp>
38 #include <graphlab/engine/execution_status.hpp>
39 #include <graphlab/options/graphlab_options.hpp>
40 #include <graphlab/rpc/dc_dist_object.hpp>
41 #include <graphlab/engine/distributed_chandy_misra.hpp>
42 
43 #include <graphlab/util/tracepoint.hpp>
44 #include <graphlab/util/memory_info.hpp>
45 #include <graphlab/rpc/distributed_event_log.hpp>
46 #include <graphlab/rpc/async_consensus.hpp>
47 #include <graphlab/engine/fake_chandy_misra.hpp>
48 #include <graphlab/aggregation/distributed_aggregator.hpp>
49 
50 #include <graphlab/macros_def.hpp>
51 
52 
53 #define ASYNC_ENGINE_FACTORIZED_AVOID_SCHEDULER_GATHER
54 
55 namespace graphlab {
56 
57 
58  /**
59  * \ingroup engines
60  *
61  * \brief The asynchronous consistent engine executes vertex programs
62  * asynchronously but ensures mutual exclusion such that adjacent vertices
63  * are never executed simultaneously. Mutual exclusion can be weakened
64  * to "factorized" consistency in which case only individual gathers/applys/
65  * scatters are guaranteed to be consistent.
66  *
67  * \tparam VertexProgram
68  * The user defined vertex program type which should implement the
69  * \ref graphlab::ivertex_program interface.
70  *
71  * ### Execution Semantics
72  *
73  * On start() the \ref graphlab::ivertex_program::init function is invoked
74  * on all vertex programs in parallel to initialize the vertex program,
75  * vertex data, and possibly signal vertices.
76  *
77  * Afterwards, the engine spawns a collection of threads where each thread
78  * individually performs the following tasks:
79  * \li Extract a message from the scheduler.
80  * \li Perform distributed lock acquisition on the vertex which is supposed
81  * to receive the message. The lock system enforces that no neighboring
82  * vertex is executing at the same time. The implementation is based
83  * on the Chandy-Misra solution to the dining philosophers problem.
84  * (Chandy, K.M.; Misra, J. (1984). The Drinking Philosophers Problem.
85  * ACM Trans. Program. Lang. Syst)
86  * \li Once lock acquisition is complete,
87  * \ref graphlab::ivertex_program::init is called on the vertex
88  * program. As an optimization, any messages sent to this vertex
89  * before completion of lock acquisition are merged into the original message
90  * extracted from the scheduler.
91  * \li Execute the gather on the vertex program by invoking
92  * the user defined \ref graphlab::ivertex_program::gather function
93  * on the edge direction returned by the
94  * \ref graphlab::ivertex_program::gather_edges function. The gather
95  * functions can modify edge data but cannot modify the vertex
96  * program or vertex data and can be executed on multiple
97  * edges in parallel.
98  * \li Execute the apply function on the vertex-program by
99  * invoking the user defined \ref graphlab::ivertex_program::apply
100  * function passing the sum of the gather functions. If \ref
101  * graphlab::ivertex_program::gather_edges returns no edges then
102  * the default gather value is passed to apply. The apply function
103  * can modify the vertex program and vertex data.
104  * \li Execute the scatter on the vertex program by invoking
105  * the user defined \ref graphlab::ivertex_program::scatter function
106  * on the edge direction returned by the
107  * \ref graphlab::ivertex_program::scatter_edges function. The scatter
108  * functions can modify edge data but cannot modify the vertex
109  * program or vertex data and can be executed on multiple
110  * edges in parallel.
111  * \li Release all locks acquired in the lock acquisition stage,
112  * and repeat until the scheduler is empty.
113  *
114  * The engine threads multiplex the above procedure through a secondary
115  * internal queue, allowing an arbitrarily large number of vertices to
116  * begin processing at the same time.
117  *
118  * ### Construction
119  *
120  * The asynchronous consistent engine is constructed by passing in a
121  * \ref graphlab::distributed_control object which manages coordination
122  * between engine threads and a \ref graphlab::distributed_graph object
123  * which is the graph on which the engine should be run. The graph should
124  * already be populated and cannot change after the engine is constructed.
125  * In the distributed setting all program instances (running on each machine)
126  * should construct an instance of the engine at the same time.
127  *
128  * Computation is initiated by signaling vertices using either
129  * \ref graphlab::async_consistent_engine::signal or
130  * \ref graphlab::async_consistent_engine::signal_all. In either case all
131  * machines should invoke signal or signal_all at the same time. Finally,
132  * computation is started by calling the
133  * \ref graphlab::async_consistent_engine::start function.
134  *
135  * ### Example Usage
136  *
137  * The following is a simple example demonstrating how to use the engine:
138  * \code
139  * #include <graphlab.hpp>
140  *
141  * struct vertex_data {
142  * // code
143  * };
144  * struct edge_data {
145  * // code
146  * };
147  * typedef graphlab::distributed_graph<vertex_data, edge_data> graph_type;
148  * typedef float gather_type;
149  * struct pagerank_vprog :
150  * public graphlab::ivertex_program<graph_type, gather_type> {
151  * // code
152  * };
153  *
154  * int main(int argc, char** argv) {
155  * // Initialize the control plane using MPI
156  * graphlab::mpi_tools::init(argc, argv);
157  * graphlab::distributed_control dc;
158  * // Parse command line options
159  * graphlab::command_line_options clopts("PageRank algorithm.");
160  * std::string graph_dir;
161  * clopts.attach_option("graph", &graph_dir, graph_dir,
162  * "The graph file.");
163  * if(!clopts.parse(argc, argv)) {
164  * std::cout << "Error in parsing arguments." << std::endl;
165  * return EXIT_FAILURE;
166  * }
167  * graph_type graph(dc, clopts);
168  * graph.load_structure(graph_dir, "tsv");
169  * graph.finalize();
170  * std::cout << "#vertices: " << graph.num_vertices()
171  * << " #edges:" << graph.num_edges() << std::endl;
172  * graphlab::async_consistent_engine<pagerank_vprog> engine(dc, graph, clopts);
173  * engine.signal_all();
174  * engine.start();
175  * std::cout << "Runtime: " << engine.elapsed_time();
176  * graphlab::mpi_tools::finalize();
177  * }
178  * \endcode
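 *
 * A possible fleshing-out of the <code>pagerank_vprog</code> body above is
 * sketched below. It is only an illustration of the gather/apply/scatter
 * structure described earlier; the <code>rank</code> field of vertex_data,
 * the damping constants, and the unconditional re-signal of neighbors are
 * assumptions made for the sketch and are not defined in this header.
 * \code
 * struct pagerank_vprog :
 *   public graphlab::ivertex_program<graph_type, gather_type> {
 *   edge_dir_type gather_edges(icontext_type& context,
 *                              const vertex_type& vertex) const {
 *     return graphlab::IN_EDGES;
 *   }
 *   gather_type gather(icontext_type& context, const vertex_type& vertex,
 *                      edge_type& edge) const {
 *     // contribution of the source vertex, divided among its out-edges
 *     return edge.source().data().rank / edge.source().num_out_edges();
 *   }
 *   void apply(icontext_type& context, vertex_type& vertex,
 *              const gather_type& total) {
 *     vertex.data().rank = 0.15f + 0.85f * total;
 *   }
 *   edge_dir_type scatter_edges(icontext_type& context,
 *                               const vertex_type& vertex) const {
 *     return graphlab::OUT_EDGES;
 *   }
 *   void scatter(icontext_type& context, const vertex_type& vertex,
 *                edge_type& edge) const {
 *     // reschedule downstream neighbors
 *     context.signal(edge.target());
 *   }
 * };
 * \endcode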
179  *
180  * \see graphlab::omni_engine
181  * \see graphlab::synchronous_engine
182  *
183  * <a name=engineopts>Engine Options</a>
184  * =========================
185  * The asynchronous engine supports several engine options which can
186  * be set as command line arguments using \c --engine_opts :
187  *
188  * \li \b max_clean_fraction (default: 0.2)
189  * The maximum proportion of edges which can be locked at any one time.
190  * (This is a simplification of the actual clean/dirty concept in the
191  * Chandy-Misra locking algorithm used in this implementation.)
192  * \li \b timeout (default: infinity) Maximum time in seconds the engine will
193  * run for. The actual runtime may be marginally greater as the engine
194  * waits for all threads and processes to flush all active tasks before
195  * returning.
196  * \li \b factorized (default: true) Set to true to weaken the consistency
197  * model to factorized consistency where only individual gather/apply/scatter
198  * calls are guaranteed to be locally consistent. Can produce massive
199  * increases in throughput at a consistency penalty.
200  * \li \b use_cache (default: false) This is used to enable
201  * caching. When caching is enabled the gather phase is skipped for
202  * vertices that already have a cached value. To use caching the
203  * vertex program must either clear (\ref icontext::clear_gather_cache)
204  * or update (\ref icontext::post_delta) the cache values of
205  * neighboring vertices during the scatter phase.
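 *
 * As an illustration (the binary name is a placeholder, not part of this
 * header), several of these options can be combined on the command line:
 * \code
 * // ./my_graphlab_app --engine_opts="factorized=false,use_cache=true,timeout=300"
 * \endcode
 * When \b use_cache is enabled, the scatter function of a vertex program
 * might keep a neighbor's cached gather up to date through
 * \ref icontext::post_delta. A minimal sketch, assuming a numeric gather
 * type and a hypothetical compute_delta() helper:
 * \code
 * void scatter(icontext_type& context, const vertex_type& vertex,
 *              edge_type& edge) const {
 *   const gather_type delta = compute_delta(vertex, edge); // hypothetical helper
 *   context.post_delta(edge.target(), delta); // update the neighbor's cached gather
 *   context.signal(edge.target());
 * }
 * \endcode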
206  */
207  template<typename VertexProgram>
208  class async_consistent_engine: public iengine<VertexProgram> {
209 
210  public:
211  /**
212  * \brief The user defined vertex program type. Equivalent to the
213  * VertexProgram template argument.
214  *
215  * The user defined vertex program type which should implement the
216  * \ref graphlab::ivertex_program interface.
217  */
218  typedef VertexProgram vertex_program_type;
219 
220  /**
221  * \brief The user defined type returned by the gather function.
222  *
223  * The gather type is defined in the \ref graphlab::ivertex_program
224  * interface and is the value returned by the
225  * \ref graphlab::ivertex_program::gather function. The
226  * gather type must have an <code>operator+=(const gather_type&
227  * other)</code> function and must be \ref sec_serializable.
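 *
 * A minimal sketch (illustrative only, not defined in this header) of a
 * conforming gather type:
 * \code
 * struct rank_gather {
 *   double value;
 *   rank_gather() : value(0) { }
 *   explicit rank_gather(double v) : value(v) { }
 *   rank_gather& operator+=(const rank_gather& other) {
 *     value += other.value;
 *     return *this;
 *   }
 *   // makes the type \ref sec_serializable
 *   void save(graphlab::oarchive& oarc) const { oarc << value; }
 *   void load(graphlab::iarchive& iarc) { iarc >> value; }
 * };
 * \endcode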
228  */
229  typedef typename VertexProgram::gather_type gather_type;
230 
231  /**
232  * \brief The user defined message type used to signal neighboring
233  * vertex programs.
234  *
235  * The message type is defined in the \ref graphlab::ivertex_program
236  * interface and used in the call to \ref graphlab::icontext::signal.
237  * The message type must have an
238  * <code>operator+=(const message_type& other)</code> function and
239  * must be \ref sec_serializable.
240  */
241  typedef typename VertexProgram::message_type message_type;
242 
243  /**
244  * \brief The type of data associated with each vertex in the graph
245  *
246  * The vertex data type must be \ref sec_serializable.
247  */
248  typedef typename VertexProgram::vertex_data_type vertex_data_type;
249 
250  /**
251  * \brief The type of data associated with each edge in the graph
252  *
253  * The edge data type must be \ref sec_serializable.
254  */
255  typedef typename VertexProgram::edge_data_type edge_data_type;
256 
257  /**
258  * \brief The type of graph supported by this vertex program
259  *
260  * See graphlab::distributed_graph
261  */
262  typedef typename VertexProgram::graph_type graph_type;
263 
264  /**
265  * \brief The type used to represent a vertex in the graph.
266  * See \ref graphlab::distributed_graph::vertex_type for details
267  *
268  * The vertex type contains the function
269  * \ref graphlab::distributed_graph::vertex_type::data which
270  * returns a reference to the vertex data as well as other functions
271  * like \ref graphlab::distributed_graph::vertex_type::num_in_edges
272  * which returns the number of in edges.
273  *
274  */
275  typedef typename graph_type::vertex_type vertex_type;
276 
277  /**
278  * \brief The type used to represent an edge in the graph.
279  * See \ref graphlab::distributed_graph::edge_type for details.
280  *
281  * The edge type contains the function
282  * \ref graphlab::distributed_graph::edge_type::data which returns a
283  * reference to the edge data. In addition the edge type contains
284  * the function \ref graphlab::distributed_graph::edge_type::source and
285  * \ref graphlab::distributed_graph::edge_type::target.
286  *
287  */
288  typedef typename graph_type::edge_type edge_type;
289 
290  /**
291  * \brief The type of the callback interface passed by the engine to vertex
292  * programs. See \ref graphlab::icontext for details.
293  *
294  * The context callback is passed to the vertex program functions and is
295  * used to signal other vertices, get the current iteration, and access
296  * information about the engine.
297  */
298  typedef icontext<graph_type, gather_type, message_type> icontext_type;
299 
300  private:
301  /**
302  * \internal
303  * \brief A wrapper around the gather_type to automatically manage
304  * an "empty" state.
305  *
306  * \see conditional_addition_wrapper
307  */
308  typedef conditional_addition_wrapper<gather_type> conditional_gather_type;
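  // Based purely on how the wrapper is used in this file (not its actual
  // definition), conditional_addition_wrapper<T> can be thought of as a small
  // struct holding a T value and an "empty" flag, with clear() / not_empty()
  // helpers and operator+= overloads (taking either a plain gather_type or
  // another wrapper) that assign on the first addition and accumulate on
  // subsequent ones, so an untouched gather does not pollute the combined result.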
309 
310  /// \internal \brief The base type of all schedulers
311  typedef ischeduler<message_type> ischeduler_type;
312 
313  /** \internal
314  * \brief The true type of the callback context interface which
315  * implements icontext. \see graphlab::icontext graphlab::context
316  */
317  typedef context<async_consistent_engine> context_type;
318 
319  // context needs access to internal functions
320  friend class context<async_consistent_engine>;
321 
322  /// \internal \brief The type used to refer to vertices in the local graph
323  typedef typename graph_type::local_vertex_type local_vertex_type;
324  /// \internal \brief The type used to refer to edges in the local graph
325  typedef typename graph_type::local_edge_type local_edge_type;
326  /// \internal \brief The type used to refer to vertex IDs in the local graph
327  typedef typename graph_type::lvid_type lvid_type;
328 
329  /// \internal \brief The type of the current engine instantiation
330  typedef async_consistent_engine<VertexProgram> engine_type;
331 
332  /**
333  * \internal
334  * \brief States in the vertex state machine
335  *
336  * On a master vertex, it transitions sequentially
337  * between the following 5 states
338  * \li \c NONE Nothing is going on and nothing is scheduled on this
339  * vertex. The NONE state transits into the LOCKING state when
340  * a message is popped from the scheduler destined for this vertex.
341  * The popped message is stored in
342  * <code>vertex_state[lvid].current_message</code> and a call to the lock
343  * implementation \ref graphlab::distributed_chandy_misra is made to
344  * acquire exclusive locks on this vertex ( master_broadcast_locking(),
345  * rpc_begin_locking() ) .
346  * \li \c LOCKING At this point, we have an active message
347  * stored in <code>vertex_state[lvid].current_message</code>, and we
348  * are waiting for lock acquisition to complete. When lock acquisition is
349  * complete, we will be signalled by
350  * \ref graphlab::distributed_chandy_misra via the lock callback
351  * lock_ready() . Once a vertex is in a locking state, if the vertex
352  * is ever signaled, the scheduler is bypassed and the signalled message
353  * is merged into <code>vertex_state[lvid].current_message</code>
354  * directly. When lock acquisition is complete, we broadcast a
355  * remote_call to all mirrors to begin gathering
356  * ( master_broadcast_gathering() ). The master vertex transits into
357  * the GATHERING state, while all mirrored vertices transit into the
358  * MIRROR_GATHERING state ( rpc_begin_gathering() ).
359  * \li \c GATHERING/MIRROR_GATHERING Upon entry into the GATHERING state,
360  * an internal task corresponding to this vertex is put on the internal
361  * task queue. When the task is popped and executed, it will complete
362  * gathering on the local graph, and send the partial gathered result
363  * back to the master. If the vertex is in MIRROR_GATHERING, it is a
364  * mirror, and it transits back to NONE
365  * ( rpc_gather_complete(), decrement_gather_counter() ) .
366  * If the vertex is in GATHERING, it stays in GATHERING until all partial
367  * gathers are received. After which it transits into APPLYING.
368  * \li \c APPLYING Upon entry into the APPLYING state, an internal task
369  * corresponding to this vertex is put on the internal task queue. When
370  * the task is popped and executed, the apply is performed on the master
371  * vertex. It then immediately transits into SCATTERING.
372  * \li \c SCATTERING No internal task is generated for scattering. On
373  * the master vertex, scattering is performed immediately after applying.
374  * In SCATTERING, a partial scatter is performed on the local (master)
375  * vertex, and all mirrors are called ( master_broadcast_scattering() )
376  * to transit into MIRROR_SCATTERING. A partial lock release request is
377  * also issued on the master's subgraph. The master vertex then
378  * transits to the NONE state.
379  * \li \c MIRROR_SCATTERING When a mirror enters the MIRROR_SCATTERING
380  * state ( rpc_begin_scattering() ) it inserts a task into the internal
381  * queue. When the task is popped and executed, it will perform
382  * a partial scatter on the local (mirror) vertex, then perform
383  * a partial lock release request on the mirror's subgraph.
384  * \li \c MIRROR_SCATTERING_AND_NEXT_LOCKING Due to the distributed
385  * nature, there is an unusual condition in which the master has completed
386  * scattering and a new task has been scheduled on the same vertex, but the
387  * mirrors have not completed scattering yet. In other words, the master
388  * vertex is in the LOCKING state while some mirrors are still in
389  * MIRROR_SCATTERING. This state is unique on the mirror to handle
390  * this case. If a lock request is received while the vertex is still in
391  * MIRROR_SCATTERING, the vertex transitions into this state. This state
392  * is equivalent to MIRROR_SCATTERING except that after completion
393  * of the scatter, it performs the partial lock release, then
394  * immediately re-requests the locks required on the local subgraph.
395  */
396  enum vertex_execution_state {
397  NONE = 0,
398  LOCKING, // state on owner
399  GATHERING, // state on owner
400  APPLYING, // state on owner
401  SCATTERING, // state on owner
402  MIRROR_GATHERING, // state on mirror
403  MIRROR_SCATTERING, // state on mirror
404  MIRROR_SCATTERING_AND_NEXT_LOCKING, // state on mirror
405  MIRROR_SCATTERING_AND_NEXT_GATHERING, // state on mirror. Only used by factorized
406  }; // end of vertex execution state
407 
408 
409  /**
410  * \internal
411  * The state machine + additional state maintained for each
412  * vertex and its mirrors.
413  */
414  struct vertex_state {
415  /**
416  * This stores the active instance of the vertex program
417  */
418  vertex_program_type vertex_program;
419 
420  /**
421  * Used only by factorized. Holds the next program to execute
422  */
423  vertex_program_type factorized_next;
424 
425  /**
426  * This stores the current active message being executed.
427  */
428  message_type current_message;
429  /**
430  * This is temporary storage for the partial gathers on each mirror,
431  * and the accumulated gather on each machine.
432  */
433  conditional_gather_type combined_gather;
434  /**
435  * This is used to count down the number of gathers completed.
436  * It starts off at #mirrors + 1, and is decremented every time
437  * one partial gather is received. The decrement is performed
438  * by decrement_gather_counter() . When the counter hits 0,
439  * the master vertex transits into APPLYING
440  */
441  uint32_t apply_count_down;
442 
443  /** This condition happens when a vertex is scheduled (popped from
444  * the scheduler) while the vertex is still running. In that case,
445  * the message is placed back in the scheduler (without scheduling it),
446  * and the hasnext flag is set. When the vertex finishes execution,
447  * the scheduler is signaled to schedule the blocked vertex.
448  * This may only be set on a master vertex
449  */
450  bool hasnext;
451 
452  /// A 1 byte lock protecting the vertex state
453  simple_spinlock slock;
454  simple_spinlock factorized_lock;
455  /// State of each vertex. \ref vertex_execution_state for details
456  /// on the meaning of each state
457  vertex_execution_state state;
458 
459  /// initializes the vertex state to default values.
460  vertex_state(): current_message(message_type()),
461  apply_count_down(0),
462  hasnext(false),
463  state(NONE) { }
464 
465  /// Acquires a lock on the vertex state
466  void lock() {
467  slock.lock();
468  }
469  /// releases a lock on the vertex state
470  void unlock() {
471  slock.unlock();
472  }
473 
474  /// Acquires a lock on the vertex data
475  inline void d_lock() {
476  factorized_lock.lock();
477  }
478 
479  /// Attempts to acquire the lock on the vertex data
480  inline bool d_trylock() {
481  return factorized_lock.try_lock();
482  }
483 
484  /// releases a lock on the vertex data
485  inline void d_unlock() {
486  factorized_lock.unlock();
487  }
488  }; // end of vertex_state
489 
490  /**
491  * \internal
492  * This stores the internal scheduler for each thread.
493  * The representation is a single deque of pending vertices
494  * which is continually appended to.
495  * If the vertex ID in the deque is < #local vertices, it is a task on
496  * the vertex in question. The actual task will depend on the state
497  * the vertex is currently in.
498  * However, if the vertex ID is >= #local vertices, the task is an
499  * aggregation. The aggregation key can be found in
500  * aggregate_id_to_key[-id]
501  */
502  struct thread_local_data {
503  std::vector<mutex> lock;
504  atomic<size_t> npending;
505  std::vector<std::vector<lvid_type> > pending_vertices;
506  thread_local_data() : lock(4), npending(0),
507  pending_vertices(4) { }
508  void add_task_priority(lvid_type v) {
509  size_t lid = 1;
510  lock[lid].lock();
511  pending_vertices[lid].push_back(v);
512  lock[lid].unlock();
513  ++npending;
514  }
515  void add_task(lvid_type v) {
516  size_t lid = (v / 32) % 4;
517  lock[lid].lock();
518  pending_vertices[lid].push_back(v);
519  lock[lid].unlock();
520  ++npending;
521  }
522  bool get_task(std::vector<std::vector<lvid_type> > &v) {
523  // clear the input
524  v.resize(4);
525  size_t nv = 0;
526  for (size_t i = 0;i < 4; ++i) v[i].clear();
527  for (int i = 0; i < 4; ++i) {
528  lock[i].lock();
529  if (!pending_vertices[i].empty()) {
530  v[i].swap(pending_vertices[i]);
531  }
532  lock[i].unlock();
533  npending -= v[i].size();
534  nv += v[i].size();
535  }
536  return nv > 0;
537  }
538  }; // end of thread local data
539 
540  /// The RPC interface
541  dc_dist_object<async_consistent_engine<VertexProgram> > rmi;
542 
543  /// A reference to the active graph
544  graph_type& graph;
545 
546  /// A pointer to the lock implementation
547  chandy_misra_interface<graph_type>* cmlocks;
548 
549  /// Engine threads.
550  thread_group thrgroup;
551 
552  //! The scheduler
553  ischeduler_type* scheduler_ptr;
554 
555  /// vector of vertex states. vstate[lvid] contains the vertex state
556  /// for local VID lvid.
557  std::vector<vertex_state> vstate;
558  /// Used if cache is enabled. ("use_cache=true"). Contains the result
559  /// of partial gathers
560  std::vector<conditional_gather_type> cache;
561 
562  typedef typename iengine<VertexProgram>::aggregator_type aggregator_type;
563  aggregator_type aggregator;
564 
565  /// Number of engine threads
566  size_t ncpus;
567  /// set to true if engine is started
568  bool started;
569  /// A pointer to the distributed consensus object
570  async_consensus* consensus;
571 
572  /// Thread local store. Used to hold the internal queues
573  std::vector<thread_local_data> thrlocal;
574 
575  // Various counters.
576  atomic<uint64_t> joined_messages;
577  atomic<uint64_t> blocked_issues; // issued messages which either
578  // 1: cannot start and have to be
579  // reinjected into the
580  // scheduler.
581  // 2: issued but is combined into a
582  // message which is currently locking
583  atomic<uint64_t> issued_messages;
584  atomic<uint64_t> pending_updates;
585  atomic<uint64_t> programs_executed;
586  atomic<double> total_update_time;
587 
588  timer launch_timer;
589 
590  /// engine option. Maximum proportion of clean forks allowed
591  float max_clean_fraction;
592  /// Equal to #edges * max_clean_fraction
593  /// Maximum number of clean forks allowed
594  size_t max_clean_forks;
595 
596  /// Sets the maximum number of pending updates
597  size_t max_pending;
598 
599  /// Defaults to (size_t)(-1); defines a timeout in seconds
600  size_t timed_termination;
601  /// engine option. Set to true if caching is enabled
602  bool use_cache;
603  /// engine option. Set to true if factorized consistency is used
604  bool factorized_consistency;
605 
606  bool handler_intercept;
607 
608  bool disable_locks;
609 
610  bool endgame_mode;
611 
612  /// If true, adds tracking for the task retire time
613  bool track_task_retire_time;
614 
615  std::vector<double> task_start_time;
616 
617  /// Time when engine is started
618  float engine_start_time;
619  /// True when a force stop is triggered (possibly via a timeout)
620  bool force_stop;
621  graphlab_options opts_copy; // local copy of options to pass to
622  // scheduler construction
623 
624  execution_status::status_enum termination_reason;
625 
626  DECLARE_TRACER(disteng_eval_sched_task);
627  DECLARE_TRACER(disteng_chandy_misra);
628  DECLARE_TRACER(disteng_init_gathering);
629  DECLARE_TRACER(disteng_init_scattering);
630  DECLARE_TRACER(disteng_evalfac);
631  DECLARE_TRACER(disteng_internal_task_queue);
632 
633  DECLARE_EVENT(EVENT_APPLIES);
634  DECLARE_EVENT(EVENT_GATHERS);
635  DECLARE_EVENT(EVENT_SCATTERS);
636  DECLARE_EVENT(EVENT_ACTIVE_CPUS);
637  DECLARE_EVENT(EVENT_ACTIVE_TASKS);
638 
639 
640  inline void ASSERT_I_AM_OWNER(const lvid_type lvid) const {
641  ASSERT_EQ(graph.l_get_vertex_record(lvid).owner, rmi.procid());
642  }
643  inline void ASSERT_I_AM_NOT_OWNER(const lvid_type lvid) const {
644  ASSERT_NE(graph.l_get_vertex_record(lvid).owner, rmi.procid());
645  }
646 
647 
648  public:
649 
650  /**
651  * Constructs an asynchronous consistent distributed engine.
652  * The number of threads to create is read from
653  * \ref graphlab_options::get_ncpus "opts.get_ncpus()". The scheduler to
654  * construct is read from
655  * \ref graphlab_options::get_scheduler_type() "opts.get_scheduler_type()".
656  * The default scheduler
657  * is the queued_fifo scheduler. For details on the scheduler types
658  * \see scheduler_types
659  *
660  * See the <a href=#engineopts> main class documentation</a> for the
661  * available engine options.
662  *
663  * \param dc Distributed controller to associate with
664  * \param graph The graph to schedule over. The graph must be fully
665  * constructed and finalized.
666  * \param opts A graphlab::graphlab_options object containing options and
667  * parameters for the scheduler and the engine.
668  */
669  async_consistent_engine(distributed_control& dc,
670  graph_type& graph,
671  const graphlab_options& opts = graphlab_options()) :
672  rmi(dc, this), graph(graph), scheduler_ptr(NULL),
673  aggregator(dc, graph, new context_type(*this, graph)), started(false),
674  engine_start_time(timer::approx_time_seconds()), force_stop(false),
675  vdata_exchange(dc),thread_barrier(opts.get_ncpus()) {
676  rmi.barrier();
677 
678  // set default values
679  max_clean_fraction = 0.2;
680  max_clean_forks = (size_t)(-1);
681  max_pending = (size_t)(-1);
682  timed_termination = (size_t)(-1);
683  use_cache = false;
684  factorized_consistency = true;
685  handler_intercept = rmi.numprocs() > 1;
686  track_task_retire_time = false;
687  disable_locks = false;
688  termination_reason = execution_status::UNSET;
689  set_options(opts);
690 
691  INITIALIZE_TRACER(disteng_eval_sched_task,
692  "distributed_engine: Evaluate Scheduled Task");
693  INITIALIZE_TRACER(disteng_init_gathering,
694  "distributed_engine: Initialize Gather");
695  INITIALIZE_TRACER(disteng_init_scattering,
696  "distributed_engine: Initialize Scattering");
697  INITIALIZE_TRACER(disteng_evalfac,
698  "distributed_engine: Time in Factorized Update user code");
699  INITIALIZE_TRACER(disteng_internal_task_queue,
700  "distributed_engine: Time in Internal Task Queue");
701  INITIALIZE_TRACER(disteng_chandy_misra,
702  "distributed_engine: Time in Chandy Misra");
703 
704  INITIALIZE_EVENT_LOG(dc);
705  ADD_CUMULATIVE_EVENT(EVENT_APPLIES, "Applies", "Calls");
706  ADD_CUMULATIVE_EVENT(EVENT_GATHERS , "Gathers", "Calls");
707  ADD_CUMULATIVE_EVENT(EVENT_SCATTERS , "Scatters", "Calls");
708  ADD_INSTANTANEOUS_EVENT(EVENT_ACTIVE_CPUS, "Active Threads", "Threads");
709  ADD_INSTANTANEOUS_EVENT(EVENT_ACTIVE_TASKS, "Active Tasks", "Tasks");
710 
711  initialize();
712  rmi.barrier();
713  }
714 
715  private:
716 
717  /**
718  * \internal
719  * Configures the engine with the provided options.
720  * The number of threads to create is read from
721  * opts::get_ncpus(). The scheduler to construct is read from
722  * graphlab_options::get_scheduler_type(). The default scheduler
723  * is the queued_fifo scheduler. For details on the scheduler types
724  * \see scheduler_types
725  */
726  void set_options(const graphlab_options& opts) {
727  rmi.barrier();
728  ncpus = opts.get_ncpus();
729  ASSERT_GT(ncpus, 0);
730  thread_barrier.resize_unsafe(opts.get_ncpus());
731  std::vector<std::string> keys = opts.get_engine_args().get_option_keys();
732  foreach(std::string opt, keys) {
733  if (opt == "max_clean_fraction") {
734  opts.get_engine_args().get_option("max_clean_fraction", max_clean_fraction);
735  if (rmi.procid() == 0)
736  logstream(LOG_EMPH) << "Engine Option: max_clean_fraction = "
737  << max_clean_fraction << std::endl;
738  max_clean_forks = graph.num_local_edges() * max_clean_fraction;
739  } else if (opt == "handler_intercept") {
740  opts.get_engine_args().get_option("handler_intercept", handler_intercept);
741  } else if (opt == "disable_locks") {
742  opts.get_engine_args().get_option("disable_locks", disable_locks);
743  if (rmi.procid() == 0)
744  logstream(LOG_EMPH) << "Engine Option: disable_locks = "
745  << disable_locks << std::endl;
746  } else if (opt == "max_pending") {
747  opts.get_engine_args().get_option("max_pending", max_pending);
748  if (rmi.procid() == 0)
749  logstream(LOG_EMPH) << "Engine Option: max_pending = "
750  << max_pending << std::endl;
751  } else if (opt == "timeout") {
752  opts.get_engine_args().get_option("timeout", timed_termination);
753  if (rmi.procid() == 0)
754  logstream(LOG_EMPH) << "Engine Option: timeout = "
755  << timed_termination << std::endl;
756  } else if (opt == "use_cache") {
757  opts.get_engine_args().get_option("use_cache", use_cache);
758  if (rmi.procid() == 0)
759  logstream(LOG_EMPH) << "Engine Option: use_cache = "
760  << use_cache << std::endl;
761  } else if (opt == "factorized") {
762  opts.get_engine_args().get_option("factorized", factorized_consistency);
763  if (rmi.procid() == 0)
764  logstream(LOG_EMPH) << "Engine Option: factorized = "
765  << factorized_consistency << std::endl;
766  } else if (opt == "track_task_time") {
767  opts.get_engine_args().get_option("track_task_time", track_task_retire_time);
768  if (rmi.procid() == 0)
769  logstream(LOG_EMPH) << "Engine Option: track_task_time = "
770  << track_task_retire_time << std::endl;
771  } else {
772  logstream(LOG_FATAL) << "Unexpected Engine Option: " << opt << std::endl;
773  }
774  }
775  opts_copy = opts;
776  // set a default scheduler if none
777  if (opts_copy.get_scheduler_type() == "") {
778  opts_copy.set_scheduler_type("queued_fifo");
779  }
780  rmi.barrier();
781 
782  }
783 
784  /**
785  * \internal
786  * Initializes the engine with respect to the associated graph.
787  * This call will initialize all internal and scheduling datastructures.
788  * This function must be called prior to any signal function.
789  */
790  void initialize() {
791  // construct all the required datastructures
792  // deinitialize performs the reverse
793  graph.finalize();
794  if (rmi.procid() == 0) memory_info::print_usage("Before Engine Initialization");
795  logstream(LOG_INFO)
796  << rmi.procid() << ": Initializing..." << std::endl;
797 
798  // construct scheduler passing in the copy of the options from set_options
799  scheduler_ptr = scheduler_factory<message_type>::
800  new_scheduler(graph.num_local_vertices(),
801  opts_copy);
802 
803  // create initial fork arrangement based on the alternate vid mapping
804  if (factorized_consistency == false) {
805  cmlocks = new distributed_chandy_misra<graph_type>(rmi.dc(), graph,
806  boost::bind(&engine_type::lock_ready, this, _1),
807  boost::bind(&engine_type::forward_cached_schedule, this, _1));
808  }
809  else {
810  cmlocks = new fake_chandy_misra<graph_type>(rmi.dc(), graph,
811  boost::bind(&engine_type::lock_ready, this, _1),
812  boost::bind(&engine_type::forward_cached_schedule, this, _1));
813  }
814  // construct the vertex programs
815  vstate.resize(graph.num_local_vertices());
816 
817  // construct the termination consensus object
818  consensus = new async_consensus(rmi.dc(), ncpus);
819 
820  // if cache is enabled, allocate the cache
821  if (use_cache) cache.resize(graph.num_local_vertices());
822 
823  // finally, the thread local queues
824  thrlocal.resize(ncpus);
825  if (rmi.procid() == 0) memory_info::print_usage("After Engine Initialization");
826  rmi.barrier();
827  }
828 
829  public:
830  ~async_consistent_engine() {
831  thrlocal.clear();
832  delete consensus;
833  vstate.clear();
834  delete cmlocks;
835  delete scheduler_ptr;
836  }
837 
838 
839 
840 
841  // documentation inherited from iengine
842  size_t num_updates() const {
843  return programs_executed.value;
844  }
845 
846 
847 
848 
849 
850  // documentation inherited from iengine
851  float elapsed_seconds() const {
852  return timer::approx_time_seconds() - engine_start_time;
853  }
854 
855 
856  /**
857  * \brief Not meaningful for the asynchronous engine. Returns -1.
858  */
859  int iteration() const { return -1; }
860 
861 
862 /**************************************************************************
863  * Signaling Interface *
864  **************************************************************************/
865 
866  private:
867 
868  void internal_post_delta(const vertex_type& vertex,
869  const gather_type& delta) {
870  // only post deltas if caching is enabled
871  if(use_cache) {
872  if (!factorized_consistency) vstate[vertex.local_id()].d_lock();
873  if (cache[vertex.local_id()].not_empty()) {
874  cache[vertex.local_id()] += delta;
875  }
876  if (!factorized_consistency) vstate[vertex.local_id()].d_unlock();
877  }
878  }
879 
880  void internal_clear_gather_cache(const vertex_type& vertex) {
881  // only post clear cache if caching is enabled
882  if (use_cache) {
883  if (!factorized_consistency) vstate[vertex.local_id()].d_lock();
884  if(use_cache) cache[vertex.local_id()].clear();
885  if (!factorized_consistency) vstate[vertex.local_id()].d_unlock();
886  }
887  }
888 
889 
890 
891  /**
892  * \internal
893  * This is used to receive a message forwarded from another machine
894  */
895  void rpc_signal(vertex_id_type vid,
896  const message_type& message) {
897  if (force_stop) return;
898  const lvid_type local_vid = graph.local_vid(vid);
899  BEGIN_TRACEPOINT(disteng_scheduler_task_queue);
900  bool direct_injection = false;
901  // if the vertex is still in the locking state
902  // it has not received the message yet.
903  // lets directly inject it into the vertex_state message cache
904  if (vstate[local_vid].state == LOCKING) {
905  vstate[local_vid].lock();
906  if (vstate[local_vid].state == LOCKING) {
907  vstate[local_vid].current_message += message;
908  direct_injection = true;
909  joined_messages.inc();
910  }
911  vstate[local_vid].unlock();
912  }
913  // if we cannot directly inject into the vertex, then we have no
914  // choice but to put the message into the scheduler
915  if (direct_injection == false) {
916  scheduler_ptr->schedule(local_vid, message);
917  }
918  END_TRACEPOINT(disteng_scheduler_task_queue);
919  consensus->cancel();
920  }
921 
922 
923  /**
924  * \internal
925  * This will inject a "placed" task back to be scheduled.
926  */
927  void signal_local_next(vertex_id_type local_vid) {
928  /* This happens when a currently running vertex is activated.
929  We cannot return the vertex back into the scheduler since that would
930  cause the scheduler to loop indefinitely around a task which
931  cannot run. Therefore, we "place" it back in the scheduler, without
932  scheduling it, and set a "hasnext" flag in the vertex_state.
933  Then when the vertex finishes execution, we must reschedule the vertex
934  in the scheduler */
935  if (force_stop) return;
936  scheduler_ptr->schedule_from_execution_thread(thread::thread_id(),
937  local_vid);
938  consensus->cancel();
939  }
940 
941  /**
942  * \internal
943  * \brief Signals a vertex with an optional message
944  *
945  * Signals a vertex, and schedules it to be executed in the future.
946  * Must be called on a vertex accessible by the current machine.
947  */
948  void internal_signal(const vertex_type& vtx,
949  const message_type& message = message_type()) {
950  if (force_stop) return;
951  if (started) {
952 
953  BEGIN_TRACEPOINT(disteng_scheduler_task_queue);
954  if (factorized_consistency) {
955  // fast signal. push to the remote machine immediately
956  const typename graph_type::vertex_record& rec = graph.l_get_vertex_record(vtx.local_id());
957  const procid_t owner = rec.owner;
958  if (owner != rmi.procid()) {
959  const vertex_id_type vid = rec.gvid;
960  rmi.remote_call(owner, &engine_type::rpc_signal, vid, message);
961  }
962  else {
963  scheduler_ptr->schedule_from_execution_thread(thread::thread_id(),
964  vtx.local_id(), message);
965  consensus->cancel();
966  }
967  }
968  else {
969  scheduler_ptr->schedule_from_execution_thread(thread::thread_id(),
970  vtx.local_id(), message);
971  consensus->cancel();
972  }
973  END_TRACEPOINT(disteng_scheduler_task_queue);
974  }
975  else {
976  scheduler_ptr->schedule(vtx.local_id(), message);
977  consensus->cancel();
978  }
979  } // end of schedule
980 
981 
982  /**
983  * \internal
984  * \brief Signals a vertex with an optional message
985  *
986  * Signals a global vid, and schedules it to be executed in the future.
987  * If the current machine does not contain the vertex, it is ignored.
988  */
989  void internal_signal_gvid(vertex_id_type gvid,
990  const message_type& message = message_type()) {
991  if (force_stop) return;
992  if (graph.is_master(gvid)) {
993  internal_signal(graph.vertex(gvid), message);
994  }
995  } // end of schedule
996 
997 
998  void internal_signal_broadcast(vertex_id_type gvid,
999  const message_type& message = message_type()) {
1000  for (size_t i = 0;i < rmi.numprocs(); ++i) {
1001  rmi.remote_call(i, &async_consistent_engine::internal_signal_gvid,
1002  gvid, message);
1003  }
1004  } // end of signal_broadcast
1005 
1006 
1007  void rpc_internal_stop() {
1008  force_stop = true;
1009  termination_reason = execution_status::FORCED_ABORT;
1010  }
1011 
1012  /**
1013  * \brief Force engine to terminate immediately.
1014  *
1015  * This function is used to stop the engine execution by forcing
1016  * immediate termination.
1017  */
1018  void internal_stop() {
1019  for (procid_t i = 0;i < rmi.numprocs(); ++i) {
1020  rmi.remote_call(i, &async_consistent_engine::rpc_internal_stop);
1021  }
1022  }
1023 
1024 
1025  public:
1026 
1027 
1028 
1029  void signal(vertex_id_type gvid,
1030  const message_type& message = message_type()) {
1031  rmi.barrier();
1032  internal_signal_gvid(gvid, message);
1033  rmi.barrier();
1034  }
1035 
1036 
1037  void signal_all(const message_type& message = message_type(),
1038  const std::string& order = "shuffle") {
1039  logstream(LOG_DEBUG) << rmi.procid() << ": Schedule All" << std::endl;
1040  // allocate a vector with all the local owned vertices
1041  // and schedule all of them.
1042  std::vector<vertex_id_type> vtxs;
1043  vtxs.reserve(graph.num_local_own_vertices());
1044  for(lvid_type lvid = 0;
1045  lvid < graph.get_local_graph().num_vertices();
1046  ++lvid) {
1047  if (graph.l_vertex(lvid).owner() == rmi.procid()) {
1048  vtxs.push_back(lvid);
1049  }
1050  }
1051 
1052  if(order == "shuffle") {
1053  graphlab::random::shuffle(vtxs.begin(), vtxs.end());
1054  }
1055  foreach(lvid_type lvid, vtxs) {
1056  scheduler_ptr->schedule(lvid, message);
1057  }
1058  rmi.barrier();
1059  } // end of schedule all
1060 
1061  void signal_vset(const vertex_set& vset,
1062  const message_type& message = message_type(),
1063  const std::string& order = "shuffle") {
1064  logstream(LOG_DEBUG) << rmi.procid() << ": Schedule All" << std::endl;
1065  // allocate a vector with all the local owned vertices
1066  // and schedule all of them.
1067  std::vector<vertex_id_type> vtxs;
1068  vtxs.reserve(graph.num_local_own_vertices());
1069  for(lvid_type lvid = 0;
1070  lvid < graph.get_local_graph().num_vertices();
1071  ++lvid) {
1072  if (graph.l_vertex(lvid).owner() == rmi.procid() &&
1073  vset.l_contains(lvid)) {
1074  vtxs.push_back(lvid);
1075  }
1076  }
1077 
1078  if(order == "shuffle") {
1079  graphlab::random::shuffle(vtxs.begin(), vtxs.end());
1080  }
1081  foreach(lvid_type lvid, vtxs) {
1082  scheduler_ptr->schedule(lvid, message);
1083  }
1084  rmi.barrier();
1085  }
1086 
1087 /**************************************************************************
1088  * Computation Processing *
1089  * Internal vertex program scheduling. The functions are arranged roughly *
1090  * in the order they are used: locking, gathering, applying, scattering *
1091  **************************************************************************/
1092 
1093  private:
1094 
1095 
1096  /**
1097  * \internal
1098  * This function is called after a message to vertex sched_lvid was
1099  * issued by the scheduler. The message must then be stored in the
1100  * vertex_state. Calling this function will begin requesting
1101  * locks for this vertex on all mirrors.
1102  */
1103  void master_broadcast_locking(lvid_type sched_lvid) {
1104  local_vertex_type lvertex(graph.l_vertex(sched_lvid));
1105 
1106  logstream(LOG_DEBUG) << rmi.procid() << ": Broadcast Locking: "
1107  << lvertex.global_id() << std::endl;
1108 // ASSERT_I_AM_OWNER(sched_lvid);
1109 
1110  const unsigned char prevkey =
1111  rmi.dc().set_sequentialization_key(lvertex.global_id() % 254 + 1);
1112  rmi.remote_call(lvertex.mirrors().begin(), lvertex.mirrors().end(),
1113  &engine_type::rpc_begin_locking, lvertex.global_id());
1114  rmi.dc().set_sequentialization_key(prevkey);
1115  }
1116 
1117 
1118  /**
1119  * \internal
1120  * RPC call issued by a master vertex requesting for locks
1121  * on sched_vid. This switches the vertex to a LOCKING state.
1122  * Note that, due to communication latencies, it is possible that
1123  * sched_vid is currently in a SCATTERING phase. If that is the case,
1124  * sched_vid is switched to the MIRROR_SCATTERING_AND_NEXT_LOCKING
1125  * state
1126  */
1127  void rpc_begin_locking(vertex_id_type sched_vid) {
1128  logstream(LOG_DEBUG) << rmi.procid() << ": Mirror Begin Locking: "
1129  << sched_vid << std::endl;
1130  // immediately begin issuing the lock requests
1131  vertex_id_type sched_lvid = graph.local_vid(sched_vid);
1132  // set the vertex state
1133  vstate[sched_lvid].lock();
1134  if (vstate[sched_lvid].state == NONE) {
1135  vstate[sched_lvid].state = LOCKING;
1136  cmlocks->make_philosopher_hungry_per_replica(sched_lvid);
1137  //add_internal_task(sched_lvid);
1138  }
1139  else if (vstate[sched_lvid].state == MIRROR_SCATTERING) {
1140  vstate[sched_lvid].state = MIRROR_SCATTERING_AND_NEXT_LOCKING;
1141  }
1142  vstate[sched_lvid].unlock();
1143  }
1144 
1145 
1146  /**
1147  * \internal
1148  * When all distributed locks are acquired, this function is called
1149  * from the Chandy-Misra implementation on the master vertex.
1150  * Here, we perform initialization
1151  * of the task and switch the vertex to a gathering state
1152  */
1153  void lock_ready(lvid_type lvid) {
1154 #ifdef ASYNC_ENGINE_FACTORIZED_AVOID_SCHEDULER_GATHER
1155  if (!factorized_consistency) {
1156 #endif
1157  // locks for this vertex are ready. perform initialization
1158  // and switch the vertex to the gathering state
1159  logstream(LOG_DEBUG) << "Lock ready on " << "L" << lvid << std::endl;
1160  vstate[lvid].lock();
1161  vstate[lvid].state = GATHERING;
1162  do_init_gather(lvid);
1163  vstate[lvid].unlock();
1164  master_broadcast_gathering(lvid, vstate[lvid].vertex_program);
1165 #ifdef ASYNC_ENGINE_FACTORIZED_AVOID_SCHEDULER_GATHER
1166  }
1167 #endif
1168  }
1169 
1170 
1171  /**
1172  * \internal
1173  * Broadcasts to all machines to begin gathering on sched_lvid.
1174  * The vertex program is transmitted as well
1175  */
1176  void master_broadcast_gathering(lvid_type sched_lvid,
1177  const vertex_program_type& prog) {
1178  local_vertex_type lvertex(graph.l_vertex(sched_lvid));
1179  BEGIN_TRACEPOINT(disteng_init_gathering);
1180  logstream(LOG_DEBUG) << rmi.procid() << ": Broadcast Gathering: "
1181  << lvertex.global_id() << std::endl;
1182 // ASSERT_I_AM_OWNER(sched_lvid);
1183  // convert to local ID
1184 
1185  const unsigned char prevkey =
1186  rmi.dc().set_sequentialization_key(lvertex.global_id() % 254 + 1);
1187  rmi.remote_call(lvertex.mirrors().begin(), lvertex.mirrors().end(),
1188  &engine_type::rpc_begin_gathering, lvertex.global_id() , prog);
1189  rmi.dc().set_sequentialization_key(prevkey);
1190  END_TRACEPOINT(disteng_init_gathering);
1191  add_internal_task(sched_lvid);
1192  }
1193 
1194 
1195  /**
1196  * \internal
1197  * Called remotely by master_broadcast_gathering.
1198  * This function stores the vertex program in the vertex
1199  * state and switches the vertex to a GATHERING state.
1200  */
1201  void rpc_begin_gathering(vertex_id_type sched_vid,
1202  const vertex_program_type& prog) {
1203  logstream(LOG_DEBUG) << rmi.procid() << ": Mirror Begin Gathering: "
1204  << sched_vid << std::endl;
1205 // ASSERT_NE(graph.get_vertex_record(sched_vid).owner, rmi.procid());
1206  // immediately begin issuing the lock requests
1207  vertex_id_type sched_lvid = graph.local_vid(sched_vid);
1208  // set the vertex state
1209  vstate[sched_lvid].lock();
1210 // ASSERT_EQ(vstate[sched_lvid].state, LOCKING);
1211  if (vstate[sched_lvid].state == MIRROR_SCATTERING) {
1212  vstate[sched_lvid].state = MIRROR_SCATTERING_AND_NEXT_GATHERING;
1213  vstate[sched_lvid].factorized_next = prog;
1214  }
1215  else {
1216  // ASSERT_EQ((int)vstate[sched_lvid].state, (int)NONE);
1217  vstate[sched_lvid].state = MIRROR_GATHERING;
1218  vstate[sched_lvid].vertex_program = prog;
1219  vstate[sched_lvid].combined_gather.clear();
1220  add_internal_task(sched_lvid);
1221  }
1222  vstate[sched_lvid].unlock();
1223  // lets go
1224  }
1225 
1226 
1227  /**
1228  * \internal
1229  * When a mirror/master finishes its part of a gather, this function
1230  * is called on the master with the gathered value.
1231  */
1232  void rpc_gather_complete(vertex_id_type vid,
1233  const conditional_gather_type& uf) {
1234  logstream(LOG_DEBUG) << rmi.procid() << ": Receiving Gather Complete of "
1235  << vid << std::endl;
1236  lvid_type lvid = graph.local_vid(vid);
1237  vstate[lvid].lock();
1238  vstate[lvid].combined_gather += uf;
1239  decrement_gather_counter(lvid, true /* from rpc */);
1240  vstate[lvid].unlock();
1241  }
1242 
1243 
1244  /**
1245  * \internal
1246  * when a machine finishes its part of the gather, it calls
1247  * decrement_gather_counter which will count the number of machines
1248  * which have completed the gather. When all machines have responded
1249  * this function switches the vertex to the APPLYING state.
1250  * Must be called with the vertex state lock held.
1251  *
1252  * Returns true if the fast path is possible, i.e. this was called locally
1253  * and the counter went to zero.
1254  */
1255  bool decrement_gather_counter(const lvid_type lvid, bool from_rpc = false) {
1256  vstate[lvid].apply_count_down--;
1257  logstream(LOG_DEBUG) << rmi.procid() << ": Partial Gather Complete: "
1258  << graph.global_vid(lvid) << "(" << vstate[lvid].apply_count_down << ")" << std::endl;
1259  if (vstate[lvid].apply_count_down == 0) {
1260  logstream(LOG_DEBUG) << rmi.procid() << ": Gather Complete "
1261  << graph.global_vid(lvid) << std::endl;
1262  vstate[lvid].state = APPLYING;
1263  // if numprocs == 1, we fast path it. and fall through into the apply/scatter
1264  // immediately in the state machine
1265  if (from_rpc) {
1266  add_internal_task(lvid);
1267  return false;
1268  } else {
1269  return true;
1270  }
1271  }
1272  return false;
1273  }
1274 
1275  /**
1276  * \internal Performs the init operation on vertex lvid using the
1277  * message stored in the vertex_state. Locks should be acquired.
1278  */
1279  void do_init_gather(lvid_type lvid) {
1280  context_type context(*this, graph);
1281  vstate[lvid].vertex_program.init(context,
1282  vertex_type(graph.l_vertex(lvid)),
1283  vstate[lvid].current_message);
1284  vstate[lvid].current_message = message_type();
1285  vstate[lvid].combined_gather.clear();
1286  }
1287 
1288  void factorized_lock_edge2_begin(lvid_type hold) {
1289  vstate[hold].d_lock();
1290  }
1291  void factorized_lock_edge2(lvid_type hold, lvid_type advance) {
1292  if(vstate[advance].d_trylock()) return;
1293  else vstate[hold].d_unlock();
1294  lvid_type a = std::min(hold, advance);
1295  lvid_type b = std::max(hold, advance);
1296  vstate[a].d_lock();
1297  vstate[b].d_lock();
1298  }
1299 
1300  void factorized_unlock_edge2(lvid_type hold,
1301  lvid_type advance) {
1302  vstate[advance].d_unlock();
1303  }
1304 
1305  void factorized_unlock_edge2_end(lvid_type hold) {
1306  vstate[hold].d_unlock();
1307  }
1308 
1309 
1310  void factorized_lock_edge(local_edge_type edge) {
1311  lvid_type src = edge.source().id();
1312  lvid_type target = edge.target().id();
1313  lvid_type a = std::min(src, target);
1314  lvid_type b = std::max(src, target);
1315  vstate[a].d_lock();
1316  vstate[b].d_lock();
1317  }
1318 
1319  void factorized_unlock_edge(local_edge_type edge) {
1320  vstate[edge.source().id()].d_unlock();
1321  vstate[edge.target().id()].d_unlock();
1322  }
1323  /**
1324  * \internal
1325  * Performs the gather operation on vertex lvid. Locks should be acquired.
1326  */
1327  void do_gather(lvid_type lvid) { // Do gather
1328  BEGIN_TRACEPOINT(disteng_evalfac);
1329  local_vertex_type lvertex(graph.l_vertex(lvid));
1330  vertex_type vertex(lvertex);
1331 
1332  conditional_gather_type* gather_target = NULL;
1333  bool gather_target_is_cache = false;
1334  if (use_cache) {
1335  vstate[lvid].d_lock();
1336  if (cache[lvid].not_empty()) {
1337  // there is something in the cache. Return that
1338  vstate[lvid].combined_gather += cache[lvid];
1339  vstate[lvid].d_unlock();
1340  return;
1341  }
1342  else {
1343  // there is nothing in the cache. Make the cache the
1344  // gather target, then later write back to the combined gather
1345  gather_target = &(cache[lvid]);
1346  gather_target_is_cache = true;
1347  vstate[lvid].d_unlock();
1348  }
1349  }
1350  else {
1351  // cache not enabled, gather directly to the combined gather
1352  gather_target = &(vstate[lvid].combined_gather);
1353  }
1354 
1355  context_type context(*this, graph);
1356  vstate[lvid].vertex_program.pre_local_gather(gather_target->value);
1357  edge_dir_type gatherdir = vstate[lvid].vertex_program.gather_edges(context, vertex);
1358 
1359  if(gatherdir == graphlab::IN_EDGES ||
1360  gatherdir == graphlab::ALL_EDGES) {
1361  if (!disable_locks) factorized_lock_edge2_begin(lvid);
1362  foreach(local_edge_type edge, lvertex.in_edges()) {
1363  if (!disable_locks && factorized_consistency) {
1364  factorized_lock_edge2(lvid, edge.source().id());
1365  }
1366  edge_type e(edge);
1367  (*gather_target) +=
1368  vstate[lvid].vertex_program.gather(context, vertex, e);
1369  if (factorized_consistency && !disable_locks) {
1370  factorized_unlock_edge2(lvid, edge.source().id());
1371  }
1372  }
1373  if (!disable_locks) factorized_unlock_edge2_end(lvid);
1374  INCREMENT_EVENT(EVENT_GATHERS, lvertex.num_in_edges());
1375  }
1376  if(gatherdir == graphlab::OUT_EDGES ||
1377  gatherdir == graphlab::ALL_EDGES) {
1378  if (!disable_locks) factorized_lock_edge2_begin(lvid);
1379  foreach(local_edge_type edge, lvertex.out_edges()) {
1380  if (!disable_locks && factorized_consistency) {
1381  factorized_lock_edge2(lvid, edge.target().id());
1382  }
1383  edge_type e(edge);
1384  (*gather_target) +=
1385  vstate[lvid].vertex_program.gather(context, vertex, e);
1386  if (factorized_consistency && !disable_locks) factorized_unlock_edge2(lvid, edge.target().id());
1387  }
1388  if (!disable_locks) factorized_unlock_edge2_end(lvid);
1389  INCREMENT_EVENT(EVENT_GATHERS, lvertex.num_out_edges());
1390  }
1391 
1392  vstate[lvid].vertex_program.post_local_gather(gather_target->value);
1393  if (use_cache && gather_target_is_cache) {
1394  vstate[lvid].d_lock();
1395  // this is the condition where the gather target is the cache
1396  // now I need to combine it to the combined gather
1397  vstate[lvid].combined_gather += cache[lvid];
1398  vstate[lvid].d_unlock();
1399  }
1400 
1401  END_TRACEPOINT(disteng_evalfac);
1402  }
1403 
1404  /**
1405  * \internal
1406  * Main function to perform gathers on vertex lvid.
1407  * This function performs do_gather() on vertex lvid.
1408  * The resultant gathered value is then sent back to the master
1409  * for combining.
1410  * Returns true on the fast path, i.e. the counter reached zero and we
1411  * avoid inserting into the internal task queue.
1412  */
1413  bool process_gather(lvid_type lvid) {
1414 
1415  const vertex_id_type vid = graph.global_vid(lvid);
1416  logstream(LOG_DEBUG) << rmi.procid() << ": Gathering on " << vid
1417  << std::endl;
1418  do_gather(lvid);
1419 
1420  const procid_t vowner = graph.l_get_vertex_record(lvid).owner;
1421  if (vowner == rmi.procid()) {
1422  return decrement_gather_counter(lvid);
1423  } else {
1424  vstate[lvid].state = MIRROR_SCATTERING;
1425  logstream(LOG_DEBUG) << rmi.procid() << ": Send Gather Complete of " << vid
1426  << " to " << vowner << std::endl;
1427 
1428  rmi.remote_call(vowner,
1429  &engine_type::rpc_gather_complete,
1430  graph.global_vid(lvid),
1431  vstate[lvid].combined_gather);
1432 
1433  vstate[lvid].combined_gather.clear();
1434  return false;
1435  }
1436  }
1437 
1438 
1439  /**
1440  * \internal
1441  * Performs the apply operation on vertex lvid using
1442  * the gathered values stored in the vertex_state.
1443  * Locks should be acquired.
1444  */
1445  void do_apply(lvid_type lvid) {
1446  BEGIN_TRACEPOINT(disteng_evalfac);
1447  context_type context(*this, graph);
1448 
1449  vertex_type vertex(graph.l_vertex(lvid));
1450 
1451  logstream(LOG_DEBUG) << rmi.procid() << ": Apply On " << vertex.id() << std::endl;
1452  vstate[lvid].d_lock();
1453  vstate[lvid].vertex_program.apply(context,
1454  vertex,
1455  vstate[lvid].combined_gather.value);
1456  vstate[lvid].d_unlock();
1457  vstate[lvid].combined_gather.clear();
1458 
1459  DECREMENT_EVENT(EVENT_ACTIVE_TASKS, 1);
1460  INCREMENT_EVENT(EVENT_APPLIES, 1);
1461  END_TRACEPOINT(disteng_evalfac);
1462  }
1463 
1464  /**
1465  * \internal
1466  * Similar to master_broadcast_gathering. Sends the updated vertex program
1467  * and vertex data to all mirrors, and requests all mirrors to begin
1468  * scattering.
1469  */
1470  void master_broadcast_scattering(lvid_type sched_lvid,
1471  const vertex_program_type& prog,
1472  const vertex_data_type &central_vdata) {
1473  BEGIN_TRACEPOINT(disteng_init_scattering);
1474  local_vertex_type lvertex(graph.l_vertex(sched_lvid));
1475  logstream(LOG_DEBUG) << rmi.procid() << ": Broadcast Scattering: "
1476  << lvertex.global_id() << std::endl;
1477 // ASSERT_I_AM_OWNER(sched_lvid);
1478 
1479  const unsigned char prevkey =
1480  rmi.dc().set_sequentialization_key(lvertex.global_id() % 254 + 1);
1481  rmi.remote_call(lvertex.mirrors().begin(), lvertex.mirrors().end(),
1482  &engine_type::rpc_begin_scattering,
1483  lvertex.global_id(), prog, central_vdata);
1484  rmi.dc().set_sequentialization_key(prevkey);
1485  END_TRACEPOINT(disteng_init_scattering);
1486  }
1487 
1488 
1489  /**
1490  * \internal
1491  * Called remotely by master_broadcast_scattering.
1492  * Stores the modified vertex program and vertex data and
1493  * switches the vertex to a SCATTERING state.
1494  */
1495  void rpc_begin_scattering(vertex_id_type vid,
1496  const vertex_program_type& prog,
1497  const vertex_data_type &central_vdata) {
1498  vertex_id_type lvid = graph.local_vid(vid);
1499  vstate[lvid].lock();
1500 // ASSERT_I_AM_NOT_OWNER(lvid);
1501  //ASSERT_EQ((int)vstate[lvid].state, MIRROR_SCATTERING);
1502  //vstate[lvid].state = MIRROR_SCATTERING;
1503  ASSERT_MSG(vstate[lvid].state == MIRROR_SCATTERING ||
1504  vstate[lvid].state == MIRROR_SCATTERING_AND_NEXT_GATHERING,
1505  "Unexpected state: %d", (int)(vstate[lvid].state));
1506  graph.get_local_graph().vertex_data(lvid) = central_vdata;
1507  vstate[lvid].vertex_program = prog;
1508  add_internal_task(lvid);
1509  vstate[lvid].unlock();
1510  }
1511 
1512 
1513  /**
1514  * \internal
1515  * Performs the scatter operation on vertex lvid. Locks must already be acquired.
1516  */
1517  void do_scatter(lvid_type lvid) {
1518  BEGIN_TRACEPOINT(disteng_evalfac);
1519  local_vertex_type lvertex(graph.l_vertex(lvid));
1520  vertex_type vertex(lvertex);
1521 
1522  context_type context(*this, graph);
1523 
1524  edge_dir_type scatterdir = vstate[lvid].vertex_program.scatter_edges(context, vertex);
1525 
1526  if(scatterdir == graphlab::IN_EDGES ||
1527  scatterdir == graphlab::ALL_EDGES) {
1528  if (!disable_locks) factorized_lock_edge2_begin(lvid);
1529  foreach(local_edge_type edge, lvertex.in_edges()) {
1530  if (!disable_locks && factorized_consistency) {
1531  factorized_lock_edge2(lvid, edge.source().id());
1532  }
1533  edge_type e(edge);
1534  vstate[lvid].vertex_program.scatter(context, vertex, e);
1535  if (!disable_locks && factorized_consistency) {
1536  factorized_unlock_edge2(lvid, edge.source().id());
1537  }
1538  }
1539  if (!disable_locks) factorized_unlock_edge2_end(lvid);
1540  INCREMENT_EVENT(EVENT_SCATTERS, lvertex.num_in_edges());
1541  }
1542  if(scatterdir == graphlab::OUT_EDGES ||
1543  scatterdir == graphlab::ALL_EDGES) {
1544  if (!disable_locks) factorized_lock_edge2_begin(lvid);
1545  foreach(local_edge_type edge, lvertex.out_edges()) {
1546  if (!disable_locks && factorized_consistency) {
1547  factorized_lock_edge2(lvid, edge.target().id());
1548  }
1549  edge_type e(edge);
1550  vstate[lvid].vertex_program.scatter(context, vertex, e);
1551  if (!disable_locks && factorized_consistency) {
1552  factorized_unlock_edge2(lvid, edge.target().id());
1553  }
1554  }
1555  if (!disable_locks) factorized_unlock_edge2_end(lvid);
1556  INCREMENT_EVENT(EVENT_SCATTERS, lvertex.num_out_edges());
1557  }
1558  END_TRACEPOINT(disteng_evalfac);
1559  } // end of do scatter
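  /* Illustrative sketch (not part of the engine): a minimal vertex program
   * of the kind whose phases are driven by do_gather(), do_apply() and
   * do_scatter() above. The names pagerank_program and graph_type are
   * hypothetical application-side definitions, shown only to make the
   * gather/apply/scatter phases concrete.
   *
   * \code
   * struct pagerank_program :
   *     public graphlab::ivertex_program<graph_type, double>,
   *     public graphlab::IS_POD_TYPE {
   *   // gather: executed over in-edges while the engine holds the locks
   *   edge_dir_type gather_edges(icontext_type& context,
   *                              const vertex_type& vertex) const {
   *     return graphlab::IN_EDGES;
   *   }
   *   double gather(icontext_type& context, const vertex_type& vertex,
   *                 edge_type& edge) const {
   *     return edge.source().data() / edge.source().num_out_edges();
   *   }
   *   // apply: receives the combined gather value on the master
   *   void apply(icontext_type& context, vertex_type& vertex,
   *              const gather_type& total) {
   *     vertex.data() = 0.15 + 0.85 * total;
   *   }
   *   // scatter: executed over out-edges; signalling reschedules neighbors
   *   edge_dir_type scatter_edges(icontext_type& context,
   *                               const vertex_type& vertex) const {
   *     return graphlab::OUT_EDGES;
   *   }
   *   void scatter(icontext_type& context, const vertex_type& vertex,
   *                edge_type& edge) const {
   *     context.signal(edge.target());
   *   }
   * };
   * \endcode
   */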
1560 
1561 
1562 /**************************************************************************
1563  * Thread Management *
1564  * Functions which manage the thread queues, internal task queues, *
1565  * termination, etc *
1566  **************************************************************************/
1567  private:
1568  /**
1569  * \internal
1570  * Key internal dispatch method.
1571  * Advances the state of a vertex based on its state
1572  * as described in the vertex_state.
1573  */
1574  void eval_internal_task(size_t threadid, lvid_type lvid) {
1575  // if lvid is >= #local vertices, this is an aggregator task
1576  if (lvid >= vstate.size()) {
1577  aggregator.tick_asynchronous_compute(threadid,
1578  aggregate_id_to_key[-lvid]);
1579  return;
1580  }
1581  bool gather_fast_path = false;
1582  vstate[lvid].lock();
1583 EVAL_INTERNAL_TASK_RE_EVAL_STATE:
1584  switch(vstate[lvid].state) {
1585  case NONE:
1586  break;
1587  case LOCKING: {
1588  BEGIN_TRACEPOINT(disteng_chandy_misra);
1589  cmlocks->make_philosopher_hungry_per_replica(lvid);
1590  END_TRACEPOINT(disteng_chandy_misra);
1591  break;
1592  }
1593  case GATHERING: {
1594  logstream(LOG_DEBUG) << rmi.procid() << ": Internal Task: "
1595  << graph.global_vid(lvid) << ": GATHERING(" << vstate[lvid].apply_count_down << ")" << std::endl;
1596 
1597  gather_fast_path = process_gather(lvid);
1598  if (gather_fast_path) {
1599  // immediately apply and scatter
1600  goto EVAL_INTERNAL_TASK_RE_EVAL_STATE;
1601  } else {
1602  break;
1603  }
1604  }
1605  case MIRROR_GATHERING: {
1606  logstream(LOG_DEBUG) << rmi.procid() << ": Internal Task: "
1607  << graph.global_vid(lvid) << ": MIRROR_GATHERING" << std::endl;
1608  process_gather(lvid);
1609  break;
1610  }
1611  case APPLYING: {
1612  logstream(LOG_DEBUG) << rmi.procid() << ": Internal Task: "
1613  << graph.global_vid(lvid) << ": APPLYING" << std::endl;
1614 
1615  do_apply(lvid);
1616  vstate[lvid].state = SCATTERING;
1617  master_broadcast_scattering(lvid,
1618  vstate[lvid].vertex_program,
1619  graph.get_local_graph().vertex_data(lvid));
1620  // fall through to scattering
1621  }
1622  case SCATTERING: {
1623  logstream(LOG_DEBUG) << rmi.procid() << ": Scattering: "
1624  << graph.global_vid(lvid) << ": SCATTERING" << std::endl;
1625 
1626  do_scatter(lvid);
1627  programs_executed.inc();
1628  pending_updates.dec();
1629  if (track_task_retire_time) {
1630  total_update_time.inc(launch_timer.current_time() - task_start_time[lvid]);
1631  }
1632  // clear the vertex program
1633  vstate[lvid].vertex_program = vertex_program_type();
1634  BEGIN_TRACEPOINT(disteng_chandy_misra);
1635  cmlocks->philosopher_stops_eating_per_replica(lvid);
1636  END_TRACEPOINT(disteng_chandy_misra);
1637 
1638  if (vstate[lvid].hasnext) {
1639  // stick next back into the scheduler
1640  signal_local_next(lvid);
1641  vstate[lvid].hasnext = false;
1642  }
1643  vstate[lvid].state = NONE;
1644  break;
1645  }
1646  case MIRROR_SCATTERING: {
1647  logstream(LOG_DEBUG) << rmi.procid() << ": Scattering: "
1648  << graph.global_vid(lvid) << ": MIRROR_SCATTERING" << std::endl;
1649  do_scatter(lvid);
1650  vstate[lvid].vertex_program = vertex_program_type();
1651  vstate[lvid].state = NONE;
1652  cmlocks->philosopher_stops_eating_per_replica(lvid);
1653 // ASSERT_FALSE(vstate[lvid].hasnext);
1654  break;
1655  }
1656  case MIRROR_SCATTERING_AND_NEXT_LOCKING: {
1657  logstream(LOG_DEBUG) << rmi.procid() << ": Scattering: "
1658  << graph.global_vid(lvid) << ": MIRROR_SCATTERING_AND_NEXT_LOCKING" << std::endl;
1659  do_scatter(lvid);
1660  vstate[lvid].vertex_program = vertex_program_type();
1661  vstate[lvid].state = LOCKING;
1662 // ASSERT_FALSE(vstate[lvid].hasnext);
1663  cmlocks->philosopher_stops_eating_per_replica(lvid);
1664  cmlocks->make_philosopher_hungry_per_replica(lvid);
1665  break;
1666  }
1667  case MIRROR_SCATTERING_AND_NEXT_GATHERING: {
1668  logstream(LOG_DEBUG) << rmi.procid() << ": Scattering: "
1669  << graph.global_vid(lvid) << ": MIRROR_SCATTERING_AND_NEXT_GATHERING" << std::endl;
1670  do_scatter(lvid);
1671  vstate[lvid].vertex_program = vstate[lvid].factorized_next;
1672  vstate[lvid].factorized_next = vertex_program_type();
1673  vstate[lvid].state = MIRROR_GATHERING;
1674  vstate[lvid].combined_gather.clear();
1675  goto EVAL_INTERNAL_TASK_RE_EVAL_STATE;
1676  }
1677  }
1678  vstate[lvid].unlock();
1679  } // end of eval internal task
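  // State-machine summary (as implemented above): with distributed locking
  // enabled, a master vertex typically moves
  //   NONE -> LOCKING -> GATHERING -> APPLYING -> SCATTERING -> NONE,
  // while a mirror moves
  //   NONE -> LOCKING -> MIRROR_GATHERING -> MIRROR_SCATTERING -> NONE.
  // The MIRROR_SCATTERING_AND_NEXT_* states let a mirror chain straight into
  // the next locking or gathering phase without passing through NONE. Under
  // factorized consistency the master skips LOCKING and enters GATHERING
  // directly from the scheduler (see eval_sched_task below).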
1680 
1681 
1682  /**
1683  * \internal
1684  * Picks up a task from the internal queue or the scheduler.
1685  * \param has_internal_task Set to true, if an internal task is returned
1686  * \param internal_lvid Contains a queue of internal tasks.
1687  * Set if has_internal_task = true
1688  * \param has_sched_msg Set to true if a scheduler entry is returned
1689  * \param sched_lvid A vertex to activate. Set if has_sched_msg = true
1690  * \param msg A message to send to sched_lvid. Set if has_sched_msg = true
1691  */
1692  void get_a_task(size_t threadid,
1693  bool& has_internal_task,
1694  std::vector<std::vector<lvid_type> >& internal_lvid,
1695  bool& has_sched_msg,
1696  lvid_type& sched_lvid,
1697  message_type &msg) {
1698  has_internal_task = false;
1699  has_sched_msg = false;
1700  if (timer::approx_time_seconds() - engine_start_time > timed_termination) {
1701  return;
1702  }
1703  BEGIN_TRACEPOINT(disteng_internal_task_queue);
1704  if (thrlocal[threadid].get_task(internal_lvid)) {
1705  has_internal_task = true;
1706  END_TRACEPOINT(disteng_internal_task_queue);
1707  return;
1708  }
1709  END_TRACEPOINT(disteng_internal_task_queue);
1710 
1711  if (cmlocks->num_clean_forks() >= max_clean_forks) {
1712  return;
1713  }
1714  if (pending_updates.value > max_pending) {
1715  return;
1716  }
1717  sched_status::status_enum stat =
1718  scheduler_ptr->get_next(threadid, sched_lvid, msg);
1719  has_sched_msg = stat != sched_status::EMPTY;
1720  } // end of get a task
1721 
1722 
1723  /**
1724  * \internal
1725  * Called when get_a_task returns neither an internal task nor a scheduler task.
1726  * This rechecks the status of the internal task queue and the scheduler
1727  * inside a consensus critical section.
1728  */
1729  bool try_to_quit(size_t threadid,
1730  bool& has_internal_task,
1731  std::vector<std::vector<lvid_type> >& internal_lvid,
1732  bool& has_sched_msg,
1733  lvid_type& sched_lvid,
1734  message_type &msg) {
1735 
1736  if (handler_intercept) rmi.dc().handle_incoming_calls(threadid, ncpus);
1737  static size_t ctr = 0;
1738  if (timer::approx_time_seconds() - engine_start_time > timed_termination) {
1739  termination_reason = execution_status::TIMEOUT;
1740  force_stop = true;
1741  }
1742  if (!force_stop &&
1743  issued_messages.value != programs_executed.value + blocked_issues.value) {
1744  ++ctr;
1745 /* if (ctr % 256 == 0) {
1746  usleep(1);
1747  }
1748  if (ctr % 4096 == 0) {
1749  if (issued_messages.value <= programs_executed.value + blocked_issues.value + 2) {
1750  for (size_t i = threadid;i < vstate.size(); i+=ncpus) {
1751  if (vstate[i].state != NONE) {
1752 
1753  logstream(LOG_INFO) << rmi.procid() << " Gvid: " << graph.global_vid(i) << "\n ";
1754  logstream(LOG_INFO) << "State = " << vstate[i].state << "\n ";
1755  local_vertex_type vertex = graph.l_vertex(i);
1756  if (graph.l_get_vertex_record(i).owner == rmi.procid()) {
1757  logstream(LOG_INFO) << "Mirrors on : ";
1758  foreach(const procid_t& mirror, vertex.mirrors()) {
1759  logstream(LOG_INFO) << mirror << " ";
1760  }
1761  logstream(LOG_INFO) << "\n ";
1762  logstream(LOG_INFO) << vstate[i].apply_count_down << " " << vstate[i].hasnext << "\n";
1763  } else {
1764  logstream(LOG_INFO) << "Master = " << graph.l_get_vertex_record(i).owner << "\n";
1765  }
1766  }
1767  }
1768  }
1769  } */
1770  rmi.dc().flush();
1771  return false;
1772  }
1773  logstream(LOG_DEBUG) << rmi.procid() << "-" << threadid << ": " << "Termination Attempt "
1774  << programs_executed.value << "/" << issued_messages.value << std::endl;
1775  has_internal_task = false;
1776  has_sched_msg = false;
1777 
1778  if (handler_intercept) rmi.dc().start_handler_threads(threadid, ncpus);
1779  consensus->begin_done_critical_section(threadid);
1780 
1781  BEGIN_TRACEPOINT(disteng_internal_task_queue);
1782  if (thrlocal[threadid].get_task(internal_lvid)) {
1783  logstream(LOG_DEBUG) << rmi.procid() << "-" << threadid << ": "
1784  << "\tCancelled by Internal Task" << std::endl;
1785  has_internal_task = true;
1786  consensus->cancel_critical_section(threadid);
1787  END_TRACEPOINT(disteng_internal_task_queue);
1788 
1789  if (handler_intercept) rmi.dc().stop_handler_threads(threadid, ncpus);
1790  return false;
1791  }
1792  END_TRACEPOINT(disteng_internal_task_queue);
1793 
1794  sched_status::status_enum stat =
1795  scheduler_ptr->get_next(threadid, sched_lvid, msg);
1796  if (stat == sched_status::EMPTY) {
1797  logstream(LOG_DEBUG) << rmi.procid() << "-" << threadid << ": "
1798  << "\tTermination Double Checked" << std::endl;
1799 
1800  DECREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
1801  if (!endgame_mode) logstream(LOG_EMPH) << "Endgame mode\n";
1802  endgame_mode = true;
1803 /*
1804  for (size_t i = threadid;i < vstate.size(); i+=ncpus) {
1805  if (vstate[i].state != NONE) {
1806  std::cout << rmi.procid() << " Gvid: " << graph.global_vid(i) << "\n";
1807  std::cout << "EGState = " << vstate[i].state << "\n";
1808  local_vertex_type vertex = graph.l_vertex(i);
1809  if (graph.l_get_vertex_record(i).owner == rmi.procid()) {
1810  std::cout << "Mirrors on : ";
1811  foreach(const procid_t& mirror, vertex.mirrors()) {
1812  std::cout << mirror << " ";
1813  }
1814  std::cout << "\n";
1815  std::cout << vstate[i].apply_count_down << " " << vstate[i].hasnext << "\n";
1816  } else {
1817  std::cout << "Master = " << graph.l_get_vertex_record(i).owner << "\n";
1818  }
1819  }
1820  }
1821 
1822 */
1823  bool ret = consensus->end_done_critical_section(threadid);
1824  INCREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
1825  if (ret == false) {
1826  if (handler_intercept) rmi.dc().stop_handler_threads(threadid, ncpus);
1827  logstream(LOG_DEBUG) << rmi.procid() << "-" << threadid << ": "
1828  << "\tCancelled" << std::endl;
1829  }
1830  return ret;
1831  } else {
1832  logstream(LOG_DEBUG) << rmi.procid() << "-" << threadid << ": "
1833  << "\tCancelled by Scheduler Task" << std::endl;
1834  consensus->cancel_critical_section(threadid);
1835  has_sched_msg = true;
1836  if (handler_intercept) rmi.dc().stop_handler_threads(threadid, ncpus);
1837  return false;
1838  }
1839  } // end of try to quit
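  // Termination pattern: a thread attempts to stop only once this machine's
  // message accounting balances (issued == executed + blocked). It then
  // enters the consensus critical section, re-checks both the internal task
  // queue and the scheduler, and either cancels the attempt because new work
  // appeared, or votes to finish via end_done_critical_section(), which
  // should return true only once all threads on all machines have agreed
  // to terminate.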
1840 
1841 
1842  /**
1843  * \internal
1844  * Adds a vertex to the internal task queue
1845  */
1846  void add_internal_task(lvid_type lvid) {
1847  if (force_stop) return;
1848  BEGIN_TRACEPOINT(disteng_internal_task_queue);
1849  size_t a;
1850  a = lvid % thrlocal.size();
1851  thrlocal[a].add_task(lvid);
1852  consensus->cancel_one(a);
1853  END_TRACEPOINT(disteng_internal_task_queue);
1854  }
1855 
1856  void add_internal_aggregation_task(const std::string& key) {
1857  ASSERT_GT(aggregate_key_to_id.count(key), 0);
1858  lvid_type id = -aggregate_key_to_id[key];
1859  // add to all internal task queues
1860  for (size_t i = 0;i < ncpus; ++i) {
1861  thrlocal[i].add_task_priority(id);
1862  }
1863  consensus->cancel();
1864  }
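  // Aggregation tasks share the internal task queue with vertex tasks by
  // encoding key index i (i >= 1, assigned in generate_aggregate_keys())
  // as lvid = -i. Since lvid_type is unsigned this wraps to a very large
  // value, which eval_internal_task() detects via lvid >= vstate.size()
  // and undoes with -lvid to recover the key index.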
1865 
1866  void ping() {
1867  }
1868 
1869  /**
1870  * \internal
1871  * Callback from the Chandy-Misra implementation.
1872  * This is called when a vertex has acquired all available local forks.
1873  * This is really an optimization: we extract any available task
1874  * for this vertex from the scheduler and forward it to the master.
1875  */
1876  void forward_cached_schedule(lvid_type lvid) {
1877  if (!factorized_consistency) {
1878  message_type msg;
1879  const typename graph_type::vertex_record& rec = graph.l_get_vertex_record(lvid);
1880  if (rec.owner != rmi.procid()) {
1881  if (scheduler_ptr->get_specific(lvid, msg) == sched_status::NEW_TASK) {
1882  rmi.remote_call(rec.owner, &engine_type::rpc_signal, rec.gvid, msg);
1883  }
1884  }
1885  }
1886  }
1887  /**
1888  * \internal
1889  * Called when the scheduler returns a vertex to run.
1890  * If this function is called with vertex locks acquired, prelocked
1891  * should be true. Otherwise it should be false.
1892  */
1893  template <bool prelocked>
1894  void eval_sched_task(const lvid_type sched_lvid,
1895  const message_type& msg) {
1896  BEGIN_TRACEPOINT(disteng_eval_sched_task);
1897  logstream(LOG_DEBUG) << rmi.procid() << ": Schedule Task: "
1898  << graph.global_vid(sched_lvid) << std::endl;
1899  // If I am not the owner just forward the task to the other
1900  // scheduler and return
1901  const typename graph_type::vertex_record& rec = graph.l_get_vertex_record(sched_lvid);
1902  const procid_t owner = rec.owner;
1903  bool acquirelock = false;
1904  if (owner != rmi.procid()) {
1905  const vertex_id_type vid = rec.gvid;
1906  rmi.remote_call(owner, &engine_type::rpc_signal, vid, msg);
1907  return;
1908  }
1909 // ASSERT_I_AM_OWNER(sched_lvid);
1910  // this is in local VIDs
1911  issued_messages.inc();
1912  pending_updates.inc();
1913  if (prelocked == false) {
1914  vstate[sched_lvid].lock();
1915  }
1916  if (track_task_retire_time) {
1917  task_start_time[sched_lvid] = launch_timer.current_time();
1918  }
1919 #ifdef ASYNC_ENGINE_FACTORIZED_AVOID_SCHEDULER_GATHER
1920  if (factorized_consistency) {
1921 
1922  if (vstate[sched_lvid].state == NONE) {
1923  // push into gathering state
1924  vstate[sched_lvid].state = GATHERING;
1925  vstate[sched_lvid].hasnext = false;
1926  vstate[sched_lvid].current_message = msg;
1927  vstate[sched_lvid].apply_count_down = graph.l_vertex(sched_lvid).num_mirrors() + 1;
1928  do_init_gather(sched_lvid);
1929  master_broadcast_gathering(sched_lvid, vstate[sched_lvid].vertex_program);
1930  }
1931  else {
1932  blocked_issues.inc();
1933  pending_updates.dec();
1934  if (vstate[sched_lvid].hasnext) {
1935  scheduler_ptr->place(sched_lvid, msg);
1936  joined_messages.inc();
1937  } else {
1938  vstate[sched_lvid].hasnext = true;
1939  scheduler_ptr->place(sched_lvid, msg);
1940  }
1941  }
1942  if (prelocked == false) vstate[sched_lvid].unlock();
1943  END_TRACEPOINT(disteng_eval_sched_task);
1944  }
1945  else {
1946 #endif
1947  if (vstate[sched_lvid].state == NONE) {
1948 
1949  INCREMENT_EVENT(EVENT_ACTIVE_TASKS, 1);
1950  // we start gather right here.
1951  // set up the state
1952  vstate[sched_lvid].state = LOCKING;
1953  vstate[sched_lvid].hasnext = false;
1954  vstate[sched_lvid].current_message = msg;
1955  vstate[sched_lvid].apply_count_down = graph.l_vertex(sched_lvid).num_mirrors() + 1;
1956  acquirelock = true;
1957  // we are going to broadcast after unlock
1958  } else if (vstate[sched_lvid].state == LOCKING) {
1959  blocked_issues.inc();
1960  pending_updates.dec();
1961  vstate[sched_lvid].current_message += msg;
1962  joined_messages.inc();
1963  } else {
1964  blocked_issues.inc();
1965  pending_updates.dec();
1966  if (vstate[sched_lvid].hasnext) {
1967  scheduler_ptr->place(sched_lvid, msg);
1968  joined_messages.inc();
1969  } else {
1970  vstate[sched_lvid].hasnext = true;
1971  scheduler_ptr->place(sched_lvid, msg);
1972  }
1973  }
1974  if (prelocked == false) vstate[sched_lvid].unlock();
1975  END_TRACEPOINT(disteng_eval_sched_task);
1976  if (acquirelock) {
1977  BEGIN_TRACEPOINT(disteng_chandy_misra);
1978  cmlocks->make_philosopher_hungry_per_replica(sched_lvid);
1979  END_TRACEPOINT(disteng_chandy_misra);
1980  master_broadcast_locking(sched_lvid);
1981  }
1982 #ifdef ASYNC_ENGINE_FACTORIZED_AVOID_SCHEDULER_GATHER
1983  }
1984 #endif
1985  }
1986 
1987 
1988  atomic<size_t> pingid;
1989  /**
1990  * \internal
1991  * Per thread main loop
1992  */
1993  void thread_start(size_t threadid) {
1994  if (handler_intercept) rmi.dc().stop_handler_threads(threadid, ncpus);
1995  bool has_internal_task = false;
1996  bool has_sched_msg = false;
1997  std::vector<std::vector<lvid_type> > internal_lvid;
1998  lvid_type sched_lvid;
1999 
2000  INCREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
2001  message_type msg;
2002 // size_t ctr = 0;
2003  float last_aggregator_check = timer::approx_time_seconds();
2004  // roughly every next_processing_time seconds (0.05s to start), we poke a machine
2005  float next_processing_time = 0.05;
2006  timer ti; ti.start();
2007  while(1) {
2008  if (timer::approx_time_seconds() != last_aggregator_check) {
2009  last_aggregator_check = timer::approx_time_seconds();
2010  std::string key = aggregator.tick_asynchronous();
2011  if (key != "") add_internal_aggregation_task(key);
2012  }
2013 /* ++ctr;
2014  if (max_clean_forks != (size_t)(-1) && ctr % 10000 == 0) {
2015  std::cout << cmlocks->num_clean_forks() << "/" << max_clean_forks << "\n";
2016  }*/
2017 
2018  if (handler_intercept) rmi.dc().handle_incoming_calls(threadid, ncpus);
2019 
2020  if (ti.current_time() >= next_processing_time && rmi.numprocs() > 1) {
2021  // every now and then, I ping one machine. This has the
2022  // effect of completely flushing the channel between me and
2023  // that machine
2024  if (handler_intercept) rmi.dc().start_handler_threads(threadid, ncpus);
2025  size_t p = pingid.inc() % rmi.numprocs();
2026  if (p == rmi.procid()) {
2027  p = pingid.inc() % rmi.numprocs();
2028  }
2029  rmi.remote_request(p, &async_consistent_engine::ping);
2030 
2031  if (handler_intercept) {
2032  rmi.dc().stop_handler_threads(threadid, ncpus);
2033  }
2034  ti.start();
2035  }
2036 
2037  get_a_task(threadid,
2038  has_internal_task, internal_lvid,
2039  has_sched_msg, sched_lvid, msg);
2040  // if we managed to get a task..
2041  if (has_internal_task) {
2042  for (size_t i = 0;i < internal_lvid.size(); ++i) {
2043  for (size_t j = 0;j < internal_lvid[i].size(); ++j) {
2044  eval_internal_task(threadid, internal_lvid[i][j]);
2045  }
2046  }
2047  if (endgame_mode) rmi.dc().flush();
2048  } else if (has_sched_msg) {
2049  eval_sched_task<false>(sched_lvid, msg);
2050  if (endgame_mode) rmi.dc().flush();
2051  }
2052 
2053  /*
2054  * We failed to obtain a task, try to quit
2055  */
2056  else if (!try_to_quit(threadid,
2057  has_internal_task, internal_lvid,
2058  has_sched_msg, sched_lvid, msg)) {
2059  // if we ran out of tasks, lets flush more regularly
2060  next_processing_time = timer::approx_time_seconds();
2061  if (has_internal_task) {
2062  for (size_t i = 0;i < internal_lvid.size(); ++i) {
2063  for (size_t j = 0;j < internal_lvid[i].size(); ++j) {
2064  eval_internal_task(threadid, internal_lvid[i][j]);
2065  }
2066  }
2067  } else if (has_sched_msg) {
2068  eval_sched_task<false>(sched_lvid, msg);
2069  }
2070  } else { break; }
2071  }
2072  if (endgame_mode) next_processing_time = 0.01;
2073  DECREMENT_EVENT(EVENT_ACTIVE_CPUS, 1);
2074  } // end of thread start
2075 
2076 
2077 /**************************************************************************
2078  * For init vertex program *
2079  * For convenience, the init() of vertex programs is managed by a  *
2080  * separate set of threads which are only activated at the start. *
2081  * This is copied from the synchronous engine. *
2082 ***************************************************************************/
2083  private:
2084  /// \internal pair of vertex id and vertex data
2085  typedef std::pair<vertex_id_type, vertex_data_type> vid_vdata_pair_type;
2086  /// \internal Exchange type used to swap vertex data on init program
2087  typedef buffered_exchange<vid_vdata_pair_type> vdata_exchange_type;
2088  /// \internal Exchange structure used to swap vertex data on init program
2089  vdata_exchange_type vdata_exchange;
2090 
2091  /// \internal Synchronizes init vertex program threads
2092  barrier thread_barrier;
2093 
2094  /**
2095  * \internal
2096  * Synchronizes the data of master vertex lvid to all of its mirrors.
2097  */
2098  void sync_vertex_data(lvid_type lvid) {
2099  ASSERT_TRUE(graph.l_is_master(lvid));
2100  const vertex_id_type vid = graph.global_vid(lvid);
2101  local_vertex_type vertex = graph.l_vertex(lvid);
2102  foreach(const procid_t& mirror, vertex.mirrors()) {
2103  vdata_exchange.send(mirror, std::make_pair(vid, vertex.data()));
2104  }
2105  } // end of sync_vertex_data
2106 
2107  /**
2108  * \internal
2109  * Receives data from the exchange and saves it
2110  */
2111  void recv_vertex_data() {
2112  procid_t procid(-1);
2113  typename vdata_exchange_type::buffer_type buffer;
2114  while(vdata_exchange.recv(procid, buffer)) {
2115  foreach(const vid_vdata_pair_type& pair, buffer) {
2116  const lvid_type lvid = graph.local_vid(pair.first);
2117  ASSERT_FALSE(graph.l_is_master(lvid));
2118  graph.l_vertex(lvid).data() = pair.second;
2119  }
2120  }
2121  } // end of recv vertex data
2122 
2123  // /**
2124  // * \internal
2125  // * Called simultaneously by all machines to initialize all vertex programs
2126  // */
2127  // void initialize_vertex_programs(size_t thread_id) {
2128  // // For now we are using the engine as the context interface
2129  // context_type context(*this, graph);
2130  // for(lvid_type lvid = thread_id; lvid < graph.num_local_vertices();
2131  // lvid += ncpus) {
2132  // if(graph.l_is_master(lvid)) {
2133  // vertex_type vertex = local_vertex_type(graph.l_vertex(lvid));
2134  // vstate[lvid].vertex_program.init(context, vertex);
2135  // sync_vertex_data(lvid);
2136  // }
2137  // recv_vertex_data();
2138  // }
2139  // // Flush the buffer and finish receiving any remaining vertex
2140  // // programs.
2141  // thread_barrier.wait();
2142  // if(thread_id == 0) { vdata_exchange.flush(); }
2143  // thread_barrier.wait();
2144  // recv_vertex_data();
2145  // } // end of initialize_vertex_programs
2146 
2147 
2148 
2149 /**************************************************************************
2150  * Main engine start() *
2151  **************************************************************************/
2152 
2153  public:
2154 
2155  /**
2156  * \brief Start the engine execution.
2157  *
2158  * This function starts the engine and does not
2159  * return until the scheduler has no tasks remaining.
2160  *
2161  * \return the reason for termination
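  *
  * A minimal usage sketch (assuming an application-defined vertex program
  * my_program, a loaded distributed graph, and engine options opts; none
  * of these are defined in this file):
  *
  * \code
  * graphlab::async_consistent_engine<my_program> engine(dc, graph, opts);
  * engine.signal_all();                  // schedule every vertex once
  * graphlab::execution_status::status_enum reason = engine.start();
  * std::cout << "Executed " << engine.num_updates() << " updates\n";
  * \endcode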
2162  */
2163  execution_status::status_enum start() {
2164  logstream(LOG_INFO) << "Spawning " << ncpus << " threads" << std::endl;
2165  ASSERT_TRUE(scheduler_ptr != NULL);
2166  consensus->reset();
2167  // start the scheduler
2168  scheduler_ptr->start();
2169 
2170 
2171  // start the aggregator
2172  aggregator.start(ncpus);
2173  aggregator.aggregate_all_periodic();
2174  generate_aggregate_keys();
2175  // now since I am overloading the internal task queue IDs to also
2176  // hold aggregator keys, this constrains the number of vertices
2177  // I can handle. Check for overflow
2178  {
2179  lvid_type lv = (lvid_type)graph.num_local_vertices();
2180  ASSERT_MSG(lv + aggregate_id_to_key.size() >= lv,
2181  "Internal Queue IDs numeric overflow");
2182  }
2183  started = true;
2184 
2185  rmi.barrier();
2186 
2187  size_t allocatedmem = memory_info::allocated_bytes();
2188  rmi.all_reduce(allocatedmem);
2189 
2190  engine_start_time = timer::approx_time_seconds();
2191  force_stop = false;
2192  total_update_time.value = 0.0;
2193  endgame_mode = false;
2194  issued_messages = 0;
2195  pending_updates = 0;
2196  blocked_issues = 0;
2197  programs_executed = 0;
2198  if (track_task_retire_time) {
2199  task_start_time.resize(graph.num_local_vertices());
2200  }
2201  launch_timer.start();
2202 
2203  termination_reason = execution_status::RUNNING;
2204 
2205  // if (perform_init_vertex_program) {
2206  // logstream(LOG_INFO) << "Initialize Vertex Programs: "
2207  // << allocatedmem << std::endl;
2208  // for (size_t i = 0; i < ncpus; ++i) {
2209  // thrgroup.launch(boost::bind(&engine_type::initialize_vertex_programs, this, i));
2210  // }
2211  // thrgroup.join();
2212  // }
2213 
2214  if (rmi.procid() == 0) {
2215  logstream(LOG_INFO) << "Total Allocated Bytes: " << allocatedmem << std::endl;
2216  }
2217  for (size_t i = 0; i < ncpus; ++i) {
2218  thrgroup.launch(boost::bind(&engine_type::thread_start, this, i), i);
2219  }
2220  thrgroup.join();
2221  aggregator.stop();
2222  // if termination reason was not changed, then it must be depletion
2223  if (termination_reason == execution_status::RUNNING) {
2224  termination_reason = execution_status::TASK_DEPLETION;
2225  }
2226 
2227  if (rmi.procid() == 0) {
2228  }
2229  size_t ctasks = programs_executed.value;
2230  rmi.all_reduce(ctasks);
2231  programs_executed.value = ctasks;
2232 
2233  ctasks = issued_messages.value;
2234  rmi.all_reduce(ctasks);
2235  issued_messages.value = ctasks;
2236 
2237  ctasks = blocked_issues.value;
2238  rmi.all_reduce(ctasks);
2239  blocked_issues.value = ctasks;
2240 
2241  ctasks = joined_messages.value;
2242  ctasks += scheduler_ptr->num_joins();
2243  rmi.all_reduce(ctasks);
2244  joined_messages.value = ctasks;
2245 
2246  double total_upd_time = total_update_time.value;
2247  rmi.all_reduce(total_upd_time);
2248  total_update_time.value = total_upd_time;
2249 
2250  rmi.cout() << "Completed Tasks: " << programs_executed.value << std::endl;
2251  rmi.cout() << "Issued Tasks: " << issued_messages.value << std::endl;
2252  rmi.cout() << "Blocked Issues: " << blocked_issues.value << std::endl;
2253  if (track_task_retire_time) {
2254  rmi.cout() << "Average Task Retire Time: "
2255  << total_update_time.value / programs_executed.value
2256  << std::endl;
2257  }
2258  rmi.cout() << "Joined Tasks: " << joined_messages.value << std::endl;
2259 
2260  /*for (size_t i = 0;i < vstate.size(); ++i) {
2261  if(vstate[i].state != NONE) {
2262  std::cout << "Vertex: " << i << ": " << vstate[i].state << " " << (int)(cmlocks->philosopherset[i].state) << " " << cmlocks->philosopherset[i].num_edges << " " << cmlocks->philosopherset[i].forks_acquired << "\n";
2263 
2264  foreach(typename local_graph_type::edge_type edge, cmlocks->graph.in_edges(i)) {
2265  std::cout << (int)(cmlocks->forkset[cmlocks->graph.edge_id(edge)]) << " ";
2266  }
2267  std::cout << "\n";
2268  foreach(typename local_graph_type::edge_type edge, cmlocks->graph.out_edges(i)) {
2269  std::cout << (int)(cmlocks->forkset[cmlocks->graph.edge_id(edge)]) << " ";
2270  }
2271  std::cout << "\n";
2272  getchar();
2273  }
2274  }*/
2275  started = false;
2276  return termination_reason;
2277  } // end of start
2278 
2279 /************************************************************
2280  * Aggregators *
2281  ************************************************************/
2282  private:
2283  std::map<std::string, lvid_type> aggregate_key_to_id;
2284  std::vector<std::string> aggregate_id_to_key;
2285 
2286  /**
2287  * \internal
2288  * Assigns each periodic aggregation key a unique ID so we can use it in
2289  * internal task scheduling.
2290  */
2291  void generate_aggregate_keys() {
2292  aggregate_key_to_id.clear();
2293  aggregate_id_to_key.clear();
2294  // no ID 0, since we are using negated IDs for scheduling
2295  aggregate_id_to_key.push_back("");
2296  std::set<std::string> keys = aggregator.get_all_periodic_keys();
2297 
2298  foreach(std::string key, keys) {
2299  aggregate_id_to_key.push_back(key);
2300  aggregate_key_to_id[key] = (lvid_type)(aggregate_id_to_key.size() - 1);
2301 
2302  }
2303  }
2304  public:
2305  // // Exposed aggregator functionality
2306  // /**
2307  // * \copydoc distributed_aggregator::add_vertex_aggregator()
2308  // */
2309  // template <typename ReductionType>
2310  // bool add_vertex_aggregator(const std::string& key,
2311  // boost::function<ReductionType(icontext_type&,
2312  // vertex_type&)> map_function,
2313  // boost::function<void(icontext_type&,
2314  // const ReductionType&)> finalize_function) {
2315  // rmi.barrier();
2316  // return aggregator.add_vertex_aggregator<ReductionType>(key,
2317  // map_function,
2318  // finalize_function);
2319  // }
2320 
2321  // /**
2322  // * \copydoc distributed_aggregator::add_edge_aggregator()
2323  // */
2324  // template <typename ReductionType>
2325  // bool add_edge_aggregator(const std::string& key,
2326  // boost::function<ReductionType(icontext_type&,
2327  // edge_type&)> map_function,
2328  // boost::function<void(icontext_type&,
2329  // const ReductionType&)> finalize_function) {
2330  // rmi.barrier();
2331  // return aggregator.add_edge_aggregator<ReductionType>(key,
2332  // map_function,
2333  // finalize_function);
2334  // }
2335 
2336  // /**
2337  // * \copydoc distributed_aggregator::aggregate_now()
2338  // */
2339  // bool aggregate_now(const std::string& key) {
2340  // rmi.barrier();
2341  // return aggregator.aggregate_now(key);
2342  // }
2343 
2344  // /**
2345  // * \copydoc distributed_aggregator::aggregate_periodic()
2346  // */
2347  // bool aggregate_periodic(const std::string& key, float seconds) {
2348  // rmi.barrier();
2349  // return aggregator.aggregate_periodic(key, seconds);
2350  // }
2351 
2352  aggregator_type* get_aggregator() { return &aggregator; }
2353 
2354  }; // end of class
2355 } // namespace
2356 
2357 #include <graphlab/macros_undef.hpp>
2358 
2359 #undef ASYNC_ENGINE_FACTORIZED_AVOID_SCHEDULER_GATHER
2360 
2361 #endif // GRAPHLAB_ASYNC_CONSISTENT_ENGINE
2362