GraphLab: Distributed Graph-Parallel API 2.1
async_consensus.cpp
/*
 * Copyright (c) 2009 Carnegie Mellon University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS
 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language
 * governing permissions and limitations under the License.
 *
 * For more about this software visit:
 *
 * http://www.graphlab.ml.cmu.edu
 *
 */


#include <graphlab/rpc/async_consensus.hpp>

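/*
 * This file implements async_consensus: cooperative termination detection
 * for GraphLab's RPC layer.  Locally, idle threads park themselves on
 * per-thread condition variables guarded by a single mutex and are woken
 * whenever new work arrives.  Globally, a token carrying cumulative counts
 * of RPC calls sent and received is passed around the ring of processes;
 * when the token completes a full loop without the counts changing and
 * every call sent has been received, completion is broadcast to all
 * processes.
 */
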
namespace graphlab {
  async_consensus::async_consensus(distributed_control &dc,
                                   size_t required_threads_in_done,
                                   const dc_impl::dc_dist_object_base *attach)
      : rmi(dc, this), attachedobj(attach),
        last_calls_sent(0), last_calls_received(0),
        numactive(required_threads_in_done),
        ncpus(required_threads_in_done),
        done(false),
        trying_to_sleep(0),
        critical(ncpus, 0),
        sleeping(ncpus, 0),
        hastoken(dc.procid() == 0),
        cond(ncpus) {

    cur_token.total_calls_sent = 0;
    cur_token.total_calls_received = 0;
    cur_token.last_change = (procid_t)(rmi.numprocs() - 1);
  }

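  // Restores all counters, flags, and the token to their freshly
  // constructed state.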
  void async_consensus::reset() {
    last_calls_sent = 0;
    last_calls_received = 0;
    numactive = ncpus;
    done = false;
    trying_to_sleep = false;
    critical = std::vector<char>(ncpus, 0);
    sleeping = std::vector<char>(ncpus, 0);
    hastoken = (rmi.procid() == 0);
    cur_token.total_calls_sent = 0;
    cur_token.total_calls_received = 0;
    cur_token.last_change = (procid_t)(rmi.numprocs() - 1);
  }

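  // Forces the consensus to terminate immediately, regardless of the
  // token state, and wakes any sleeping threads.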
  void async_consensus::force_done() {
    m.lock();
    done = true;
    m.unlock();
    cancel();
  }

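  // Registers the calling thread's intent to sleep and acquires the
  // mutex.  Must be followed by either cancel_critical_section() or
  // end_done_critical_section().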
  void async_consensus::begin_done_critical_section(size_t cpuid) {
    trying_to_sleep.inc();
    critical[cpuid] = true;
    m.lock();
  }

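  // Abandons a critical section begun by begin_done_critical_section()
  // without sleeping, typically because the caller found more work after
  // re-checking its queue.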
  void async_consensus::cancel_critical_section(size_t cpuid) {
    m.unlock();
    critical[cpuid] = false;
    trying_to_sleep.dec();
  }

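  // Completes the critical section by putting the calling thread to
  // sleep.  The thread blocks until it is woken by cancel()/cancel_one()
  // or until global termination is detected.  Returns true if the
  // consensus has completed, false if the thread was merely woken up.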
  bool async_consensus::end_done_critical_section(size_t cpuid) {
    // if the done flag is set, quit immediately
    if (done) {
      m.unlock();
      critical[cpuid] = false;
      trying_to_sleep.dec();
      return true;
    }
    /*
      Assertion: numactive is decremented only within this critical
      section and incremented only within the same critical section.
      Therefore numactive is a valid count of the number of threads
      outside of this critical section.
    */
    --numactive;

    /*
      Assertion: If numactive is ever 0 at this point, the algorithm is done.
      WLOG, let the current thread which just decremented numactive be thread 0.

      Since there is only one active thread (0), there must be no threads
      performing insertions, and no other threads are waking up.
      All other threads must therefore be sleeping in cond.wait().
    */
    if (numactive == 0) {
      logstream(LOG_INFO) << rmi.procid() << ": Termination Possible" << std::endl;
      if (hastoken) pass_the_token();
    }
    sleeping[cpuid] = true;
    while(1) {
      // here we are protected by the mutex again.

      // woken up by someone else: leave the terminator
      if (sleeping[cpuid] == false || done) {
        break;
      }
      cond[cpuid].wait(m);
    }
    m.unlock();
    critical[cpuid] = false;
    trying_to_sleep.dec();
    return done;
  }

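  // Wakes up every sleeping thread so that it re-scans its task queue.
  // Called whenever new work becomes available.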
  void async_consensus::cancel() {
    /*
      Assertion: numactive > 0 if there is work to do.
      If there are threads trying to sleep, let's wake them up.
    */
    if (trying_to_sleep > 0 || numactive < ncpus) {
      m.lock();
      size_t oldnumactive = numactive;
      // Once I acquire this lock, every thread must be in one of the
      // following states:
      //   1: still running and has not reached begin_done_critical_section()
      //   2: sleeping in cond.wait()
      //   3: has called begin_done_critical_section() but has not acquired
      //      the mutex
      // In cases 1 and 3, the thread will perform one more sweep of its
      // task queue, and will therefore see any new job if one is available.
      // In case 2, numactive must be < ncpus since numactive is mutex
      // protected, so I can wake the thread up by clearing its sleeping
      // flag and signaling its condition variable.
      if (numactive < ncpus) {
        // this is safe. Note that it is done from within
        // the critical section.
        for (size_t i = 0; i < ncpus; ++i) {
          numactive += sleeping[i];
          if (sleeping[i]) {
            sleeping[i] = 0;
            cond[i].signal();
          }
        }
        if (oldnumactive == 0 && !done) {
          logstream(LOG_INFO) << rmi.procid() << ": Waking" << std::endl;
        }
      }
      m.unlock();
    }
  }

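  // Wakes up the single thread identified by cpuhint if it is currently
  // blocked inside end_done_critical_section().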
  void async_consensus::cancel_one(size_t cpuhint) {
    if (critical[cpuhint]) {
      m.lock();
      size_t oldnumactive = numactive;
      // see cancel() for detailed comments
      if (sleeping[cpuhint]) {
        numactive += sleeping[cpuhint];
        sleeping[cpuhint] = 0;
        if (oldnumactive == 0 && !done) {
          logstream(LOG_INFO) << rmi.procid() << ": Waking" << std::endl;
        }
        cond[cpuhint].signal();
      }
      m.unlock();
    }
  }

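  // Remote-call target: stores the received token and, if this process is
  // itself idle (numactive == 0), passes it on immediately.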
  void async_consensus::receive_the_token(token &tok) {
    m.lock();
    // save the token
    hastoken = true;
    cur_token = tok;
    // if I am waiting on done, pass the token.
    logstream(LOG_INFO) << rmi.procid() << ": Token Received" << std::endl;
    if (numactive == 0) {
      pass_the_token();
    }
    m.unlock();
  }

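  // If the token has made a complete loop without the call counters
  // changing and every call sent has been received, broadcasts completion
  // to all other processes; otherwise folds the local deltas in calls
  // sent/received into the token and forwards it to the next process.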
  void async_consensus::pass_the_token() {
    // note that this function does not acquire the token lock
    // the caller must acquire it
    assert(hastoken);
    // first check if we are done
    if (cur_token.last_change == rmi.procid() &&
        cur_token.total_calls_received == cur_token.total_calls_sent) {
      logstream(LOG_INFO) << "Completed Token: "
                          << cur_token.total_calls_received << " "
                          << cur_token.total_calls_sent << std::endl;
      // we have completed a loop around!
      // broadcast a completion
      for (procid_t i = 0; i < rmi.numprocs(); ++i) {
        if (i != rmi.procid()) {
          rmi.control_call(i,
                           &async_consensus::force_done);
        }
      }
      // set the complete flag
      // we can't call consensus() since it will deadlock
      done = true;
      // this is the same code as cancel(), but we can't call cancel()
      // since we are holding on to a lock
      if (numactive < ncpus) {
        // this is safe. Note that it is done from within
        // the critical section.
        for (size_t i = 0; i < ncpus; ++i) {
          numactive += sleeping[i];
          if (sleeping[i]) {
            sleeping[i] = 0;
            cond[i].signal();
          }
        }
      }
    }
    else {
      // update the token
      size_t callsrecv;
      size_t callssent;

      if (attachedobj) {
        callsrecv = attachedobj->calls_received();
        callssent = attachedobj->calls_sent();
      }
      else {
        callsrecv = rmi.dc().calls_received();
        callssent = rmi.dc().calls_sent();
      }

      if (callssent != last_calls_sent ||
          callsrecv != last_calls_received) {
        cur_token.total_calls_sent += callssent - last_calls_sent;
        cur_token.total_calls_received += callsrecv - last_calls_received;
        cur_token.last_change = rmi.procid();
      }
      //std::cout << "Sending token: (" << cur_token.total_calls_sent
      //          << ", " << cur_token.total_calls_received << ")" << std::endl;

      last_calls_sent = callssent;
      last_calls_received = callsrecv;
      // send it along.
      hastoken = false;
      logstream(LOG_INFO) << "Passing Token " << rmi.procid() << "-->"
                          << (rmi.procid() + 1) % rmi.numprocs() << ": "
                          << cur_token.total_calls_received << " "
                          << cur_token.total_calls_sent << std::endl;

      rmi.control_call((procid_t)((rmi.procid() + 1) % rmi.numprocs()),
                       &async_consensus::receive_the_token,
                       cur_token);
    }
  }
}

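/*
 * Typical usage (a sketch, not part of this file): each worker thread
 * alternates between draining its own task queue and entering the
 * consensus protocol above.  Names such as `consensus`, `queue`, and
 * `execute` below are illustrative placeholders, not identifiers defined
 * here.
 *
 *   while (true) {
 *     while (queue.try_pop(task)) execute(task);   // drain local work
 *     consensus.begin_done_critical_section(cpuid);
 *     if (!queue.empty()) {
 *       // new work arrived after the last check; do not sleep
 *       consensus.cancel_critical_section(cpuid);
 *       continue;
 *     }
 *     // sleep until woken by cancel()/cancel_one() or until global
 *     // termination is detected
 *     if (consensus.end_done_critical_section(cpuid)) break;
 *   }
 *
 * Producers that enqueue new work call consensus.cancel() (or
 * cancel_one(cpuid)) so that sleeping workers re-scan their queues.
 */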