GraphLab: Distributed Graph-Parallel API 2.1
atomic_add_vector.hpp
/**
 * Copyright (c) 2009 Carnegie Mellon University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS
 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language
 * governing permissions and limitations under the License.
 *
 * For more about this software visit:
 *
 * http://www.graphlab.ml.cmu.edu
 *
 */


/**
 * Also contains code that is Copyright 2011 Yahoo! Inc. All rights
 * reserved.
 *
 * Contributed under the iCLA for:
 * Joseph Gonzalez ([email protected])
 *
 */


#ifndef GRAPHLAB_ATOMIC_ADD_VECTOR_HPP
#define GRAPHLAB_ATOMIC_ADD_VECTOR_HPP

#include <vector>

#include <graphlab/parallel/pthread_tools.hpp>
#include <graphlab/util/lock_free_pool.hpp>


namespace graphlab {

  /**
   * A vector of lock-free boxes, one per vertex.  Concurrent writers
   * add() values into a box; if a value is already present, the new
   * value is combined into it with operator+= (a "join") instead of
   * blocking.  Readers drain a box with test_and_get() or read it
   * non-destructively with peek().
   */
  template<typename ValueType>
  class atomic_add_vector {
  public:
    typedef ValueType value_type;

    // We needed a second "NULL" pointer value to indicate a value
    // that is being swapped.
#define VALUE_PENDING (value_type*)(size_t)(-1)

  private:
    lock_free_pool<value_type> pool;
    atomic<size_t> joincounter;

    class atomic_box_type {
    private:
      value_type* volatile value_ptr;
    public:
      atomic_box_type() : value_ptr(NULL) { }

      // void assign_unsafe(const atomic_box_type &other) {
      //   value_ptr = other.value_ptr;
      // }

      // bool peek_unsafe(const value_type& other) {
      //   if (value_ptr != NULL) {
      //     other = (*value_ptr);
      //     return true;
      //   } else { return false; }
      // }

      // bool get_reference_unsafe(value_type*& ret) {
      //   ret = value_ptr;
      //   return ret != NULL;
      // }

      // /** returns true if set for the first time */
      // inline bool set_unsafe(lock_free_pool<value_type>& pool,
      //                        const value_type& other,
      //                        atomic<size_t> &joincounter) {
      //   if (value_ptr == NULL) {
      //     value_ptr = pool.alloc();
      //     (*value_ptr) = other;
      //     return true;
      //   } else {
      //     (*value_ptr) += other;
      //     joincounter.inc();
      //     return false;
      //   }
      // } // end of set_unsafe

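      // Protocol summary: value_ptr is NULL when the box is empty,
      // VALUE_PENDING while some thread has temporarily claimed the
      // contents, and a pool-allocated pointer otherwise.  Every
      // operation claims the box by atomically exchanging VALUE_PENDING
      // in; a thread that finds VALUE_PENDING already there retries.
      // When the claimant swaps the real pointer back it may displace a
      // value that a racing thread installed in the interim, in which
      // case the displaced value is re-merged via set().
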
      inline bool peek(lock_free_pool<value_type>& pool,
                       value_type& new_value,
                       atomic<size_t>& joincounter) {
        bool retval = false;
        while(1) {
          value_type* vptr = VALUE_PENDING;
          // pull it out to process it
          atomic_exchange(value_ptr, vptr);
          if (vptr == VALUE_PENDING) {
            // another thread holds the box. try again
            continue;
          } else if (vptr == NULL) {
            // the box is empty: put the NULL back and report no value
            // (without this check, the read below would dereference NULL)
            atomic_exchange(value_ptr, vptr);
            return false;
          } else {
            // read the value
            new_value = (*vptr);
            retval = true;
          }
          // swap it back
          atomic_exchange(value_ptr, vptr);
          // aargh! I swapped something else out. Now we have to
          // try to put it back in
          if (__unlikely__(vptr != NULL && vptr != VALUE_PENDING)) {
            set(pool, (*vptr), new_value, joincounter);
          }
          break;
        }
        return retval;
      }


      /** returns true if set for the first time */
      inline bool set(lock_free_pool<value_type>& pool,
                      const value_type& other,
                      value_type& new_value,
                      atomic<size_t>& joincounter) {
        bool ret = false;
        value_type toinsert = other;
        while(1) {
          value_type* vptr = VALUE_PENDING;
          // pull it out to process it
          atomic_exchange(value_ptr, vptr);
          // if there is nothing in there, set it
          // otherwise add it
          if (vptr == NULL) {
            vptr = pool.alloc();
            (*vptr) = toinsert;
            ret = true;
          } else if (vptr == VALUE_PENDING) {
            // a pending is in here. it is not ready for reading. try again.
            continue;
          } else { (*vptr) += toinsert; joincounter.inc(); }
          // swap it back in
          ASSERT_TRUE(vptr != VALUE_PENDING);
          new_value = (*vptr);
          atomic_exchange(value_ptr, vptr);
          // aargh! I swapped something else out. Now we have to
          // try to put it back in
          if (__unlikely__(vptr != NULL && vptr != VALUE_PENDING)) {
            toinsert = (*vptr);
          } else { break; }
        }
        return ret;
      }

      void clear(lock_free_pool<value_type>& pool) {
        value_type val; test_and_get(pool, val);
      }

      bool empty() { return value_ptr == NULL; }

      inline bool test_and_get(lock_free_pool<value_type>& pool,
                               value_type& r) {
        value_type* ret;
        while (1) {
          ret = value_ptr;
          if (ret == NULL) return false;
          else if (__likely__(ret != VALUE_PENDING)) {
            // atomically detach the value; on success, hand its
            // contents back and return the slot to the pool
            if (__likely__(atomic_compare_and_swap(value_ptr, ret,
                                                   (value_type*)NULL))) {
              r = *ret;
              pool.free(ret);
              return true;
            }
          }
        }
        return false;
      } // end of test_and_get

    }; // end of atomic_box_type


    typedef std::vector< atomic_box_type > atomic_box_vec_type;
    atomic_box_vec_type atomic_box_vec;

    /** Not assignable */
    void operator=(const atomic_add_vector& other) { }


  public:
    /** Initialize the per vertex task set */
    atomic_add_vector(size_t num_vertices = 0) :
      pool(num_vertices + 256), atomic_box_vec(num_vertices) { }

    /**
     * Resize the internal vector of value boxes for a different graph
     */
    void resize(size_t num_vertices) {
      atomic_box_vec.resize(num_vertices);
      pool.reset_pool(num_vertices + 256);
    }

    /** Add a task to the set returning false if the task was already
        present. */
    bool add(const size_t& idx,
             const value_type& val) {
      ASSERT_LT(idx, atomic_box_vec.size());
      value_type new_value;
      return atomic_box_vec[idx].set(pool, val, new_value, joincounter);
    } // end of add task to set


    // /** Add a task to the set returning false if the task was already
    //     present. */
    // bool add_unsafe(const size_t& idx,
    //                 const value_type& val) {
    //   ASSERT_LT(idx, atomic_box_vec.size());
    //   return atomic_box_vec[idx].set_unsafe(pool, val, joincounter);
    // } // end of add task to set


    /** Add a task to the set, merging with any value already present
        and returning the combined result through new_value. */
    bool add(const size_t& idx,
             const value_type& val,
             value_type& new_value) {
      ASSERT_LT(idx, atomic_box_vec.size());
      return atomic_box_vec[idx].set(pool, val, new_value, joincounter);
    } // end of add task to set


    // /** Add a task to the set returning false if the task was already
    //     present. Returns the priority of the task before and after
    //     insertion. If the task did not exist prior to the add,
    //     prev_priority = 0 */
    // bool add(const size_t& idx,
    //          const value_type& val,
    //          double& prev_priority,
    //          double& new_priority) {
    //   ASSERT_LT(idx, atomic_box_vec.size());
    //   return atomic_box_vec[idx].set(pool, val, prev_priority, new_priority,
    //                                  joincounter);
    // } // end of add task to set

    // bool get_nondestructive_unsafe(const size_t& idx,
    //                                value_type& ret_val) {
    //   return atomic_box_vec[idx].get_nondestructive_unsafe(ret_val);
    // }

    // bool get_reference_unsafe(const size_t& idx,
    //                           value_type*& ret_val) {
    //   return atomic_box_vec[idx].get_reference_unsafe(ret_val);
    // }


    bool test_and_get(const size_t& idx,
                      value_type& ret_val) {
      ASSERT_LT(idx, atomic_box_vec.size());
      return atomic_box_vec[idx].test_and_get(pool, ret_val);
    }

    bool peek(const size_t& idx,
              value_type& ret_val) {
      ASSERT_LT(idx, atomic_box_vec.size());
      return atomic_box_vec[idx].peek(pool, ret_val, joincounter);
    }

    bool empty(const size_t& idx) {
      return atomic_box_vec[idx].empty();
    }

    size_t size() const {
      return atomic_box_vec.size();
    }

    size_t num_joins() const {
      return joincounter.value;
    }


    void clear() {
      for (size_t i = 0; i < atomic_box_vec.size(); ++i) clear(i);
    }

    void clear(size_t i) { atomic_box_vec[i].clear(pool); }

  }; // end of atomic_add_vector

} // end of namespace graphlab

#undef VALUE_PENDING

#endif
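
Usage sketch (illustrative, not part of the original header): the example
below accumulates two values into the same slot and then drains it. The
include path graphlab/util/atomic_add_vector.hpp is an assumption about
where this header lives in the source tree; the atomic_add_vector calls
themselves are the ones defined above.

#include <iostream>
#include <graphlab/util/atomic_add_vector.hpp>

int main() {
  // one box per vertex; value_type must support operator= and operator+=
  graphlab::atomic_add_vector<float> deltas(4);

  float combined;
  deltas.add(2, 1.5f);             // first add: installs 1.5 in box 2
  deltas.add(2, 2.0f, combined);   // second add joins: combined == 3.5
  std::cout << "joins so far: " << deltas.num_joins() << "\n";   // 1

  float drained;
  if (deltas.test_and_get(2, drained))                // destructive read
    std::cout << "drained: " << drained << "\n";      // 3.5
  std::cout << "box 2 empty: " << deltas.empty(2) << "\n";       // 1
  return 0;
}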