24 #ifndef GRAPHLAB_BLOCKING_QUEUE_HPP
25 #define GRAPHLAB_BLOCKING_QUEUE_HPP
31 #include <graphlab/parallel/pthread_tools.hpp>
33 #include <graphlab/macros_def.hpp>
45 typedef typename std::deque<T> queue_type;
53 volatile uint16_t sleeping;
54 volatile uint16_t sleeping_on_empty;
65 m_queue.push_back(elem);
67 if (sleeping) m_conditional.
signal();
74 m_queue.push_front(elem);
76 if (sleeping) m_conditional.
signal();
82 inline void enqueue_conditional_signal(
const T& elem,
size_t signal_at_size) {
84 m_queue.push_back(elem);
86 if (sleeping && m_queue.size() >= signal_at_size) m_conditional.
signal();
92 return m_queue.empty();
95 void begin_critical_section() {
104 void swap(queue_type &q) {
107 if (m_queue.empty() && sleeping_on_empty) {
108 m_empty_conditional.
signal();
113 inline std::pair<T, bool> try_dequeue_in_critical_section() {
116 if (m_queue.empty() || m_alive ==
false) {
117 return std::make_pair(elem,
false);
120 elem = m_queue.front();
122 if (m_queue.empty() && sleeping_on_empty) {
123 m_empty_conditional.
signal();
125 return std::make_pair(elem,
true);
129 void end_critical_section() {
134 inline std::pair<T, bool> dequeue_and_begin_critical_section_on_success() {
137 bool success =
false;
139 while(m_queue.empty() && m_alive) {
141 m_conditional.
wait(m_mutex);
145 if(!m_queue.empty()) {
147 elem = m_queue.front();
149 if (m_queue.empty() && sleeping_on_empty) {
150 m_empty_conditional.
signal();
153 if (!success) m_mutex.
unlock();
154 return std::make_pair(elem, success);
162 bool success =
false;
164 if (m_queue.size() < immediate_size) {
169 }
while(m_queue.empty() && m_alive);
172 if(!m_queue.empty()) {
186 bool success =
false;
188 if (m_queue.size() < immediate_size) {
189 if (m_queue.empty() && m_alive) {
196 if(!m_queue.empty()) {
206 inline bool wait_for_data() {
209 bool success =
false;
211 while(m_queue.empty() && m_alive) {
213 m_conditional.
wait(m_mutex);
217 if(!m_queue.empty()) {
239 bool success =
false;
241 while(m_queue.empty() && m_alive) {
243 m_conditional.
wait(m_mutex);
247 if(!m_queue.empty()) {
249 elem = m_queue.front();
251 if (m_queue.empty() && sleeping_on_empty) {
252 m_empty_conditional.
signal();
257 return std::make_pair(elem, success);
265 if (m_queue.empty() || m_alive ==
false)
return std::make_pair(T(),
false);
269 if (m_queue.empty() || m_alive ==
false) {
271 return std::make_pair(elem,
false);
274 elem = m_queue.front();
276 if (m_queue.empty() && sleeping_on_empty) {
277 m_empty_conditional.
signal();
282 return std::make_pair(elem,
true);
288 bool res = m_queue.empty();
320 size_t size = m_queue.size();
335 while (m_queue.empty() ==
false && m_alive ==
true) {
337 m_empty_conditional.
wait(m_mutex);
383 #include <graphlab/macros_undef.hpp>