Caffe2 - C++ API
A deep learning, cross platform ML framework
simple_queue.h
1 #ifndef CAFFE2_UTILS_SIMPLE_QUEUE_H_
2 #define CAFFE2_UTILS_SIMPLE_QUEUE_H_
3 
4 #include <condition_variable> // NOLINT
5 #include <mutex> // NOLINT
6 #include <queue>
7 
8 #include "caffe2/core/logging.h"
9 
10 namespace caffe2 {
11 
12 // This is a very simple queue that Yangqing wrote when bottlefeeding the baby,
13 // so don't take it seriously. What it does is a minimal thread-safe queue that
14 // allows me to run network as a DAG.
15 //
16 // A usual work pattern looks like this: one or multiple producers push jobs
17 // into this queue, and one or multiple workers pops jobs from this queue. If
18 // nothing is in the queue but NoMoreJobs() is not called yet, the pop calls
19 // will wait. If NoMoreJobs() has been called, pop calls will return false,
20 // which serves as a message to the workers that they should exit.
21 template <typename T>
22 class SimpleQueue {
23  public:
24  SimpleQueue() : no_more_jobs_(false) {}
25 
26  // Pops a value and writes it to the value pointer. If there is nothing in the
27  // queue, this will wait till a value is inserted to the queue. If there are
28  // no more jobs to pop, the function returns false. Otherwise, it returns
29  // true.
30  bool Pop(T* value) {
31  std::unique_lock<std::mutex> mutex_lock(mutex_);
32  while (queue_.size() == 0 && !no_more_jobs_) cv_.wait(mutex_lock);
33  if (queue_.size() == 0 && no_more_jobs_) return false;
34  *value = queue_.front();
35  queue_.pop();
36  return true;
37  }
38 
39  int size() {
40  std::unique_lock<std::mutex> mutex_lock(mutex_);
41  return queue_.size();
42  }
43 
44  // Push pushes a value to the queue.
45  void Push(const T& value) {
46  {
47  std::lock_guard<std::mutex> mutex_lock(mutex_);
48  CAFFE_ENFORCE(!no_more_jobs_, "Cannot push to a closed queue.");
49  queue_.push(value);
50  }
51  cv_.notify_one();
52  }
53 
54  // NoMoreJobs() marks the close of this queue. It also notifies all waiting
55  // Pop() calls so that they either check out remaining jobs, or return false.
56  // After NoMoreJobs() is called, this queue is considered closed - no more
57  // Push() functions are allowed, and once existing items are all checked out
58  // by the Pop() functions, any more Pop() function will immediately return
59  // false with nothing set to the value.
60  void NoMoreJobs() {
61  {
62  std::lock_guard<std::mutex> mutex_lock(mutex_);
63  no_more_jobs_ = true;
64  }
65  cv_.notify_all();
66  }
67 
68  private:
69  std::mutex mutex_;
70  std::condition_variable cv_;
71  std::queue<T> queue_;
72  bool no_more_jobs_;
73  // We do not allow copy constructors.
74  SimpleQueue(const SimpleQueue& src) {}
75 };
76 
77 } // namespace caffe2
78 
79 #endif // CAFFE2_UTILS_SIMPLE_QUEUE_H_
Simple registry implementation in Caffe2 that uses static variables to register object creators durin...