GraphLab: Distributed Graph-Parallel API 2.1
async_consensus.hpp
/*
 * Copyright (c) 2009 Carnegie Mellon University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS
 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language
 * governing permissions and limitations under the License.
 *
 * For more about this software visit:
 *
 * http://www.graphlab.ml.cmu.edu
 *
 */


#ifndef ASYNC_TERMINATOR_HPP
#define ASYNC_TERMINATOR_HPP

#include <graphlab/parallel/pthread_tools.hpp>

#include <graphlab/rpc/dc.hpp>
#include <graphlab/rpc/dc_dist_object_base.hpp>
#include <graphlab/rpc/dc_dist_object.hpp>


namespace graphlab {
  /**
   * \ingroup rpc
   * \brief This implements a distributed consensus algorithm which waits
   * for global completion of all computation/RPC events on a given object.
   *
   * The use case is as follows:
   *
   * A collection of threads on a collection of distributed machines, each
   * running the following:
   *
   * \code
   * while (work to be done) {
   *   Do_stuff
   * }
   * \endcode
   *
   * However, <tt>Do_stuff</tt> may include RPC calls which introduce work
   * to other threads/machines.
   * Figuring out when termination is possible is complex. For instance,
   * RPC calls could still be in flight and not yet received. This
   * async_consensus class implements a solution built around the algorithm in
   * <i>Misra, J.: Detecting Termination of Distributed Computations Using
   * Markers, SIGOPS, 1983</i>, extended to handle the mixed-parallelism
   * (distributed with threading) case.
   *
   * The user's main loop has to be modified to:
   *
   * \code
   * done = false;
   * while (!done) {
   *   Do_stuff
   *   // if locally I see that there is no work to be done,
   *   // we begin the consensus checking
   *   if (no work to be done) {
   *     // begin the critical section and check again
   *     consensus.begin_done_critical_section();
   *     // if there is still no work to be done
   *     if (no work to be done) {
   *       done = consensus.end_done_critical_section();
   *     } else {
   *       consensus.cancel_critical_section();
   *     }
   *   }
   * }
   * \endcode
   *
   * Additionally, incoming RPC calls which create work must ensure there are
   * active threads capable of processing that work. An easy solution is to
   * simply call cancel_one(). More optimized solutions include keeping a
   * counter of the number of active threads, and only calling cancel() or
   * cancel_one() if all threads are asleep. (Note that the optimized
   * solution does require some care to ensure deadlock-free execution.)
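   *
   * As an illustration, here is a hedged sketch of such an RPC handler
   * (on_new_work, work_queue, and update_t are hypothetical names, not
   * part of this API): it first makes the new work visible locally, then
   * wakes one thread in case all threads are blocked in the consensus
   * protocol.
   *
   * \code
   * void on_new_work(size_t target_cpuid, const update_t& u) {
   *   work_queue.enqueue(u);               // make the work visible locally
   *   consensus.cancel_one(target_cpuid);  // wake that thread if it is
   *                                        // waiting inside the consensus
   * }
   * \endcode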
   */
  class async_consensus {
   public:
    /** \brief Constructs an asynchronous consensus object
     *
     * The consensus procedure waits until all threads have no work to do and
     * are waiting in consensus, and there is no communication going on which
     * could wake up a thread. The consensus object is therefore associated
     * with a communication context, either a graphlab::dc_dist_object,
     * or the global context (the root distributed_control).
     *
     * \param dc The distributed control object to use for communication
     * \param required_threads_in_done Local consensus is achieved if this many
     *        threads are waiting for consensus locally.
     * \param attach The context to associate with. If NULL, we associate with
     *        the global context.
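     *
     * A minimal construction sketch (the already-running distributed_control
     * and the choice of 4 local worker threads are assumptions made only for
     * illustration):
     * \code
     * graphlab::distributed_control dc;
     * graphlab::async_consensus consensus(dc, 4);
     * \endcode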
     */
    async_consensus(distributed_control &dc,
                    size_t required_threads_in_done = 1,
                    const dc_impl::dc_dist_object_base* attach = NULL);


    /**
     * \brief A thread enters the critical section by calling
     * this function.
     *
     * After entering the critical section, the thread should check its
     * termination condition locally. If the termination condition is
     * still fulfilled, end_done_critical_section() should be called.
     * Otherwise cancel_critical_section() should be called.
     *
     * \param cpuid Thread ID of the thread.
     */
    void begin_done_critical_section(size_t cpuid);

    /**
     * \brief Leaves the critical section because the termination condition
     * is not fulfilled.
     *
     * See begin_done_critical_section()
     * \param cpuid Thread ID of the thread.
     */
    void cancel_critical_section(size_t cpuid);

    /**
     * \brief A thread must call this function if the termination condition
     * is fulfilled while in the critical section.
     *
     * See begin_done_critical_section()
     *
     * \param cpuid Thread ID of the thread.
     * \returns True if global consensus is achieved. False otherwise.
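     *
     * A sketch of the full per-thread protocol with explicit thread IDs
     * (do_work() and has_local_work() are hypothetical placeholders for
     * application logic, not part of this API):
     * \code
     * bool done = false;
     * while (!done) {
     *   do_work(cpuid);                  // may send RPCs to other machines
     *   if (!has_local_work(cpuid)) {
     *     consensus.begin_done_critical_section(cpuid);
     *     if (!has_local_work(cpuid)) {
     *       done = consensus.end_done_critical_section(cpuid);
     *     } else {
     *       consensus.cancel_critical_section(cpuid);
     *     }
     *   }
     * }
     * \endcode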
     */
    bool end_done_critical_section(size_t cpuid);

    /**
     * \brief Forces the consensus to be set
     */
    void force_done();


    /// \brief Wakes up all local threads waiting in end_done_critical_section()
    void cancel();

    /// \brief Wakes up a specific thread waiting in end_done_critical_section()
    void cancel_one(size_t cpuid);

    /// \brief Returns true if consensus is achieved.
    bool is_done() const {
      return done;
    }

    /** \brief Resets the consensus object. Must be called simultaneously by
     * exactly one thread on each machine.
     * This function is not safe to call while consensus is being achieved.
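     *
     * For example, between phases of a computation (a sketch; the barrier
     * and the thread_id guard are assumed ways to satisfy the
     * "simultaneously, one thread per machine" requirement):
     * \code
     * dc.barrier();   // ensure no machine is still running the protocol
     * if (thread_id == 0) consensus.reset();
     * \endcode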
     */
    void reset();

   private:

    /**
     * The token object that is passed around the machines.
     * It counts the total number of RPC calls that have been sent
     * or received, as well as the machine which last changed the value.
     * When the token goes one full round with no change, consensus is
     * achieved.
     */
    struct token {
      size_t total_calls_sent;
      size_t total_calls_received;
      procid_t last_change;
      void save(oarchive &oarc) const {
        oarc << total_calls_sent << total_calls_received << last_change;
      }

      void load(iarchive &iarc) {
        iarc >> total_calls_sent >> total_calls_received >> last_change;
      }
    };


    dc_dist_object<async_consensus> rmi;
    const dc_impl::dc_dist_object_base* attachedobj;

    size_t last_calls_sent;
    size_t last_calls_received;



    /// counts the number of threads which are not sleeping;
    /// protected by the mutex
    size_t numactive;

    /// Total number of CPUs
    size_t ncpus;

    /// once this flag is set, the terminator is invalid, and all threads
    /// should leave
    bool done;

    /// set if force_done() is called
    bool forced_abort;

    /// Number of threads which have called begin_done_critical_section()
    /// and have not yet left end_done_critical_section().
    /// This is an atomic counter and is not protected.
    atomic<size_t> trying_to_sleep;

    /// critical[i] is set if thread i has called
    /// begin_done_critical_section() but has not yet left
    /// end_done_critical_section().
    /// The sum over critical should equal trying_to_sleep.
    std::vector<char> critical;

    /// sleeping[i] is set if thread i is in cond[i].wait()
    std::vector<char> sleeping;


    bool hastoken;
    /// If I have the token, the value of the token
    token cur_token;

    mutex m;
    std::vector<conditional> cond;


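    /* A hedged sketch of the consensus test performed when the token
       completes a round (the real receive_the_token() implementation may
       differ; this only restates the invariant documented on struct token):

         if (tok.last_change == rmi.procid() &&
             tok.total_calls_sent == tok.total_calls_received) {
           // the token circulated once with no change, and every RPC
           // message sent has been received: global consensus
           done = true;
         }
    */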
    void receive_the_token(token &tok);
    void pass_the_token();
  };

}
#endif