GraphLab: Distributed Graph-Parallel API 2.1
queued_rwlock.hpp
/**
 * Copyright (c) 2009 Carnegie Mellon University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS
 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language
 * governing permissions and limitations under the License.
 *
 * For more about this software visit:
 *
 * http://www.graphlab.ml.cmu.edu
 *
 */


#ifndef QUEUED_RWLOCK_HPP
#define QUEUED_RWLOCK_HPP

#include <cassert>   // assert
#include <cstddef>   // NULL, size_t
#include <stdint.h>  // uint16_t, uint32_t
#include <sched.h>   // sched_yield

// Provides graphlab::atomic and atomic_compare_and_swap (assumed header
// location within the GraphLab source tree).
#include <graphlab/parallel/atomic.hpp>

namespace graphlab {


#define QUEUED_RW_LOCK_REQUEST_READ 0
#define QUEUED_RW_LOCK_REQUEST_WRITE 1
#define QUEUED_RW_LOCK_REQUEST_NONE 2

/**
 * Fair (FIFO) reader-writer lock with local-only spinning, implemented
 * and modified from:
 * John M. Mellor-Crummey and Michael L. Scott,
 * "Scalable Reader-Writer Synchronization for Shared-Memory Multiprocessors."
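 *
 * A minimal usage sketch (caller-side code assumed; not part of this
 * header). Each acquisition passes a caller-owned request record, which
 * must stay alive until the matching unlock returns:
 *
 * \code
 * graphlab::queued_rw_lock lock;
 *
 * // Writer side: exclusive access.
 * graphlab::queued_rw_lock::request wreq;
 * lock.writelock(&wreq);
 * // ... mutate shared state ...
 * lock.wrunlock(&wreq);
 *
 * // Reader side: shared access; concurrent readers may hold the lock.
 * graphlab::queued_rw_lock::request rreq;
 * lock.readlock(&rreq);
 * // ... read shared state ...
 * lock.rdunlock(&rreq);
 * \endcode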
 */
class queued_rw_lock {
 public:

  /// 32-bit lock state, viewed either as one atomic word (stateu) or as
  /// the pair {successor_class, blocked} for the word-wide CAS in readlock().
  union state_union {
    volatile uint32_t stateu;
    struct {
      volatile uint16_t successor_class;
      volatile bool blocked;
    } state;
  };

  /// Per-acquisition queue node. The caller owns the record and must keep
  /// it alive from the lock call until the matching unlock returns.
  struct request {
    void* id;
    volatile request* volatile next;
    volatile state_union s;
    volatile char lockclass;
  };
 private:
  request* volatile tail;        // last node in the queue (NULL if empty)
  atomic<size_t> reader_count;   // number of readers currently holding the lock
  request* volatile next_writer; // writer waiting for active readers to drain
 public:
  queued_rw_lock() : tail(NULL), reader_count(0), next_writer(NULL) { }

  inline void writelock(request *I) {
    I->lockclass = QUEUED_RW_LOCK_REQUEST_WRITE;
    I->next = NULL;
    I->s.stateu = 0;
    I->s.state.blocked = true;
    I->s.state.successor_class = QUEUED_RW_LOCK_REQUEST_NONE;
    __sync_synchronize();
    // Atomically append ourselves to the queue.
    request* predecessor = __sync_lock_test_and_set(&tail, I);

    if (predecessor == NULL) {
      // Queue was empty: register as the next writer. If no readers are
      // active and we are still the registered writer, take the lock now.
      next_writer = I;
      __sync_synchronize();
      if (reader_count.value == 0) {
        if (__sync_lock_test_and_set(&next_writer, (request*)NULL) == I) {
          I->s.state.blocked = false;
        }
      }
    }
    else {
      // Tell the predecessor a writer follows it, then link in behind it.
      predecessor->s.state.successor_class = QUEUED_RW_LOCK_REQUEST_WRITE;
      __sync_synchronize();
      predecessor->next = I;
    }
    // Spin locally on our own blocked flag until a predecessor releases us.
    volatile state_union& is = I->s;
    while (is.state.blocked) sched_yield();
    assert(reader_count.value == 0);
  }

  inline void wrunlock(request *I) {
    __sync_synchronize();
    if (I->next != NULL ||
        !__sync_bool_compare_and_swap(&tail, I, (request*)NULL)) {
      // A successor exists (or is mid-enqueue): wait for its link to
      // appear, then wake it. A reader successor becomes an active reader.
      while (I->next == NULL) sched_yield();
      __sync_synchronize();

      if (I->next->lockclass == QUEUED_RW_LOCK_REQUEST_READ) {
        reader_count.inc();
      }
      I->next->s.state.blocked = false;
    }
  }

  inline void readlock(request *I) {
    I->lockclass = QUEUED_RW_LOCK_REQUEST_READ;
    I->next = NULL;
    I->s.stateu = 0;
    I->s.state.successor_class = QUEUED_RW_LOCK_REQUEST_NONE;
    I->s.state.blocked = true;
    __sync_synchronize();
    request* predecessor = __sync_lock_test_and_set(&tail, I);
    if (predecessor == NULL) {
      // Queue was empty: a reader may enter immediately.
      reader_count.inc();
      I->s.state.blocked = false;
    }
    else {
      // Try to flip the predecessor's state from {blocked, NONE} to
      // {blocked, READ}. Success means the predecessor has not released
      // yet, so we must wait behind it; failure means it is already an
      // active reader and we can enter alongside it.
      // Zero the full words first so the padding byte compares
      // deterministically in the word-wide CAS below.
      state_union tempold, tempnew;
      tempold.stateu = 0;
      tempold.state.blocked = true;
      tempold.state.successor_class = QUEUED_RW_LOCK_REQUEST_NONE;
      tempnew.stateu = 0;
      tempnew.state.blocked = true;
      tempnew.state.successor_class = QUEUED_RW_LOCK_REQUEST_READ;
      __sync_synchronize();
      if (predecessor->lockclass == QUEUED_RW_LOCK_REQUEST_WRITE ||
          atomic_compare_and_swap(predecessor->s.stateu,
                                  tempold.stateu,
                                  tempnew.stateu)) {
        predecessor->next = I;
        // Spin locally until the predecessor wakes us.
        __sync_synchronize();
        volatile state_union& is = I->s;
        while (is.state.blocked) sched_yield();
      }
      else {
        // Predecessor is an active reader: join it immediately.
        reader_count.inc();
        predecessor->next = I;
        __sync_synchronize();
        I->s.state.blocked = false;
      }
    }
    __sync_synchronize();
    if (I->s.state.successor_class == QUEUED_RW_LOCK_REQUEST_READ) {
      // Cascade the wakeup to a reader queued behind us.
      while (I->next == NULL) sched_yield();
      reader_count.inc();
      I->next->s.state.blocked = false;
    }
  }

  inline void rdunlock(request *I) {
    __sync_synchronize();
    if (I->next != NULL ||
        !__sync_bool_compare_and_swap(&tail, I, (request*)NULL)) {
      // A successor exists: wait for its link, and if it is a writer,
      // register it to be woken once the last reader leaves.
      while (I->next == NULL) sched_yield();
      if (I->s.state.successor_class == QUEUED_RW_LOCK_REQUEST_WRITE) {
        next_writer = (request*)(I->next);
        __sync_synchronize();
      }
    }
    // The last reader out hands the lock to the waiting writer, if any.
    if (reader_count.dec() == 0) {
      __sync_synchronize();
      request* w = __sync_lock_test_and_set(&next_writer, (request*)NULL);
      if (w != NULL) {
        w->s.state.blocked = false;
        __sync_synchronize();
      }
    }
  }
};

} // namespace graphlab

#endif