GraphLab: Distributed Graph-Parallel API 2.1
fifo_scheduler.hpp
/**
 * Copyright (c) 2009 Carnegie Mellon University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS
 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language
 * governing permissions and limitations under the License.
 *
 * For more about this software visit:
 *
 * http://www.graphlab.ml.cmu.edu
 *
 */


#ifndef GRAPHLAB_FIFO_SCHEDULER_HPP
#define GRAPHLAB_FIFO_SCHEDULER_HPP

#include <algorithm>
#include <deque>   // std::deque, used for queue_type below
#include <limits>  // std::numeric_limits, used for the min_priority default
#include <queue>

#include <graphlab/graph/graph_basic_types.hpp>
#include <graphlab/parallel/pthread_tools.hpp>
#include <graphlab/parallel/atomic.hpp>

#include <graphlab/util/random.hpp>
#include <graphlab/scheduler/ischeduler.hpp>
#include <graphlab/parallel/atomic_add_vector2.hpp>

#include <graphlab/scheduler/get_message_priority.hpp>
#include <graphlab/options/graphlab_options.hpp>

#include <graphlab/macros_def.hpp>
namespace graphlab {

  /**
   * \ingroup group_schedulers
   *
   * This class defines a multiple-queue approximate FIFO scheduler.
   * The scheduler maintains multi * ncpus FIFO queues. A newly
   * scheduled vertex is pushed onto the shorter of two randomly
   * chosen queues (the "power of two choices" rule). When dequeuing,
   * each thread first drains its own block of multi queues and, once
   * those are empty, scans the queues of the other threads for work.
   */
  template<typename Message>
  class fifo_scheduler : public ischeduler<Message> {

  public:

    typedef Message message_type;

    typedef std::deque<lvid_type> queue_type;

  private:

    /* Per-vertex message store: add() merges a new message into any
       message already pending for the vertex, and test_and_get()
       atomically claims a pending message. */
    atomic_add_vector2<message_type> messages;
    std::vector<queue_type> queues;
    std::vector<spinlock>   locks;
    size_t multi;                       // number of queues per thread
    std::vector<size_t> current_queue;  // per-thread queue cursor

    double min_priority;

  public:

    fifo_scheduler(size_t num_vertices,
                   const graphlab_options& opts) :
      messages(num_vertices), multi(3),
      current_queue(opts.get_ncpus()),
      min_priority(-std::numeric_limits<double>::max()) {
      set_options(opts);
    }
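
    /* Minimal usage sketch, assuming a message type with the merge and
       priority() behavior the scheduler expects (vid, cpuid, num_vertices,
       and my_message are illustrative placeholders; in practice the engine
       drives these calls):

         graphlab::graphlab_options opts;
         fifo_scheduler<my_message> sched(num_vertices, opts);
         sched.start();
         sched.schedule(vid, my_message());    // producer side

         lvid_type v; my_message m;            // consumer side, thread cpuid
         while (sched.get_next(cpuid, v, m) == sched_status::NEW_TASK) {
           // ... apply m to vertex v, possibly schedule() neighbors ...
         }
    */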

    void set_options(const graphlab_options& opts) {
      size_t new_ncpus = opts.get_ncpus();
      // check whether ncpus changed
      if (new_ncpus != current_queue.size()) {
        logstream(LOG_INFO) << "Changing ncpus from " << current_queue.size()
                            << " to " << new_ncpus << std::endl;
        ASSERT_GE(new_ncpus, 1);
        current_queue.resize(new_ncpus);
      }

      std::vector<std::string> keys = opts.get_scheduler_args().get_option_keys();
      foreach(std::string opt, keys) {
        if (opt == "multi") {
          opts.get_scheduler_args().get_option("multi", multi);
        } else if (opt == "min_priority") {
          opts.get_scheduler_args().get_option("min_priority", min_priority);
        } else {
          logstream(LOG_FATAL) << "Unexpected Scheduler Option: " << opt << std::endl;
        }
      }

      const size_t nqueues = std::max(multi * current_queue.size(), size_t(1));
      // The number of queues changed: rebuild the queue array and
      // redistribute every queued vertex round-robin over the new queues.
      if (nqueues != queues.size()) {
        std::vector<queue_type> old_queues;
        std::swap(old_queues, queues);
        queues.resize(nqueues);
        locks.resize(nqueues);

        size_t idx = 0;
        for (size_t i = 0; i < old_queues.size(); ++i) {
          while (!old_queues[i].empty()) {
            queues[idx].push_back(old_queues[i].front());
            old_queues[i].pop_front();
            // wrap the cursor so it never indexes past the new queue array
            idx = (idx + 1) % nqueues;
          }
        }
      }
    }
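
    // Example: ncpus = 4 and multi = 3 yield 12 queues; the
    // max(..., size_t(1)) clamp above guarantees at least one queue
    // even if multi is set to 0.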

    void start() { }

    void schedule(const lvid_type vid,
                  const message_type& msg) {
      // If this is a new message for the vertex, enqueue it; the
      // min_priority threshold is enforced later, in get_next().
      if (messages.add(vid, msg)) {
        /* Pick the destination queue by the "power of two choices" rule:
           derive two queue indices from a single uniform draw over
           [0, n*n - 1] and push the task onto the shorter of the two
           queues. */
        // M. D. Mitzenmacher, The Power of Two Choices in Randomized
        // Load Balancing, PhD thesis (1996).
        // http://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.
        size_t idx = 0;
        if (queues.size() > 1) {
          const uint32_t prod =
            random::fast_uniform(uint32_t(0),
                                 uint32_t(queues.size() * queues.size() - 1));
          const uint32_t r1 = prod / queues.size();
          const uint32_t r2 = prod % queues.size();
          idx = (queues[r1].size() < queues[r2].size()) ? r1 : r2;
        }
        locks[idx].lock(); queues[idx].push_back(vid); locks[idx].unlock();
      }
    } // end of schedule
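
    /* Illustration of the two-choice draw above: with 6 queues, a draw
       of prod = 23 gives r1 = 23 / 6 = 3 and r2 = 23 % 6 = 5, and the
       task goes to the shorter of queues 3 and 5. Balancing by the
       shorter of two random queues keeps queue lengths far more even
       than a single random choice would. */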

    /* Requeue a vertex from an execution thread. The message is assumed
       to have been deposited separately (see place() below), so only the
       queue insertion is performed, again by the two-choice rule. */
    void schedule_from_execution_thread(const size_t cpuid,
                                        const lvid_type vid) {
      size_t idx = 0;
      if (queues.size() > 1) {
        const uint32_t prod =
          random::fast_uniform(uint32_t(0),
                               uint32_t(queues.size() * queues.size() - 1));
        const uint32_t r1 = prod / queues.size();
        const uint32_t r2 = prod % queues.size();
        idx = (queues[r1].size() < queues[r2].size()) ? r1 : r2;
      }
      locks[idx].lock(); queues[idx].push_back(vid); locks[idx].unlock();
    }

    void schedule_all(const message_type& msg,
                      const std::string& order) {
      if (order == "shuffle") {
        // add the vertices in random order
        std::vector<lvid_type> permutation =
          random::permutation<lvid_type>(messages.size());
        foreach(lvid_type vid, permutation) {
          if (messages.add(vid, msg)) {
            const size_t idx = vid % queues.size();
            locks[idx].lock(); queues[idx].push_back(vid); locks[idx].unlock();
          }
        }
      } else {
        // add the vertices sequentially
        for (lvid_type vid = 0; vid < messages.size(); ++vid) {
          if (messages.add(vid, msg)) {
            const size_t idx = vid % queues.size();
            locks[idx].lock(); queues[idx].push_back(vid); locks[idx].unlock();
          }
        }
      }
    } // end of schedule_all
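
    /* Here placement is deterministic rather than two-choice: vid modulo
       the queue count spreads a whole-graph signal evenly across the
       queues, e.g. with 12 queues vertex 30 lands in queue 6. */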

    /* Called by the engine when the update on vid has completed; the
       FIFO scheduler needs no completion bookkeeping. */
    void completed(const size_t cpuid,
                   const lvid_type vid,
                   const message_type& msg) { }


    /** Get the next element in the queue */
    sched_status::status_enum get_next(const size_t cpuid,
                                       lvid_type& ret_vid,
                                       message_type& ret_msg) {
      while (1) {
        /* Phase 1: check this thread's own block of queues for a task */
        for (size_t i = 0; i < multi; ++i) {
          const size_t idx = (++current_queue[cpuid] % multi) + cpuid * multi;
          locks[idx].lock();
          if (!queues[idx].empty()) {
            ret_vid = queues[idx].front();
            queues[idx].pop_front();
            locks[idx].unlock();
            const bool get_success = messages.test_and_get(ret_vid, ret_msg);
            // managed to retrieve a task
            if (get_success) {
              // if it is above the priority threshold, everything is good
              if (scheduler_impl::get_message_priority(ret_msg) >= min_priority) {
                return sched_status::NEW_TASK;
              } else {
                // it is below the threshold: put it back. If merging it back
                // pushes the combined priority over the threshold, requeue it
                message_type combined_message;
                messages.add(ret_vid, ret_msg, combined_message);
                double ret_priority = scheduler_impl::get_message_priority(combined_message);
                if (ret_priority >= min_priority) {
                  locks[idx].lock();
                  queues[idx].push_back(ret_vid);
                  locks[idx].unlock();
                }
              }
            }
            else continue;
          }
          else {
            locks[idx].unlock();
          }
        }
        /* Phase 2: steal from every queue */
        for (size_t i = 0; i < queues.size(); ++i) {
          const size_t idx = ++current_queue[cpuid] % queues.size();
          if (!queues[idx].empty()) { // unlocked pretest; re-checked under the lock
            locks[idx].lock();
            if (!queues[idx].empty()) {
              ret_vid = queues[idx].front();
              queues[idx].pop_front();
              locks[idx].unlock();
              const bool get_success = messages.test_and_get(ret_vid, ret_msg);
              if (get_success) {
                // if it is above the priority threshold, everything is good
                if (scheduler_impl::get_message_priority(ret_msg) >= min_priority) {
                  return sched_status::NEW_TASK;
                } else {
                  // below the threshold: merge it back, and requeue the vertex
                  // if the combined message now clears the threshold
                  message_type combined_message;
                  messages.add(ret_vid, ret_msg, combined_message);
                  double ret_priority = scheduler_impl::get_message_priority(combined_message);
                  if (ret_priority >= min_priority) {
                    locks[idx].lock();
                    queues[idx].push_back(ret_vid);
                    locks[idx].unlock();
                  }
                }
              }
            }
            else {
              locks[idx].unlock();
            }
          }
        }
        break;
      }
      return sched_status::EMPTY;
    } // end of get_next_task
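
    /* Worked example of the min_priority gate above (illustrative
       numbers, assuming a message type whose merge accumulates
       priority): with min_priority = 1.0, a claimed message of
       priority 0.4 is not returned; it is merged back into the store.
       If a 0.7-priority message for the same vertex arrives, the
       combined priority 1.1 clears the threshold and the vertex is
       requeued. */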


    /** Get the pending message for a specific vertex, bypassing the queues */
    sched_status::status_enum
    get_specific(lvid_type vid,
                 message_type& ret_msg) {
      bool get_success = messages.test_and_get(vid, ret_msg);
      if (get_success) return sched_status::NEW_TASK;
      else return sched_status::EMPTY;
    }

    /* Deposit a message for vid without enqueueing it in any FIFO. */
    void place(lvid_type vid,
               const message_type& msg) {
      messages.add(vid, msg);
    }
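
    /* Sketch pairing place() with get_specific(): an engine can hand a
       particular vertex its pending message without touching the FIFO
       queues (vid and msg are placeholders):

         sched.place(vid, msg);   // park the message; no queue entry
         message_type m;
         if (sched.get_specific(vid, m) == sched_status::NEW_TASK) {
           // m now holds the (possibly merged) message pending for vid
         }
    */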


    /* Number of times two messages were merged in the message store. */
    size_t num_joins() const {
      return messages.num_joins();
    }

    static void print_options_help(std::ostream& out) {
      out << "\t multi = [number of queues per thread. Default = 3]\n"
          << "\t min_priority = [double, minimum priority required to\n"
          << "\t                 receive a message. Default = -inf]\n";
    }


  }; // end of fifo_scheduler


} // end of namespace graphlab
#include <graphlab/macros_undef.hpp>

#endif