GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
chandy_misra.hpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_LOCAL_CHANDY_MISRA_HPP
25 #define GRAPHLAB_LOCAL_CHANDY_MISRA_HPP
26 #include <vector>
27 #include <graphlab/logger/assertions.hpp>
28 #include <graphlab/parallel/pthread_tools.hpp>
29 #include <graphlab/macros_def.hpp>
30 namespace graphlab {
31 
32 template <typename GraphType>
33 class chandy_misra {
34  public:
35  GraphType &graph;
36  /*
37  * Each "fork" is one character.
38  * bit 0: owner. if 0 is src. if 1 is target
39  * bit 1: clean = 0, dirty = 1
40  * bit 2: owner 0 request
41  * bit 3: owner 1 request
42  */
43  std::vector<unsigned char> forkset;
44  enum { OWNER_BIT = 1,
45  DIRTY_BIT = 2,
46  REQUEST_0 = 4,
47  REQUEST_1 = 8 };
48  enum {OWNER_SOURCE = 0, OWNER_TARGET = 1};
49  inline unsigned char request_bit(bool owner) {
50  return owner ? REQUEST_1 : REQUEST_0;
51  }
52 
53  struct philosopher {
54  vertex_id_type num_edges;
55  vertex_id_type forks_acquired;
56  simple_spinlock lock;
57  unsigned char state;
58  };
59  std::vector<philosopher> philosopherset;
60  /*
61  * Possible values for the philosopher state
62  */
63  enum {
64  THINKING = 0,
65  HUNGRY = 1,
66  EATING = 2
67  };
68 
69  /** Places a request for the fork. Requires fork to be locked */
70  inline void request_for_fork(size_t forkid, bool nextowner) {
71  forkset[forkid] |= request_bit(nextowner);
72  }
73 
74  inline bool fork_owner(size_t forkid) {
75  return forkset[forkid] & OWNER_BIT;
76  }
77 
78  inline bool fork_dirty(size_t forkid) {
79  return !!(forkset[forkid] & DIRTY_BIT);
80  }
81 
82  inline void dirty_fork(size_t forkid) {
83  forkset[forkid] |= DIRTY_BIT;
84  }
85 
86  /** changes the fork owner if it is dirty, and the other side
87  * has requested for it. Fork must be locked.
88  * Returns true if fork moved. false otherwise.
89  */
90  inline void advance_fork_state_on_lock(size_t forkid,
91  vertex_id_type source,
92  vertex_id_type target) {
93 
94  unsigned char currentowner = forkset[forkid] & OWNER_BIT;
95  if (currentowner == OWNER_SOURCE) {
96  // if the current owner is not eating, and the
97  // fork is dirty and other side has placed a request
98  if (philosopherset[source].state != EATING &&
99  (forkset[forkid] & DIRTY_BIT) &&
100  (forkset[forkid] & REQUEST_1)) {
101  // change the owner and clean the fork)
102 
103  forkset[forkid] = OWNER_TARGET;
104  if (philosopherset[source].state == HUNGRY) {
105  forkset[forkid] |= REQUEST_0;
106  }
107  philosopherset[source].forks_acquired--;
108  philosopherset[target].forks_acquired++;
109  }
110  }
111  else {
112  // if the current owner is not eating, and the
113  // fork is dirty and other side has placed a request
114  if (philosopherset[target].state != EATING &&
115  (forkset[forkid] & DIRTY_BIT) &&
116  (forkset[forkid] & REQUEST_0)) {
117  // change the owner and clean the fork)
118 
119  forkset[forkid] = OWNER_SOURCE;
120  if (philosopherset[target].state == HUNGRY) {
121  forkset[forkid] |= REQUEST_1;
122  }
123  philosopherset[source].forks_acquired++;
124  philosopherset[target].forks_acquired--;
125  }
126  }
127  }
128 
129 
130  inline bool advance_fork_state_on_unlock(size_t forkid,
131  vertex_id_type source,
132  vertex_id_type target) {
133 
134  unsigned char currentowner = forkset[forkid] & OWNER_BIT;
135  if (currentowner == OWNER_SOURCE) {
136  // if the current owner is not eating, and the
137  // fork is dirty and other side has placed a request
138  if ((forkset[forkid] & DIRTY_BIT) &&
139  (forkset[forkid] & REQUEST_1)) {
140  // change the owner and clean the fork)
141  // keep my request bit if any
142  forkset[forkid] = OWNER_TARGET;
143  philosopherset[source].forks_acquired--;
144  philosopherset[target].forks_acquired++;
145  return true;
146  }
147  }
148  else {
149  // if the current owner is not eating, and the
150  // fork is dirty and other side has placed a request
151  if ((forkset[forkid] & DIRTY_BIT) &&
152  (forkset[forkid] & REQUEST_0)) {
153  // change the owner and clean the fork)
154  // keep my request bit if any
155  forkset[forkid] = OWNER_SOURCE;
156  philosopherset[source].forks_acquired++;
157  philosopherset[target].forks_acquired--;
158  return true;
159  }
160  }
161  return false;
162  }
163 
164  void compute_initial_fork_arrangement() {
165  for (vertex_id_type i = 0;i < graph.num_vertices(); ++i) {
166  philosopherset[i].num_edges = graph.num_in_edges(i) +
167  graph.num_out_edges(i);
168  philosopherset[i].state = THINKING;
169  philosopherset[i].forks_acquired = 0;
170  }
171  for (vertex_id_type i = 0;i < graph.num_vertices(); ++i) {
172  foreach(typename GraphType::edge_type edge, graph.in_edges(i)) {
173  if (edge.source() > edge.target()) {
174  forkset[graph.edge_id(edge)] = DIRTY_BIT | OWNER_TARGET;
175  philosopherset[edge.target()].forks_acquired++;
176  }
177  else {
178  forkset[graph.edge_id(edge)] = DIRTY_BIT | OWNER_SOURCE;
179  philosopherset[edge.source()].forks_acquired++;
180  }
181  }
182  }
183  }
184 
185  void compute_initial_fork_arrangement(const std::vector<vertex_id_type> &altvids) {
186  for (vertex_id_type i = 0;i < graph.num_vertices(); ++i) {
187  philosopherset[i].num_edges = graph.num_in_edges(i) +
188  graph.num_out_edges(i);
189  philosopherset[i].state = THINKING;
190  philosopherset[i].forks_acquired = 0;
191  }
192  for (vertex_id_type i = 0;i < graph.num_vertices(); ++i) {
193  foreach(typename GraphType::edge_type edge, graph.in_edges(i)) {
194  if (altvids[edge.source()] > altvids[edge.target()]) {
195  forkset[graph.edge_id(edge)] = DIRTY_BIT | OWNER_TARGET;
196  philosopherset[edge.target()].forks_acquired++;
197  }
198  else {
199  forkset[graph.edge_id(edge)] = DIRTY_BIT | OWNER_SOURCE;
200  philosopherset[edge.source()].forks_acquired++;
201  }
202  }
203  }
204  }
205 
206 
207  /**
208  * We already have v1, we want to acquire v2.
209  * When this function returns, both v1 and v2 locks are acquired
210  */
211  void try_acquire_edge_with_backoff(vertex_id_type v1,
212  vertex_id_type v2) {
213  if (v1 < v2) {
214  philosopherset[v2].lock.lock();
215  }
216  else if (!philosopherset[v2].lock.try_lock()) {
217  philosopherset[v1].lock.unlock();
218  philosopherset[v2].lock.lock();
219  philosopherset[v1].lock.lock();
220  }
221  }
222 
223 
224  public:
225  inline chandy_misra(GraphType &graph):graph(graph) {
226  forkset.resize(graph.num_edges(), 0);
227  philosopherset.resize(graph.num_vertices());
228  compute_initial_fork_arrangement();
229  }
230 
231  inline chandy_misra(GraphType &graph,
232  const std::vector<vertex_id_type> &altvids):graph(graph) {
233  forkset.resize(graph.num_edges(), 0);
234  philosopherset.resize(graph.num_vertices());
235  compute_initial_fork_arrangement(altvids);
236  }
237 
238  inline const vertex_id_type invalid_vid() const {
239  return (vertex_id_type)(-1);
240  }
241 
242  inline vertex_id_type make_philosopher_hungry(vertex_id_type p_id) {
243  vertex_id_type retval = vertex_id_type(-1);
244  philosopherset[p_id].lock.lock();
245  //philosopher is now hungry!
246  ASSERT_EQ((int)philosopherset[p_id].state, (int)THINKING);
247  philosopherset[p_id].state = HUNGRY;
248 
249  // now try to get all the forks. lock one edge at a time
250  // using the backoff strategy
251  //std::cout << "vertex " << p_id << std::endl;
252  //std::cout << "in edges: " << std::endl;
253  foreach(typename GraphType::edge_type edge, graph.in_edges(p_id)) {
254  try_acquire_edge_with_backoff(edge.target(), edge.source());
255  //std::cout << "\t" << graph.edge_id(edge) << ": " << edge.source() << "->" << edge.target() << std::endl;
256  size_t edgeid = graph.edge_id(edge);
257  // if fork is owned by other edge, try to take it
258  if (fork_owner(edgeid) == OWNER_SOURCE) {
259  request_for_fork(edgeid, OWNER_TARGET);
260  advance_fork_state_on_lock(edgeid, edge.source(), edge.target());
261  }
262  philosopherset[edge.source()].lock.unlock();
263  }
264  //std::cout << "out edges: " << std::endl;
265  foreach(typename GraphType::edge_type edge, graph.out_edges(p_id)) {
266  //std::cout << "\t" << graph.edge_id(edge) << ": " << edge.source() << "->" << edge.target() << std::endl;
267  try_acquire_edge_with_backoff(edge.source(), edge.target());
268  size_t edgeid = graph.edge_id(edge);
269 
270  // if fork is owned by other edge, try to take it
271  if (fork_owner(edgeid) == OWNER_TARGET) {
272  request_for_fork(edgeid, OWNER_SOURCE);
273  advance_fork_state_on_lock(edgeid, edge.source(), edge.target());
274  }
275  philosopherset[edge.target()].lock.unlock();
276  }
277 
278  // if I got all forks I can eat
279  if (philosopherset[p_id].forks_acquired ==
280  philosopherset[p_id].num_edges) {
281  philosopherset[p_id].state = EATING;
282  // signal eating
283  retval = p_id;
284  }
285  philosopherset[p_id].lock.unlock();
286  return retval;
287  }
288 
289  inline std::vector<vertex_id_type> philosopher_stops_eating(size_t p_id) {
290  std::vector<vertex_id_type> retval;
291  philosopherset[p_id].lock.lock();
292  //philosopher is now hungry!
293  ASSERT_EQ((int)philosopherset[p_id].state, (int)EATING);
294  philosopherset[p_id].state = THINKING;
295 
296  // now forks are dirty
297  foreach(typename GraphType::edge_type edge, graph.in_edges(p_id)) {
298  try_acquire_edge_with_backoff(edge.target(), edge.source());
299  size_t edgeid = graph.edge_id(edge);
300  vertex_id_type other = edge.source();
301  dirty_fork(edgeid);
302  advance_fork_state_on_unlock(edgeid, edge.source(), edge.target());
303  if (philosopherset[other].state == HUNGRY &&
304  philosopherset[other].forks_acquired ==
305  philosopherset[other].num_edges) {
306  philosopherset[other].state = EATING;
307  // signal eating on other
308  retval.push_back(other);
309  }
310  philosopherset[other].lock.unlock();
311  }
312 
313  foreach(typename GraphType::edge_type edge, graph.out_edges(p_id)) {
314  try_acquire_edge_with_backoff(edge.source(), edge.target());
315  size_t edgeid = graph.edge_id(edge);
316  vertex_id_type other = edge.target();
317  dirty_fork(edgeid);
318  advance_fork_state_on_unlock(edgeid, edge.source(), edge.target());
319  if (philosopherset[other].state == HUNGRY &&
320  philosopherset[other].forks_acquired ==
321  philosopherset[other].num_edges) {
322  philosopherset[other].state = EATING;
323  // signal eating on other
324  retval.push_back(other);
325  }
326  philosopherset[other].lock.unlock();
327  }
328 
329  philosopherset[p_id].lock.unlock();
330  return retval;
331  }
332 
333  inline std::vector<vertex_id_type> cancel_eating_philosopher(vertex_id_type p_id) {
334  std::vector<vertex_id_type> retval;
335  philosopherset[p_id].lock.lock();
336  //philosopher is now hungry!
337  if(philosopherset[p_id].state != EATING) {
338  philosopherset[p_id].lock.unlock();
339  return retval;
340  }
341  philosopherset[p_id].state = HUNGRY;
342 
343  // now forks are dirty
344  foreach(typename GraphType::edge_type edge, graph.in_edges(p_id)) {
345  try_acquire_edge_with_backoff(edge.target(), edge.source());
346  size_t edgeid = graph.edge_id(edge);
347  vertex_id_type other = edge.source();
348  if (fork_dirty(edgeid)) {
349  advance_fork_state_on_unlock(edgeid, edge.source(), edge.target());
350  if (philosopherset[other].state == HUNGRY &&
351  philosopherset[other].forks_acquired ==
352  philosopherset[other].num_edges) {
353  philosopherset[other].state = EATING;
354  // signal eating on other
355  retval.push_back(other);
356  }
357  }
358  philosopherset[other].lock.unlock();
359  }
360 
361  foreach(typename GraphType::edge_type edge, graph.out_edges(p_id)) {
362  try_acquire_edge_with_backoff(edge.source(), edge.target());
363  size_t edgeid = graph.edge_id(edge);
364  vertex_id_type other = edge.target();
365  if (fork_dirty(edgeid)) {
366  advance_fork_state_on_unlock(edgeid, edge.source(), edge.target());
367  if (philosopherset[other].state == HUNGRY &&
368  philosopherset[other].forks_acquired ==
369  philosopherset[other].num_edges) {
370  philosopherset[other].state = EATING;
371  // signal eating on other
372  retval.push_back(other);
373  }
374  }
375  philosopherset[other].lock.unlock();
376  }
377  // if I got all forks I can eat
378  if (philosopherset[p_id].forks_acquired ==
379  philosopherset[p_id].num_edges) {
380  philosopherset[p_id].state = EATING;
381  // signal eating
382  retval.push_back(p_id);
383  }
384  philosopherset[p_id].lock.unlock();
385  return retval;
386  }
387 
388 
389  void no_locks_consistency_check() {
390  // make sure all forks are dirty
391  for (size_t i = 0;i < forkset.size(); ++i) ASSERT_TRUE(fork_dirty(i));
392  // all philosophers are THINKING
393  for (size_t i = 0;i < philosopherset.size(); ++i) ASSERT_TRUE(philosopherset[i].state == THINKING);
394  }
395 
396  void complete_consistency_check() {
397  for (vertex_id_type v = 0; v < graph.num_vertices(); ++v) {
398  // count the number of forks I own
399  size_t numowned = 0;
400  size_t numowned_clean = 0;
401  foreach(typename GraphType::edge_type edge, graph.in_edges(v)) {
402  size_t edgeid = graph.edge_id(edge);
403  if (fork_owner(edgeid) == OWNER_TARGET) {
404  numowned++;
405  if (!fork_dirty(edgeid)) numowned_clean++;
406  }
407  }
408  foreach(typename GraphType::edge_type edge, graph.out_edges(v)) {
409  size_t edgeid = graph.edge_id(edge);
410  if (fork_owner(edgeid) == OWNER_SOURCE) {
411  numowned++;
412  if (!fork_dirty(edgeid)) numowned_clean++;
413  }
414  }
415 
416  ASSERT_EQ(philosopherset[v].forks_acquired, numowned);
417  if (philosopherset[v].state == THINKING) {
418  ASSERT_EQ(numowned_clean, 0);
419  }
420  else if (philosopherset[v].state == HUNGRY) {
421  ASSERT_NE(philosopherset[v].num_edges, philosopherset[v].forks_acquired);
422  // any fork I am unable to acquire. Must be clean, and the other person
423  // must be eating or hungry
424  foreach(typename GraphType::edge_type edge, graph.in_edges(v)) {
425  size_t edgeid = graph.edge_id(edge);
426  // not owned
427  if (fork_owner(edgeid) == OWNER_SOURCE) {
428  if (philosopherset[edge.source()].state != EATING) {
429  if (fork_dirty(edgeid)) {
430  std::cout << (int)(forkset[edgeid]) << " "
431  << (int)philosopherset[edge.source()].state
432  << "->" << (int)philosopherset[edge.target()].state
433  << std::endl;
434  ASSERT_FALSE(fork_dirty(edgeid));
435  }
436  }
437  ASSERT_NE(philosopherset[edge.source()].state, (int)THINKING);
438  }
439  }
440  foreach(typename GraphType::edge_type edge, graph.out_edges(v)) {
441  size_t edgeid = graph.edge_id(edge);
442  if (fork_owner(edgeid) == OWNER_TARGET) {
443  if (philosopherset[edge.target()].state != EATING) {
444  if (fork_dirty(edgeid)) {
445  std::cout << (int)(forkset[edgeid]) << " "
446  << (int)philosopherset[edge.source()].state
447  << "->"
448  << (int)philosopherset[edge.target()].state
449  << std::endl;
450  ASSERT_FALSE(fork_dirty(edgeid));
451  }
452  }
453  ASSERT_NE(philosopherset[edge.target()].state, (int)THINKING);
454  }
455  }
456 
457  }
458  else if (philosopherset[v].state == EATING) {
459  ASSERT_EQ(philosopherset[v].forks_acquired, philosopherset[v].num_edges);
460  }
461  }
462  }
463 };
464 
465 }
466 
467 #include <graphlab/macros_undef.hpp>
468 
469 #endif