GraphLab: Distributed Graph-Parallel API  2.1
queued_fifo_scheduler.hpp
/**
 * Copyright (c) 2009 Carnegie Mellon University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS
 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language
 * governing permissions and limitations under the License.
 *
 * For more about this software visit:
 *
 * http://www.graphlab.ml.cmu.edu
 *
 */


#ifndef GRAPHLAB_QUEUED_FIFO_SCHEDULER_HPP
#define GRAPHLAB_QUEUED_FIFO_SCHEDULER_HPP

#include <algorithm>
#include <deque>
#include <limits>
#include <queue>
#include <string>
#include <vector>

#include <graphlab/graph/graph_basic_types.hpp>
#include <graphlab/parallel/pthread_tools.hpp>
#include <graphlab/parallel/atomic.hpp>
#include <graphlab/util/random.hpp>

#include <graphlab/scheduler/ischeduler.hpp>
#include <graphlab/parallel/atomic_add_vector2.hpp>

#include <graphlab/scheduler/get_message_priority.hpp>
#include <graphlab/options/graphlab_options.hpp>


#include <graphlab/macros_def.hpp>
namespace graphlab {

  /**
   * \ingroup group_schedulers
   *
   * This class defines a multiple-queue, approximately FIFO scheduler.
   * Each processor has its own in_queue, into which it puts new tasks,
   * and an out_queue, from which it pulls tasks. Once a processor's
   * in_queue grows too large, the entire queue is placed at the end of
   * the shared master queue. Once a processor's out_queue is empty, it
   * grabs the next queue from the master.
   */
  template<typename Message>
  class queued_fifo_scheduler : public ischeduler<Message> {

  public:

    typedef Message message_type;

    typedef std::deque<lvid_type> queue_type;

  private:
    atomic_add_vector2<message_type> messages;
    std::deque<queue_type> master_queue;
    mutex master_lock;
    size_t sub_queue_size;
    std::vector<queue_type> in_queues;
    std::vector<mutex> in_queue_locks;
    std::vector<queue_type> out_queues;
    double min_priority;
    // Terminator
  public:

    queued_fifo_scheduler(size_t num_vertices,
                          const graphlab_options& opts) :
      messages(num_vertices),
      sub_queue_size(100),
      in_queues(opts.get_ncpus()), in_queue_locks(opts.get_ncpus()),
      out_queues(opts.get_ncpus()),
      min_priority(-std::numeric_limits<double>::max()) {
      set_options(opts);
    }

    void set_options(const graphlab_options& opts) {
      size_t new_ncpus = opts.get_ncpus();
      // check if ncpus changed
      if (new_ncpus != in_queues.size()) {
        logstream(LOG_INFO) << "Changing ncpus from " << in_queues.size()
                            << " to " << new_ncpus << std::endl;
        ASSERT_GE(new_ncpus, 1);
        // whether increasing or decreasing ncpus, first push everything
        // in the in_queues onto the master queue
        for (size_t i = 0; i < in_queues.size(); ++i) {
          master_queue.push_back(in_queues[i]);
          in_queues[i].clear();
        }
        // resize the queues
        in_queues.resize(new_ncpus);
        in_queue_locks.resize(new_ncpus);
        out_queues.resize(new_ncpus);
      }

      // read the remaining options.
      std::vector<std::string> keys = opts.get_scheduler_args().get_option_keys();
      foreach(std::string opt, keys) {
        if (opt == "queuesize") {
          opts.get_scheduler_args().get_option("queuesize", sub_queue_size);
        } else if (opt == "min_priority") {
          opts.get_scheduler_args().get_option("min_priority", min_priority);
        } else {
          logstream(LOG_FATAL) << "Unexpected Scheduler Option: " << opt << std::endl;
        }
      }
    }
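
    // For illustration only (hypothetical invocation, not part of this
    // header): these options typically arrive from the command line of
    // a GraphLab application, e.g.
    //   ./my_app --scheduler=queued_fifo \
    //            --scheduler_args="queuesize=200,min_priority=0"
    // and reach set_options() via graphlab_options::get_scheduler_args().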

    void start() {
      master_lock.lock();
      for (size_t i = 0; i < in_queues.size(); ++i) {
        master_queue.push_back(in_queues[i]);
        in_queues[i].clear();
      }
      master_lock.unlock();
    }
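
    // start() is expected to run before the worker threads begin
    // executing: any vertices scheduled ahead of time (e.g. via
    // schedule_all) are flushed from the per-cpu in_queues into the
    // shared master queue so that every cpu can pick them up.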

    void schedule(const lvid_type vid,
                  const message_type& msg) {
      // If this is a new message, schedule it;
      // the min priority will be taken care of by the get_next function
      if (messages.add(vid, msg)) {
        const size_t cpuid = random::rand() % in_queues.size();
        in_queue_locks[cpuid].lock();
        queue_type& queue = in_queues[cpuid];
        queue.push_back(vid);
        if (queue.size() > sub_queue_size) {
          master_lock.lock();
          queue_type emptyq;
          master_queue.push_back(emptyq);
          master_queue.back().swap(queue);
          master_lock.unlock();
        }
        in_queue_locks[cpuid].unlock();
      }
    } // end of schedule
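
    // Example of the flush above: with the default sub_queue_size of
    // 100, the push that takes a per-cpu in_queue past 100 entries
    // swaps the whole in_queue into a fresh entry at the tail of
    // master_queue, so vertices become globally visible in roughly
    // FIFO batches of ~100 (hence "approximately FIFO").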

    void schedule_from_execution_thread(const size_t cpuid,
                                        const lvid_type vid) {
      if (!messages.empty(vid)) {
        ASSERT_LT(cpuid, in_queues.size());
        in_queue_locks[cpuid].lock();
        queue_type& queue = in_queues[cpuid];
        queue.push_back(vid);
        if (queue.size() > sub_queue_size) {
          master_lock.lock();
          queue_type emptyq;
          master_queue.push_back(emptyq);
          master_queue.back().swap(queue);
          master_lock.unlock();
        }
        in_queue_locks[cpuid].unlock();
      }
    } // end of schedule_from_execution_thread
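
    // Unlike schedule(), the variant above is called from a worker
    // thread for a vertex that already carries a message (placed via
    // place() or left behind by get_next()), so it only enqueues the
    // vertex id on the calling cpu's own in_queue instead of adding a
    // message and picking a random cpu.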

    void schedule_all(const message_type& msg,
                      const std::string& order) {
      if (order == "shuffle") {
        std::vector<lvid_type> permutation =
          random::permutation<lvid_type>(messages.size());
        foreach(lvid_type vid, permutation) schedule(vid, msg);
      } else {
        for (lvid_type vid = 0; vid < messages.size(); ++vid)
          schedule(vid, msg);
      }
    } // end of schedule_all
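
    // "shuffle" enqueues every vertex in a random permutation, while
    // the default branch enqueues vertices in id order; shuffling
    // randomizes the order in which vertices are first executed.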

    void completed(const size_t cpuid,
                   const lvid_type vid,
                   const message_type& msg) {
    }

    // The signature below is reconstructed from the body and the
    // ischeduler interface: it extracts the pending message, if any,
    // on a specific vertex.
    sched_status::status_enum get_specific(lvid_type vid,
                                           message_type& ret_msg) {
      bool get_success = messages.test_and_get(vid, ret_msg);
      if (get_success) return sched_status::NEW_TASK;
      else return sched_status::EMPTY;
    }

    void place(lvid_type vid,
               const message_type& msg) {
      messages.add(vid, msg);
    }


    /** Get the next element in the queue */
    sched_status::status_enum get_next(const size_t cpuid,
                                       lvid_type& ret_vid,
                                       message_type& ret_msg) {
      // if the local queue is empty, try to get a queue from the master
      while(1) {
        if (out_queues[cpuid].empty()) {
          master_lock.lock();
          if (!master_queue.empty()) {
            out_queues[cpuid].swap(master_queue.front());
            master_queue.pop_front();
          }
          master_lock.unlock();
        }
        // if the local queue is still empty, see if there is any local
        // work left
        in_queue_locks[cpuid].lock();
        if (out_queues[cpuid].empty() && !in_queues[cpuid].empty()) {
          out_queues[cpuid].swap(in_queues[cpuid]);
        }
        in_queue_locks[cpuid].unlock();
        // now try to take a task from the local out_queue
        queue_type& queue = out_queues[cpuid];
        if (!queue.empty()) {
          ret_vid = queue.front();
          queue.pop_front();
          if (messages.test_and_get(ret_vid, ret_msg)) {
            if (scheduler_impl::get_message_priority(ret_msg) >= min_priority) {
              return sched_status::NEW_TASK;
            } else {
              // it is below min priority, so try to put it back. If
              // putting it back makes it exceed the priority, reschedule it
              message_type combined_message;
              messages.add(ret_vid, ret_msg, combined_message);
              double ret_priority = scheduler_impl::get_message_priority(combined_message);
              if (ret_priority >= min_priority) {
                // aargh. we put it back and it exceeded the priority;
                // stick it back in the queue.
                in_queue_locks[cpuid].lock();
                in_queues[cpuid].push_back(ret_vid);
                in_queue_locks[cpuid].unlock();
              }
            }
          }
        } else {
          return sched_status::EMPTY;
        }
      }
    } // end of get_next
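
    // Note on the put-back path above: between test_and_get() and
    // add(), another thread may deliver a new message to the same
    // vertex; add() merges the two and writes the result to
    // combined_message, so a vertex whose merged priority now clears
    // min_priority is re-enqueued rather than lost.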


    size_t num_joins() const {
      return messages.num_joins();
    }

    /**
     * Print a help string describing the options that this scheduler
     * accepts.
     */
    static void print_options_help(std::ostream& out) {
      out << "\t queuesize: [the size at which a subqueue is "
          << "placed in the master queue. default = 100]\n"
          << "\t min_priority: [double, minimum priority required to receive \n"
          << "\t a message. default = -inf]\n";
    }


  };
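
  // A minimal usage sketch (hypothetical driver code, not part of this
  // header; in practice the GraphLab engine constructs and drives the
  // scheduler, and `opts` and `some_vid` here are assumptions):
  //
  //   graphlab_options opts;                 // ncpus etc. parsed elsewhere
  //   queued_fifo_scheduler<double> sched(num_vertices, opts);
  //   sched.schedule(some_vid, 1.0);         // merge a message onto a vertex
  //   sched.start();                         // flush pre-scheduled work
  //   lvid_type vid; double msg;
  //   while (sched.get_next(cpuid, vid, msg) == sched_status::NEW_TASK) {
  //     /* run the vertex program, then */
  //     sched.completed(cpuid, vid, msg);
  //   }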


} // end of namespace graphlab
#include <graphlab/macros_undef.hpp>

#endif