GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
blocking_queue.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_BLOCKING_QUEUE_HPP
25 #define GRAPHLAB_BLOCKING_QUEUE_HPP
26 
27 
28 
29 #include <list>
30 #include <deque>
31 #include <graphlab/parallel/pthread_tools.hpp>
32 
33 #include <graphlab/macros_def.hpp>
34 
35 namespace graphlab {
36 
37  /**
38  * \ingroup util
39  * \brief Implements a blocking queue useful for producer/consumer models
40  */
41  template<typename T>
43  protected:
44 
45  typedef typename std::deque<T> queue_type;
46 
47  bool m_alive;
48  queue_type m_queue;
49  mutex m_mutex;
50  conditional m_conditional;
51  conditional m_empty_conditional;
52 
53  volatile uint16_t sleeping;
54  volatile uint16_t sleeping_on_empty;
55 
56 
57  public:
58 
59  //! creates a blocking queue
60  blocking_queue() : m_alive(true),sleeping(0),sleeping_on_empty(0) { }
61 
62  //! Add an element to the blocking queue
63  inline void enqueue(const T& elem) {
64  m_mutex.lock();
65  m_queue.push_back(elem);
66  // Signal threads waiting on the queue
67  if (sleeping) m_conditional.signal();
68  m_mutex.unlock();
69  }
70 
71  //! Add an element to the blocking queue
72  inline void enqueue_to_head(const T& elem) {
73  m_mutex.lock();
74  m_queue.push_front(elem);
75  // Signal threads waiting on the queue
76  if (sleeping) m_conditional.signal();
77  m_mutex.unlock();
78  }
79 
80 
81 
82  inline void enqueue_conditional_signal(const T& elem, size_t signal_at_size) {
83  m_mutex.lock();
84  m_queue.push_back(elem);
85  // Signal threads waiting on the queue
86  if (sleeping && m_queue.size() >= signal_at_size) m_conditional.signal();
87  m_mutex.unlock();
88  }
89 
90 
91  bool empty_unsafe() {
92  return m_queue.empty();
93  }
94 
95  void begin_critical_section() {
96  m_mutex.lock();
97  }
98 
99 
100  bool is_alive() {
101  return m_alive;
102  }
103 
104  void swap(queue_type &q) {
105  m_mutex.lock();
106  q.swap(m_queue);
107  if (m_queue.empty() && sleeping_on_empty) {
108  m_empty_conditional.signal();
109  }
110  m_mutex.unlock();
111  }
112 
113  inline std::pair<T, bool> try_dequeue_in_critical_section() {
114  T elem = T();
115  // Wait while the queue is empty and this queue is alive
116  if (m_queue.empty() || m_alive == false) {
117  return std::make_pair(elem, false);
118  }
119  else {
120  elem = m_queue.front();
121  m_queue.pop_front();
122  if (m_queue.empty() && sleeping_on_empty) {
123  m_empty_conditional.signal();
124  }
125  return std::make_pair(elem, true);
126  }
127  }
128 
129  void end_critical_section() {
130  m_mutex.unlock();
131  }
132 
133 
134  inline std::pair<T, bool> dequeue_and_begin_critical_section_on_success() {
135  m_mutex.lock();
136  T elem = T();
137  bool success = false;
138  // Wait while the queue is empty and this queue is alive
139  while(m_queue.empty() && m_alive) {
140  sleeping++;
141  m_conditional.wait(m_mutex);
142  sleeping--;
143  }
144  // An element has been added or a signal was raised
145  if(!m_queue.empty()) {
146  success = true;
147  elem = m_queue.front();
148  m_queue.pop_front();
149  if (m_queue.empty() && sleeping_on_empty) {
150  m_empty_conditional.signal();
151  }
152  }
153  if (!success) m_mutex.unlock();
154  return std::make_pair(elem, success);
155  }
156 
157  /// Returns immediately of queue size is >= immedeiate_size
158  /// Otherwise, it will poll over 'ns' nanoseconds or on a signal
159  /// until queue is not empty.
160  inline bool timed_wait_for_data(size_t ns, size_t immediate_size) {
161  m_mutex.lock();
162  bool success = false;
163  // Wait while the queue is empty and this queue is alive
164  if (m_queue.size() < immediate_size) {
165  do {
166  sleeping++;
167  m_conditional.timedwait_ns(m_mutex, ns);
168  sleeping--;
169  }while(m_queue.empty() && m_alive);
170  }
171  // An element has been added or a signal was raised
172  if(!m_queue.empty()) {
173  success = true;
174  }
175  m_mutex.unlock();
176 
177  return success;
178  }
179 
180 
181  /// Returns immediately of queue size is >= immedeiate_size
182  /// Otherwise, it will poll over 'ns' nanoseconds or on a signal
183  /// until queue is not empty.
184  inline bool try_timed_wait_for_data(size_t ns, size_t immediate_size) {
185  m_mutex.lock();
186  bool success = false;
187  // Wait while the queue is empty and this queue is alive
188  if (m_queue.size() < immediate_size) {
189  if (m_queue.empty() && m_alive) {
190  sleeping++;
191  m_conditional.timedwait_ns(m_mutex, ns);
192  sleeping--;
193  }
194  }
195  // An element has been added or a signal was raised
196  if(!m_queue.empty()) {
197  success = true;
198  }
199  m_mutex.unlock();
200 
201  return success;
202  }
203 
204 
205 
206  inline bool wait_for_data() {
207 
208  m_mutex.lock();
209  bool success = false;
210  // Wait while the queue is empty and this queue is alive
211  while(m_queue.empty() && m_alive) {
212  sleeping++;
213  m_conditional.wait(m_mutex);
214  sleeping--;
215  }
216  // An element has been added or a signal was raised
217  if(!m_queue.empty()) {
218  success = true;
219  }
220  m_mutex.unlock();
221 
222  return success;
223  }
224 
225 
226  /**
227  * Blocks until an element is available in the queue
228  * or until stop_blocking() is called.
229  * The return value is a pair of <T value, bool success>
230  * If "success" if set, then "value" is valid and
231  * is an element popped from the queue.
232  * If "success" is false, stop_blocking() was called
233  * and the queue has been destroyed.
234  */
235  inline std::pair<T, bool> dequeue() {
236 
237  m_mutex.lock();
238  T elem = T();
239  bool success = false;
240  // Wait while the queue is empty and this queue is alive
241  while(m_queue.empty() && m_alive) {
242  sleeping++;
243  m_conditional.wait(m_mutex);
244  sleeping--;
245  }
246  // An element has been added or a signal was raised
247  if(!m_queue.empty()) {
248  success = true;
249  elem = m_queue.front();
250  m_queue.pop_front();
251  if (m_queue.empty() && sleeping_on_empty) {
252  m_empty_conditional.signal();
253  }
254  }
255  m_mutex.unlock();
256 
257  return std::make_pair(elem, success);
258  }
259 
260  /**
261  * Returns an element if the queue has an entry.
262  * returns [item, false] otherwise.
263  */
264  inline std::pair<T, bool> try_dequeue() {
265  if (m_queue.empty() || m_alive == false) return std::make_pair(T(), false);
266  m_mutex.lock();
267  T elem = T();
268  // Wait while the queue is empty and this queue is alive
269  if (m_queue.empty() || m_alive == false) {
270  m_mutex.unlock();
271  return std::make_pair(elem, false);
272  }
273  else {
274  elem = m_queue.front();
275  m_queue.pop_front();
276  if (m_queue.empty() && sleeping_on_empty) {
277  m_empty_conditional.signal();
278  }
279  }
280  m_mutex.unlock();
281 
282  return std::make_pair(elem, true);
283  }
284 
285  //! Returns true if the queue is empty
286  inline bool empty() {
287  m_mutex.lock();
288  bool res = m_queue.empty();
289  m_mutex.unlock();
290  return res;
291  }
292 
293  /** Wakes up all threads waiting on the queue whether
294  or not an element is available. Once this function is called,
295  all existing and future dequeue operations will return with failure.
296  Note that there could be elements remaining in the queue after
297  stop_blocking() is called.
298  */
299  inline void stop_blocking() {
300  m_mutex.lock();
301  m_alive = false;
302  m_conditional.broadcast();
303  m_empty_conditional.broadcast();
304  m_mutex.unlock();
305  }
306 
307  /**
308  Resumes operation of the blocking_queue. Future calls to
309  dequeue will proceed as normal.
310  */
311  inline void start_blocking() {
312  m_mutex.lock();
313  m_alive = true;
314  m_mutex.unlock();
315  }
316 
317  //! get the current size of the queue
318  inline size_t size() {
319  m_mutex.lock();
320  size_t size = m_queue.size();
321  m_mutex.unlock();
322  return size;
323  }
324 
325  /**
326  * The conceptual "reverse" of dequeue().
327  * This function will block until the queue becomes empty, or
328  * until stop_blocking() is called.
329  * Returns true on success.
330  * Returns false if the queue is no longer alive
331  */
333  m_mutex.lock();
334  // if the queue still has elements in it while I am still alive, wait
335  while (m_queue.empty() == false && m_alive == true) {
336  sleeping_on_empty++;
337  m_empty_conditional.wait(m_mutex);
338  sleeping_on_empty--;
339  }
340  m_mutex.unlock();
341  // if I am alive, the queue must be empty. i.e. success
342  // otherwise I am dead
343  return m_alive;
344  }
345 
346  /**
347  * Causes any threads currently blocking on a dequeue to wake up
348  * and evaluate the state of the queue. If the queue is empty,
349  * the threads will return back to sleep immediately. If the queue
350  * is destroyed through stop_blocking, all threads will return.
351  */
352  void broadcast() {
353  m_mutex.lock();
354  m_conditional.broadcast();
355  m_mutex.unlock();
356  }
357 
358 
359 
360  /**
361  * Causes any threads blocking on "wait_until_empty()" to wake
362  * up and evaluate the state of the queue. If the queue is not empty,
363  * the threads will return back to sleep immediately. If the queue
364  * is empty, all threads will return.
365  */
367  m_mutex.lock();
368  m_empty_conditional.broadcast();
369  m_mutex.unlock();
370  }
371 
372 
373  ~blocking_queue() {
374  m_alive = false;
375  broadcast();
377  }
378  }; // end of blocking_queue class
379 
380 
381 } // end of namespace graphlab
382 
383 #include <graphlab/macros_undef.hpp>
384 
385 #endif
386