GraphLab: Distributed Graph-Parallel API 2.1
deferred_rwlock.hpp
/**
 * Copyright (c) 2009 Carnegie Mellon University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS
 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language
 * governing permissions and limitations under the License.
 *
 * For more about this software visit:
 *
 * http://www.graphlab.ml.cmu.edu
 *
 */
22 
23 
24 #ifndef DEFERRED_RWLOCK_HPP
25 #define DEFERRED_RWLOCK_HPP
26 #include <graphlab/parallel/pthread_tools.hpp>
27 #include <graphlab/parallel/queued_rwlock.hpp>
28 #include <graphlab/logger/assertions.hpp>
29 namespace graphlab {
30 class deferred_rwlock{
31  public:
32 
33  struct request{
34  char lockclass : 2;
35  __attribute__((may_alias)) uint64_t id : 62;
36  request* next;
37  };
38  private:
39  request* head;
40  request* tail;
41  uint16_t reader_count;
42  bool writer;
43  simple_spinlock lock;
44  public:
45 
46  deferred_rwlock(): head(NULL),
47  tail(NULL), reader_count(0),writer(false) { }
48 
49  // debugging purposes only
50  inline size_t get_reader_count() {
51  __sync_synchronize();
52  return reader_count;
53  }
54 
55  // debugging purposes only
56  inline bool has_waiters() {
57  return head != NULL || tail != NULL;
58  }
59 
60  inline void insert_queue(request *I) {
61  if (head == NULL) {
62  head = I;
63  tail = I;
64  }
65  else {
66  tail->next = I;
67  tail = I;
68  }
69  }
70  inline void insert_queue_head(request *I) {
71  if (head == NULL) {
72  head = I;
73  tail = I;
74  }
75  else {
76  I->next = head;
77  head = I;
78  }
79  }
80 
81  inline bool writelock_priority(request *I) {
82  I->next = NULL;
83  I->lockclass = QUEUED_RW_LOCK_REQUEST_WRITE;
84  lock.lock();
85  if (reader_count == 0 && writer == false) {
86  // fastpath
87  writer = true;
88  lock.unlock();
89  return true;
90  }
91  else {
92  insert_queue_head(I);
93  lock.unlock();
94  return false;
95  }
96  }
97 
98  inline bool writelock(request *I) {
99  I->next = NULL;
100  I->lockclass = QUEUED_RW_LOCK_REQUEST_WRITE;
101  lock.lock();
102  if (reader_count == 0 && writer == false) {
103  // fastpath
104  writer = true;
105  lock.unlock();
106  return true;
107  }
108  else {
109  insert_queue(I);
110  lock.unlock();
111  return false;
112  }
113  }
114 
115  // completes the write lock on the head. lock must be acquired
116  // head must be a write lock
117  inline void complete_wrlock() {
118  // ASSERT_EQ(reader_count.value, 0);
119  head = head->next;
120  if (head == NULL) tail = NULL;
121  writer = true;
122  }
123 
124  // completes the read lock on the head. lock must be acquired
125  // head must be a read lock
126  inline size_t complete_rdlock(request* &released) {
127  released = head;
128  size_t numcompleted = 1;
129  head = head->next;
130  request* readertail = released;
131  while (head != NULL && head->lockclass == QUEUED_RW_LOCK_REQUEST_READ) {
132  readertail = head;
133  head = head->next;
134  numcompleted++;
135  }
136  reader_count += numcompleted;
137  if (head == NULL) tail = NULL;
138 
139  // now released is the head to a reader list
140  // and head is the head of a writer list
141  // I want to go through the writer list and extract all the readers
142  // this essentially
143  // splits the list into two sections, one containing only readers, and
144  // one containing only writers.
145  // (reader biased locking)
146  if (head != NULL) {
147  request* latestwriter = head;
148  request* cur = head->next;
149  while (1) {
150  if (cur->lockclass == QUEUED_RW_LOCK_REQUEST_WRITE) {
151  latestwriter = cur;
152  }
153  else {
154  readertail->next = cur;
155  readertail = cur;
156  reader_count++;
157  numcompleted++;
158  latestwriter->next = cur->next;
159  }
160  if (cur == tail) break;
161  cur=cur->next;
162  }
163  }
164  return numcompleted;
165  }
166 
167  inline size_t wrunlock(request* &released) {
168  released = NULL;
169  lock.lock();
170  writer = false;
171  size_t ret = 0;
172  if (head != NULL) {
173  if (head->lockclass == QUEUED_RW_LOCK_REQUEST_READ) {
174  ret = complete_rdlock(released);
175  if (ret == 2) assert(released->next != NULL);
176  }
177  else {
178  writer = true;
179  released = head;
180  complete_wrlock();
181  ret = 1;
182  }
183  }
184  lock.unlock();
185  return ret;
186  }
187 
188  inline size_t readlock(request *I, request* &released) {
189  released = NULL;
190  size_t ret = 0;
191  I->next = NULL;
192  I->lockclass = QUEUED_RW_LOCK_REQUEST_READ;
193  lock.lock();
194  // there are readers and no one is writing
195  if (head == NULL && writer == false) {
196  // fast path
197  ++reader_count;
198  lock.unlock();
199  released = I;
200  return 1;
201  }
202  else {
203  // slow path. Insert into queue
204  insert_queue(I);
205  if (head->lockclass == QUEUED_RW_LOCK_REQUEST_READ && writer == false) {
206  ret = complete_rdlock(released);
207  }
208  lock.unlock();
209  return ret;
210  }
211  }
212 
213  inline size_t readlock_priority(request *I, request* &released) {
214  released = NULL;
215  size_t ret = 0;
216  I->next = NULL;
217  I->lockclass = QUEUED_RW_LOCK_REQUEST_READ;
218  lock.lock();
219  // there are readers and no one is writing
220  if (head == NULL && writer == false) {
221  // fast path
222  ++reader_count;
223  lock.unlock();
224  released = I;
225  return 1;
226  }
227  else {
228  // slow path. Insert into queue
229  insert_queue_head(I);
230  if (head->lockclass == QUEUED_RW_LOCK_REQUEST_READ && writer == false) {
231  ret = complete_rdlock(released);
232  }
233  lock.unlock();
234  return ret;
235  }
236  }
237 
238  inline size_t rdunlock(request* &released) {
239  released = NULL;
240  lock.lock();
241  --reader_count;
242  if (reader_count == 0) {
243  size_t ret = 0;
244  if (head != NULL) {
245  if (head->lockclass == QUEUED_RW_LOCK_REQUEST_READ) {
246  ret = complete_rdlock(released);
247  }
248  else {
249  writer = true;
250  released = head;
251  complete_wrlock();
252  ret = 1;
253  }
254  }
255  lock.unlock();
256  return ret;
257  }
258  else {
259  lock.unlock();
260  return 0;
261  }
262  }
263 };
264 
265 }
266 #endif
267