GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
chandy_misra2.hpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_LOCAL_CHANDY_MISRA_HPP
25 #define GRAPHLAB_LOCAL_CHANDY_MISRA_HPP
26 #include <vector>
27 #include <graphlab/logger/assertions.hpp>
28 #include <graphlab/parallel/pthread_tools.hpp>
29 #include <graphlab/parallel/atomic.hpp>
30 #include <graphlab/macros_def.hpp>
31 namespace graphlab {
32 
33 template <typename GraphType>
34 class chandy_misra {
35  private:
36  GraphType &graph;
37  /*
38  * Each "fork" is one character.
39  * bit 0: owner. if 0 is src. if 1 is target
40  * bit 1: clean = 0, dirty = 1
41  * bit 2: owner 0 request
42  * bit 3: owner 1 request
43  */
44  std::vector<unsigned char> forkset;
45  enum { OWNER_BIT = 1,
46  DIRTY_BIT = 2,
47  REQUEST_0 = 4,
48  REQUEST_1 = 8 };
49  enum {OWNER_SOURCE = 0, OWNER_TARGET = 1};
50  inline unsigned char request_bit(bool owner) {
51  return owner ? REQUEST_1 : REQUEST_0;
52  }
53 
54  struct philosopher {
55  vertex_id_type num_edges;
56  atomic<vertex_id_type> forks_acquired;
57  simple_spinlock lock;
58  unsigned char state;
59  bool atomic_eat() {
60  if (num_edges == forks_acquired.value) {
61  return atomic_compare_and_swap(state,
62  (unsigned char)HUNGRY,
63  (unsigned char)EATING);
64  }
65  return false;
66  }
67  };
68  std::vector<philosopher> philosopherset;
69  /*
70  * Possible values for the philosopher state
71  */
72  enum {
73  THINKING = 0,
74  HUNGRY = 1,
75  EATING = 2
76  };
77 
78  /** Places a request for the fork. Requires fork to be locked */
79  inline void request_for_fork(size_t forkid, bool nextowner) {
80  forkset[forkid] |= request_bit(nextowner);
81  }
82 
83  inline bool fork_owner(size_t forkid) {
84  return forkset[forkid] & OWNER_BIT;
85  }
86 
87  inline bool fork_dirty(size_t forkid) {
88  return !!(forkset[forkid] & DIRTY_BIT);
89  }
90 
91  inline void dirty_fork(size_t forkid) {
92  forkset[forkid] |= DIRTY_BIT;
93  }
94 
95  /** changes the fork owner if it is dirty, and the other side
96  * has requested for it. Fork must be locked.
97  * Returns true if fork moved. false otherwise.
98  */
99  inline bool advance_fork_state_on_lock(size_t forkid,
100  vertex_id_type source,
101  vertex_id_type target) {
102 
103  unsigned char currentowner = forkset[forkid] & OWNER_BIT;
104  // edge_ids for the request bits
105  unsigned char my_request_bit = request_bit(currentowner);
106  unsigned char other_request_bit = request_bit(!currentowner);
107 
108  bool current_owner_is_eating =
109  (currentowner == OWNER_SOURCE && philosopherset[source].state == EATING) ||
110  (currentowner == OWNER_TARGET && philosopherset[target].state == EATING);
111  bool current_owner_is_hungry =
112  (currentowner == OWNER_SOURCE && philosopherset[source].state == HUNGRY) ||
113  (currentowner == OWNER_TARGET && philosopherset[target].state == HUNGRY);
114 
115  // if the current owner is not eating, and the
116  // fork is dirty and other side has placed a request
117  if (current_owner_is_eating == false &&
118  (forkset[forkid] & DIRTY_BIT) &&
119  (forkset[forkid] & other_request_bit)) {
120  // change the owner and clean the fork)
121 
122  forkset[forkid] = (!currentowner);
123  if (current_owner_is_hungry) {
124  forkset[forkid] |= my_request_bit;
125  }
126  return true;
127  }
128  return false;
129  }
130 
131 
132  inline bool advance_fork_state_on_unlock(size_t forkid,
133  vertex_id_type source,
134  vertex_id_type target) {
135 
136  unsigned char currentowner = forkset[forkid] & OWNER_BIT;
137  // edge_ids for the request bits
138  unsigned char my_request_bit = request_bit(currentowner);
139  unsigned char other_request_bit = request_bit(!currentowner);
140 
141  // if the current owner is not eating, and the
142  // fork is dirty and other side has placed a request
143  if ((forkset[forkid] & DIRTY_BIT) &&
144  (forkset[forkid] & other_request_bit)) {
145  // change the owner and clean the fork)
146  // keep my request bit if any
147  forkset[forkid] = (forkset[forkid] & my_request_bit) | (!currentowner);
148  return true;
149  }
150  return false;
151  }
152 
153  void compute_initial_fork_arrangement() {
154  for (vertex_id_type i = 0;i < graph.num_vertices(); ++i) {
155  philosopherset[i].num_edges = graph.num_in_edges(i) +
156  graph.num_out_edges(i);
157  philosopherset[i].state = THINKING;
158  foreach(typename GraphType::edge_type edge, graph.in_edges(i)) {
159  if (edge.source() > edge.target()) {
160  forkset[graph.edge_id(edge)] = DIRTY_BIT | 1;
161  }
162  else {
163  forkset[graph.edge_id(edge)] = DIRTY_BIT;
164  }
165  }
166  }
167  }
168 
169 
170  /**
171  * We already have v1, we want to acquire v2.
172  * When this function returns, both v1 and v2 locks are acquired
173  */
174  void try_acquire_edge_with_backoff(vertex_id_type v1,
175  vertex_id_type v2) {
176  if (v1 < v2) {
177  philosopherset[v2].lock.lock();
178  }
179  else if (!philosopherset[v2].lock.try_lock()) {
180  philosopherset[v1].lock.unlock();
181  philosopherset[v2].lock.lock();
182  philosopherset[v1].lock.lock();
183  }
184  }
185 
186 
187  public:
188  inline chandy_misra(GraphType &graph):graph(graph) {
189  forkset.resize(graph.num_edges(), 0);
190  philosopherset.resize(graph.num_vertices());
191  compute_initial_fork_arrangement();
192  }
193 
194  inline const vertex_id_type invalid_vid() const {
195  return (vertex_id_type)(-1);
196  }
197 
198  inline vertex_id_type make_philosopher_hungry(vertex_id_type p_id) {
199  vertex_id_type retval = vertex_id_type(-1);
200  philosopherset[p_id].lock.lock();
201  philosopherset[p_id].forks_acquired.value = 0;
202  //philosopher is now hungry!
203  ASSERT_EQ((int)philosopherset[p_id].state, (int)THINKING);
204  philosopherset[p_id].state = HUNGRY;
205 
206  // now try to get all the forks. lock one edge at a time
207  // using the backoff strategy
208  //std::cout << "vertex " << p_id << std::endl;
209  //std::cout << "in edges: " << std::endl;
210  foreach(typename GraphType::edge_type edge, graph.in_edges(p_id)) {
211  try_acquire_edge_with_backoff(edge.target(), edge.source());
212  //std::cout << "\t" << graph.edge_id(edge) << ": " << edge.source() << "->" << edge.target() << std::endl;
213  size_t edgeid = graph.edge_id(edge);
214  // if fork is owned by other edge, try to take it
215  if (fork_owner(edgeid) == OWNER_SOURCE) {
216  request_for_fork(edgeid, OWNER_TARGET);
217 
218  philosopherset[p_id].forks_acquired.inc(
219  advance_fork_state_on_lock(edgeid, edge.source(), edge.target()));
220  }
221  else {
222  philosopherset[p_id].forks_acquired.inc();
223  }
224  philosopherset[edge.source()].lock.unlock();
225  }
226  //std::cout << "out edges: " << std::endl;
227  foreach(typename GraphType::edge_type edge, graph.out_edges(p_id)) {
228  //std::cout << "\t" << graph.edge_id(edge) << ": " << edge.source() << "->" << edge.target() << std::endl;
229  try_acquire_edge_with_backoff(edge.source(), edge.target());
230  size_t edgeid = graph.edge_id(edge);
231 
232  // if fork is owned by other edge, try to take it
233  if (fork_owner(edgeid) == OWNER_TARGET) {
234  request_for_fork(edgeid, OWNER_SOURCE);
235  philosopherset[p_id].forks_acquired.inc(
236  advance_fork_state_on_lock(edgeid, edge.source(), edge.target()));
237  }
238  else {
239  philosopherset[p_id].forks_acquired.inc();
240  }
241  philosopherset[edge.target()].lock.unlock();
242  }
243 
244  // if I got all forks I can eat
245  if (philosopherset[p_id].atomic_eat()) {
246  // signal eating
247  retval = p_id;
248  }
249  philosopherset[p_id].lock.unlock();
250  return retval;
251  }
252 
253  inline std::vector<vertex_id_type> philosopher_stops_eating(size_t p_id) {
254  std::vector<vertex_id_type> retval;
255  philosopherset[p_id].lock.lock();
256  //philosopher is now hungry!
257  ASSERT_EQ((int)philosopherset[p_id].state, (int)EATING);
258  // now forks are dirty
259  foreach(typename GraphType::edge_type edge, graph.in_edges(p_id)) {
260  //try_acquire_edge_with_backoff(edge.target(), edge.source());
261  size_t edgeid = graph.edge_id(edge);
262  vertex_id_type other = edge.source();
263  dirty_fork(edgeid);
264  philosopherset[other].forks_acquired.inc(
265  advance_fork_state_on_unlock(edgeid, edge.source(), edge.target()));
266  if (philosopherset[other].atomic_eat()) {
267  // signal eating on other
268  retval.push_back(other);
269  }
270  //philosopherset[edge.source()].lock.unlock();
271  }
272 
273  foreach(typename GraphType::edge_type edge, graph.out_edges(p_id)) {
274  //try_acquire_edge_with_backoff(edge.source(), edge.target());
275  size_t edgeid = graph.edge_id(edge);
276  vertex_id_type other = edge.target();
277  dirty_fork(edgeid);
278  philosopherset[other].forks_acquired.inc(
279  advance_fork_state_on_unlock(edgeid, edge.source(), edge.target()));
280  if (philosopherset[other].atomic_eat()) {
281  // signal eating on other
282  retval.push_back(other);
283  }
284  //philosopherset[other].lock.unlock();
285  }
286  philosopherset[p_id].state = THINKING;
287 
288  philosopherset[p_id].lock.unlock();
289  return retval;
290  }
291 
292  void no_locks_consistency_check() {
293  // make sure all forks are dirty
294  for (size_t i = 0;i < forkset.size(); ++i) ASSERT_TRUE(fork_dirty(i));
295  // all philosophers are THINKING
296  for (size_t i = 0;i < philosopherset.size(); ++i) ASSERT_TRUE(philosopherset[i].state == THINKING);
297  }
298 
299  void complete_consistency_check() {
300  for (vertex_id_type v = 0; v < graph.num_vertices(); ++v) {
301  // count the number of forks I own
302  size_t numowned = 0;
303  size_t numowned_clean = 0;
304  foreach(typename GraphType::edge_type edge, graph.in_edges(v)) {
305  size_t edgeid = graph.edge_id(edge);
306  if (fork_owner(edgeid) == OWNER_TARGET) {
307  numowned++;
308  if (!fork_dirty(edgeid)) numowned_clean++;
309  }
310  }
311  foreach(typename GraphType::edge_type edge, graph.out_edges(v)) {
312  size_t edgeid = graph.edge_id(edge);
313  if (fork_owner(edgeid) == OWNER_SOURCE) {
314  numowned++;
315  if (!fork_dirty(edgeid)) numowned_clean++;
316  }
317  }
318 
319  if (philosopherset[v].state == THINKING) {
320  ASSERT_EQ(numowned_clean, 0);
321  }
322  else if (philosopherset[v].state == HUNGRY) {
323  ASSERT_EQ(philosopherset[v].forks_acquired.value, numowned);
324  }
325  else if (philosopherset[v].state == EATING) {
326  ASSERT_EQ(philosopherset[v].forks_acquired.value, philosopherset[v].num_edges);
327  ASSERT_EQ(philosopherset[v].forks_acquired.value, numowned);
328  }
329  }
330  }
331 };
332 
333 }
334 
335 #include <graphlab/macros_undef.hpp>
336 
337 #endif