GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
lockfree_push_back.hpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_PARALLEL_LOCKFREE_PUSHBACK_HPP
25 #define GRAPHLAB_PARALLEL_LOCKFREE_PUSHBACK_HPP
26 #include <graphlab/parallel/atomic.hpp>
27 
28 namespace graphlab {
29 
30 namespace lockfree_push_back_impl {
31  struct idx_ref {
32  idx_ref(): reference_count(0), idx(0) { }
33  idx_ref(size_t idx): reference_count(0), idx(idx) { }
34 
35  volatile int reference_count;
36  atomic<size_t> idx;
37  enum {
38  MAX_REF = 65536
39  };
40 
41  inline void inc_ref() {
42  while (1) {
43  int curref = reference_count;
44  if ((curref & MAX_REF) == 0 &&
45  atomic_compare_and_swap(reference_count, curref, curref + 1)) {
46  break;
47  }
48  }
49  }
50 
51  inline void wait_till_no_ref() {
52  while((reference_count & (MAX_REF - 1)) != 0);
53  }
54 
55  inline void dec_ref() {
56  __sync_fetch_and_sub(&reference_count, 1);
57  }
58 
59  inline void flag_ref() {
60  __sync_fetch_and_xor(&reference_count, MAX_REF);
61  }
62 
63  inline size_t inc_idx() {
64  return idx.inc_ret_last();
65  }
66 
67  inline size_t inc_idx(size_t n) {
68  return idx.inc_ret_last(n);
69  }
70  };
71 } // lockfree_push_back_impl
72 
73 /**
74  * Provides a lock free way to insert elements to the end
75  * of a container. Container must provide 3 functions.
76  * - T& operator[](size_t idx)
77  * - void resize(size_t len)
78  * - size_t size()
79  *
80  * resize(n) must guarantee that size() >= n.
81  * T& operator[](size_t idx) must succeed for idx < size() and must be
82  * safely executeable in parallel.
83  * size() must be safely executeable in parallel with resize().
84  */
85 template <typename Container, typename T = typename Container::value_type>
87  private:
88  Container& container;
89  lockfree_push_back_impl::idx_ref cur;
90  mutex mut;
91  float scalefactor;
92  public:
93  lockfree_push_back(Container& container, size_t startidx, float scalefactor = 2):
94  container(container),cur(startidx), scalefactor(scalefactor) { }
95 
96  size_t size() const {
97  return cur.idx.value;
98  }
99 
100  void set_size(size_t s) {
101  cur.idx.value = s;
102  }
103 
104  template <typename Iterator>
105  size_t push_back(Iterator begin, Iterator end) {
106  size_t numel = std::distance(begin, end);
107  size_t putpos = cur.inc_idx(numel);
108  size_t endidx = putpos + numel;
109  while(1) {
110  cur.inc_ref();
111  if (endidx <= container.size()) {
112  while(putpos < endidx) {
113  container[putpos] = (*begin);
114  ++putpos; ++begin;
115  }
116  cur.dec_ref();
117  break;
118  }
119  else {
120  cur.dec_ref();
121 
122  if (mut.try_lock()) {
123  // ok. we need to resize
124  // flag the reference and wait till there are no more references
125  cur.flag_ref();
126  cur.wait_till_no_ref();
127  // we are exclusive here. resize
128  if (endidx > container.size()) {
129  container.resize(std::max<size_t>(endidx, container.size() * scalefactor));
130  }
131  while(putpos < endidx) {
132  container[putpos] = (*begin);
133  ++putpos; ++begin;
134  }
135  cur.flag_ref();
136  mut.unlock();
137  break;
138  }
139  }
140  }
141  return putpos;
142  }
143 
144  bool query(size_t item, T& value) {
145  bool ret = false;
146  cur.inc_ref();
147  if (item < cur.idx) {
148  value = container[item];
149  ret = true;
150  }
151  cur.dec_ref();
152  return ret;
153  }
154 
155  T* query(size_t item) {
156  T* ret = NULL;
157  cur.inc_ref();
158  if (item < cur.idx) {
159  ret = &(container[item]);
160  }
161  cur.dec_ref();
162  return ret;
163  }
164 
165  bool query_unsafe(size_t item, T& value) {
166  bool ret = false;
167  if (item < cur.idx) {
168  value = container[item];
169  ret = true;
170  }
171  return ret;
172  }
173 
174  T* query_unsafe(size_t item) {
175  T* ret = NULL;
176  if (item < cur.idx) {
177  ret = &(container[item]);
178  }
179  return ret;
180  }
181 
182 
183  size_t push_back(const T& t) {
184  size_t putpos = cur.inc_idx();
185  while(1) {
186  cur.inc_ref();
187  if (putpos < container.size()) {
188  container[putpos] = t;
189  cur.dec_ref();
190  break;
191  }
192  else {
193  cur.dec_ref();
194 
195  if (mut.try_lock()) {
196  // ok. we need to resize
197  // flag the reference and wait till there are no more references
198  cur.flag_ref();
199  cur.wait_till_no_ref();
200  // we are exclusive here. resize
201  if (putpos >= container.size()) {
202  container.resize(std::max<size_t>(putpos + 1, container.size() * scalefactor));
203  }
204  container[putpos] = t;
205  cur.flag_ref();
206  mut.unlock();
207  break;
208  }
209  }
210  }
211  return putpos;
212  }
213 };
214 
215 } // namespace graphlab
216 #endif