GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
chandy_misra_lockfree.hpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_LOCAL_CHANDY_MISRA_LOCKFREE_HPP
25 #define GRAPHLAB_LOCAL_CHANDY_MISRA_LOCKFREE_HPP
26 #include <vector>
27 #include <graphlab/logger/assertions.hpp>
28 #include <graphlab/parallel/pthread_tools.hpp>
29 #include <graphlab/parallel/atomic.hpp>
30 #include <graphlab/macros_def.hpp>
31 namespace graphlab {
32 
33 template <typename GraphType>
34 class chandy_misra_lockfree {
35  private:
36  GraphType &graph;
37  /*
38  * Each "fork" is one character.
39  * bit 0: owner. if 0 is src. if 1 is target
40  * bit 1: clean = 0, dirty = 1
41  * bit 2: owner 0 request
42  * bit 3: owner 1 request
43  */
44  std::vector<unsigned char> forkset;
45  enum { OWNER_BIT = 1,
46  DIRTY_BIT = 2,
47  REQUEST_0 = 4,
48  REQUEST_1 = 8 };
49  enum {OWNER_SOURCE = 0, OWNER_TARGET = 1};
50  static inline unsigned char request_bit(bool owner) {
51  return owner ? REQUEST_1 : REQUEST_0;
52  }
53 
54  struct philosopher {
55  vertex_id_type num_edges;
56  atomic<vertex_id_type> forks_acquired;
57  simple_spinlock lock;
58  unsigned char state;
59  bool atomic_eat() {
60  if (num_edges == forks_acquired.value) {
61  return atomic_compare_and_swap(state, (unsigned char)HUNGRY, (unsigned char)EATING);
62  }
63  return false;
64  }
65  };
66  std::vector<philosopher> philosopherset;
67  /*
68  * Possible values for the philosopher state
69  */
70  enum {
71  THINKING = 0,
72  HUNGRY = 1,
73  EATING = 2
74  };
75 
76  /** Places a request for the fork. Requires fork to be locked */
77  inline void request_for_fork(size_t forkid, bool nextowner) {
78  __sync_fetch_and_or(&forkset[forkid], request_bit(nextowner));
79  }
80 
81  inline bool fork_owner(size_t forkid) {
82  return forkset[forkid] & OWNER_BIT;
83  }
84 
85  inline bool fork_dirty(size_t forkid) {
86  return !!(forkset[forkid] & DIRTY_BIT);
87  }
88 
89  inline void dirty_fork(size_t forkid) {
90  __sync_fetch_and_or(&forkset[forkid], (unsigned char)DIRTY_BIT);
91  }
92 
93  /** changes the fork owner if it is dirty, and the other side
94  * has requested for it. Fork must be locked.
95  * Returns true if fork moved. false otherwise.
96  */
97  inline bool advance_fork_state_on_lock(size_t forkid,
98  vertex_id_type source,
99  vertex_id_type target) {
100  while(1) {
101  unsigned char forkval = forkset[forkid];
102  unsigned char currentowner = forkval & OWNER_BIT;
103  // edge_ids for the request bits
104  unsigned char my_request_bit = request_bit(currentowner);
105  unsigned char other_request_bit = request_bit(!currentowner);
106 
107  bool current_owner_is_eating =
108  (currentowner == OWNER_SOURCE && philosopherset[source].state == EATING) ||
109  (currentowner == OWNER_TARGET && philosopherset[target].state == EATING);
110  bool current_owner_is_hungry =
111  (currentowner == OWNER_SOURCE && philosopherset[source].state == HUNGRY) ||
112  (currentowner == OWNER_TARGET && philosopherset[target].state == HUNGRY);
113 
114  // if the current owner is not eating, and the
115  // fork is dirty and other side has placed a request
116  if (current_owner_is_eating == false &&
117  (forkval & DIRTY_BIT) &&
118  (forkval & other_request_bit)) {
119  // change the owner and clean the fork)
120  unsigned char newforkval = (!currentowner);
121  if (current_owner_is_hungry) {
122  newforkval |= my_request_bit;
123  }
124 
125  if (atomic_compare_and_swap(forkset[forkid], forkval, newforkval)) {
126  return true;
127  }
128  }
129  else {
130  return false;
131  }
132  }
133  }
134 
135 
136  inline bool advance_fork_state_on_unlock(size_t forkid,
137  vertex_id_type source,
138  vertex_id_type target) {
139 
140  unsigned char currentowner = forkset[forkid] & OWNER_BIT;
141  // edge_ids for the request bits
142  unsigned char my_request_bit = request_bit(currentowner);
143  unsigned char other_request_bit = request_bit(!currentowner);
144 
145  // if the current owner is not eating, and the
146  // fork is dirty and other side has placed a request
147  if ((forkset[forkid] & DIRTY_BIT) &&
148  (forkset[forkid] & other_request_bit)) {
149  // change the owner and clean the fork)
150  // keep my request bit if any
151  forkset[forkid] = (forkset[forkid] & my_request_bit) | (!currentowner);
152  return true;
153  }
154  return false;
155  }
156 
157  void compute_initial_fork_arrangement() {
158  for (vertex_id_type i = 0;i < graph.num_vertices(); ++i) {
159  philosopherset[i].num_edges = graph.num_in_edges(i) +
160  graph.num_out_edges(i);
161  philosopherset[i].state = THINKING;
162  foreach(typename GraphType::edge_type edge, graph.in_edges(i)) {
163  if (edge.source() > edge.target()) {
164  forkset[graph.edge_id(edge)] = DIRTY_BIT | 1;
165  }
166  else {
167  forkset[graph.edge_id(edge)] = DIRTY_BIT;
168  }
169  }
170  }
171  }
172 
173 
174  /**
175  * We already have v1, we want to acquire v2.
176  * When this function returns, both v1 and v2 locks are acquired
177  */
178  void try_acquire_edge_with_backoff(vertex_id_type v1,
179  vertex_id_type v2) {
180  if (v1 < v2) {
181  philosopherset[v2].lock.lock();
182  }
183  else if (!philosopherset[v2].lock.try_lock()) {
184  philosopherset[v1].lock.unlock();
185  philosopherset[v2].lock.lock();
186  philosopherset[v1].lock.lock();
187  }
188  }
189 
190 
191  public:
192  inline chandy_misra_lockfree(GraphType &graph):graph(graph) {
193  forkset.resize(graph.num_edges(), 0);
194  philosopherset.resize(graph.num_vertices());
195  compute_initial_fork_arrangement();
196  }
197 
198  inline const vertex_id_type invalid_vid() const {
199  return (vertex_id_type)(-1);
200  }
201 
202  inline vertex_id_type make_philosopher_hungry(vertex_id_type p_id) {
203  vertex_id_type retval = vertex_id_type(-1);
204  //philosopherset[p_id].lock.lock();
205  philosopherset[p_id].forks_acquired.value = 0;
206  //philosopher is now hungry!
207  ASSERT_EQ((int)philosopherset[p_id].state, (int)THINKING);
208  philosopherset[p_id].state = HUNGRY;
209 
210  // now try to get all the forks. lock one edge at a time
211  // using the backoff strategy
212  //std::cout << "vertex " << p_id << std::endl;
213  //std::cout << "in edges: " << std::endl;
214  foreach(typename GraphType::edge_type edge, graph.in_edges(p_id)) {
215  //try_acquire_edge_with_backoff(edge.target(), edge.source());
216  //std::cout << "\t" << graph.edge_id(edge) << ": " << edge.source() << "->" << edge.target() << std::endl;
217  size_t edgeid = graph.edge_id(edge);
218  // if fork is owned by other edge, try to take it
219  if (fork_owner(edgeid) == OWNER_SOURCE) {
220  request_for_fork(edgeid, OWNER_TARGET);
221 
222  philosopherset[p_id].forks_acquired.inc(
223  advance_fork_state_on_lock(edgeid, edge.source(), edge.target()));
224  }
225  else {
226  philosopherset[p_id].forks_acquired.inc();
227  }
228  //philosopherset[edge.source()].lock.unlock();
229  }
230  //std::cout << "out edges: " << std::endl;
231  foreach(typename GraphType::edge_type edge, graph.out_edges(p_id)) {
232  //std::cout << "\t" << graph.edge_id(edge) << ": " << edge.source() << "->" << edge.target() << std::endl;
233  //try_acquire_edge_with_backoff(edge.source(), edge.target());
234  size_t edgeid = graph.edge_id(edge);
235 
236  // if fork is owned by other edge, try to take it
237  if (fork_owner(edgeid) == OWNER_TARGET) {
238  request_for_fork(edgeid, OWNER_SOURCE);
239  philosopherset[p_id].forks_acquired.inc(
240  advance_fork_state_on_lock(edgeid, edge.source(), edge.target()));
241  }
242  else {
243  philosopherset[p_id].forks_acquired.inc();
244  }
245  //philosopherset[edge.target()].lock.unlock();
246  }
247 
248  // if I got all forks I can eat
249  if (philosopherset[p_id].atomic_eat()) {
250  // signal eating
251  retval = p_id;
252  }
253  //philosopherset[p_id].lock.unlock();
254  return retval;
255  }
256 
257  inline std::vector<vertex_id_type> philosopher_stops_eating(size_t p_id) {
258  std::vector<vertex_id_type> retval;
259  //philosopherset[p_id].lock.lock();
260  //philosopher is now hungry!
261  ASSERT_EQ((int)philosopherset[p_id].state, (int)EATING);
262  // now forks are dirty
263  foreach(typename GraphType::edge_type edge, graph.in_edges(p_id)) {
264  //try_acquire_edge_with_backoff(edge.target(), edge.source());
265  size_t edgeid = graph.edge_id(edge);
266  vertex_id_type other = edge.source();
267  dirty_fork(edgeid);
268  philosopherset[other].forks_acquired.inc(
269  advance_fork_state_on_unlock(edgeid, edge.source(), edge.target()));
270  if (philosopherset[other].atomic_eat()) {
271  // signal eating on other
272  retval.push_back(other);
273  }
274  //philosopherset[edge.source()].lock.unlock();
275  }
276 
277  foreach(typename GraphType::edge_type edge, graph.out_edges(p_id)) {
278  //try_acquire_edge_with_backoff(edge.source(), edge.target());
279  size_t edgeid = graph.edge_id(edge);
280  vertex_id_type other = edge.target();
281  dirty_fork(edgeid);
282  philosopherset[other].forks_acquired.inc(
283  advance_fork_state_on_unlock(edgeid, edge.source(), edge.target()));
284  if (philosopherset[other].atomic_eat()) {
285  // signal eating on other
286  retval.push_back(other);
287  }
288  //philosopherset[other].lock.unlock();
289  }
290  philosopherset[p_id].state = THINKING;
291 
292  //philosopherset[p_id].lock.unlock();
293  return retval;
294  }
295 
296  void no_locks_consistency_check() {
297  // make sure all forks are dirty
298  for (size_t i = 0;i < forkset.size(); ++i) ASSERT_TRUE(fork_dirty(i));
299  // all philosophers are THINKING
300  for (size_t i = 0;i < philosopherset.size(); ++i) ASSERT_TRUE(philosopherset[i].state == THINKING);
301  }
302 
303  void complete_consistency_check() {
304  for (vertex_id_type v = 0; v < graph.num_vertices(); ++v) {
305  // count the number of forks I own
306  size_t numowned = 0;
307  size_t numowned_clean = 0;
308  foreach(typename GraphType::edge_type edge, graph.in_edges(v)) {
309  size_t edgeid = graph.edge_id(edge);
310  if (fork_owner(edgeid) == OWNER_TARGET) {
311  numowned++;
312  if (!fork_dirty(edgeid)) numowned_clean++;
313  }
314  }
315  foreach(typename GraphType::edge_type edge, graph.out_edges(v)) {
316  size_t edgeid = graph.edge_id(edge);
317  if (fork_owner(edgeid) == OWNER_SOURCE) {
318  numowned++;
319  if (!fork_dirty(edgeid)) numowned_clean++;
320  }
321  }
322 
323  if (philosopherset[v].state == THINKING) {
324  ASSERT_EQ(numowned_clean, 0);
325  }
326  else if (philosopherset[v].state == HUNGRY) {
327  ASSERT_EQ(philosopherset[v].forks_acquired.value, numowned);
328  }
329  else if (philosopherset[v].state == EATING) {
330  ASSERT_EQ(philosopherset[v].forks_acquired.value, philosopherset[v].num_edges);
331  ASSERT_EQ(philosopherset[v].forks_acquired.value, numowned);
332  }
333  }
334  }
335 };
336 
337 }
338 
339 #include <graphlab/macros_undef.hpp>
340 
341 #endif