GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
priority_scheduler.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 /**
25  * \author jegonzal
26  * \brief This class defines a multiqueue version of the priority scheduler.
27  **/
28 #ifndef GRAPHLAB_PRIORITY_SCHEDULER_HPP
29 #define GRAPHLAB_PRIORITY_SCHEDULER_HPP
30 
31 #include <queue>
32 #include <cmath>
33 #include <cassert>
34 
35 
36 #include <graphlab/graph/graph_basic_types.hpp>
37 #include <graphlab/parallel/pthread_tools.hpp>
38 
39 #include <graphlab/util/mutable_queue.hpp>
40 #include <graphlab/scheduler/ischeduler.hpp>
41 #include <graphlab/parallel/atomic_add_vector2.hpp>
42 #include <graphlab/scheduler/get_message_priority.hpp>
43 #include <graphlab/options/graphlab_options.hpp>
44 
45 
46 #include <graphlab/macros_def.hpp>
47 namespace graphlab {
48 
49  /** \ingroup group_schedulers
50  */
51  template<typename Message>
52  class priority_scheduler : public ischeduler<Message> {
53  public:
54 
55  typedef Message message_type;
56 
57  typedef mutable_queue<lvid_type, double> queue_type;
58 
59  private:
60  atomic_add_vector2<message_type> messages;
61  std::vector<queue_type> queues;
62  std::vector<spinlock> locks;
63  size_t multi;
64  std::vector<size_t> current_queue;
65 
66  double min_priority;
67 
68  // Terminator
69 
70 
71 
72  public:
73 
74  priority_scheduler(size_t num_vertices,
75  const graphlab_options& opts) :
76  messages(num_vertices), multi(3),
77  current_queue(opts.get_ncpus()),
78  min_priority(-std::numeric_limits<double>::max()) {
79  set_options(opts);
80  }
81 
82  void set_options(const graphlab_options& opts) {
83  size_t new_ncpus = opts.get_ncpus();
84  // check if ncpus changed
85  if (new_ncpus != current_queue.size()) {
86  logstream(LOG_INFO) << "Changing ncpus from " << current_queue.size()
87  << " to " << new_ncpus << std::endl;
88  ASSERT_GE(new_ncpus, 1);
89  current_queue.resize(new_ncpus);
90  }
91 
92  std::vector<std::string> keys = opts.get_scheduler_args().get_option_keys();
93  foreach(std::string opt, keys) {
94  if (opt == "multi") {
95  opts.get_scheduler_args().get_option("multi", multi);
96  } else if (opt == "min_priority") {
97  opts.get_scheduler_args().get_option("min_priority", min_priority);
98  } else {
99  logstream(LOG_FATAL) << "Unexpected Scheduler Option: " << opt << std::endl;
100  }
101  }
102 
103  const size_t nqueues = std::max(multi*current_queue.size(), size_t(1));
104  // changing the number of queues.
105  // reinsert everything
106  if (nqueues != queues.size()) {
107  std::vector<queue_type> old_queues;
108  std::swap(old_queues, queues);
109  queues.resize(nqueues);
110  locks.resize(nqueues);
111 
112  size_t idx = 0;
113  for (size_t i = 0;i < old_queues.size(); ++i) {
114  while (!old_queues[i].empty()) {
115  queues[idx].push(old_queues[i].top().first,
116  old_queues[i].top().second);
117  old_queues[i].pop();
118  ++idx;
119  }
120  }
121  }
122  }
123 
124  void start() { }
125 
126 
127  void schedule(const lvid_type vid,
128  const message_type& msg) {
129  const size_t idx = vid % queues.size();
130  message_type combined_message;
131 
132  messages.add(vid, msg, combined_message);
133  // If the new priority will is above priority, put it in the queue
134  if (scheduler_impl::get_message_priority(combined_message) >= min_priority) {
135  locks[idx].lock();
136  queues[idx].push_or_update(vid,
137  scheduler_impl::get_message_priority(combined_message));
138  locks[idx].unlock();
139  }
140  } // end of schedule
141 
142 
143 
144  void schedule_from_execution_thread(const size_t cpuid,
145  const lvid_type vid) {
146  const size_t idx = vid % queues.size();
147  message_type combined_message;
148 
149  messages.peek(vid, combined_message);
150  // If the new priority will is above priority, put it in the queue
151  if (scheduler_impl::get_message_priority(combined_message) >= min_priority) {
152  locks[idx].lock();
153  queues[idx].push_or_update(vid,
154  scheduler_impl::get_message_priority(combined_message));
155  locks[idx].unlock();
156  }
157  } // end of schedule
158 
159 
160 
161  void schedule_all(const message_type& msg,
162  const std::string& order) {
163  if(order == "shuffle") {
164  std::vector<lvid_type> permutation =
165  random::permutation<lvid_type>(messages.size());
166  foreach(lvid_type vid, permutation) schedule(vid, msg);
167  } else {
168  for (lvid_type vid = 0; vid < messages.size(); ++vid)
169  schedule(vid, msg);
170  }
171  } // end of schedule_all
172 
173  void completed(const size_t cpuid,
174  const lvid_type vid,
175  const message_type& msg) { }
176 
177 
178  /** Get the next element in the queue */
179  sched_status::status_enum get_next(const size_t cpuid,
180  lvid_type& ret_vid,
181  message_type& ret_msg) {
182  while(1) {
183  /* Check all of my queues for a task */
184  for(size_t i = 0; i < multi; ++i) {
185  const size_t idx = (++current_queue[cpuid] % multi) + cpuid * multi;
186  locks[idx].lock();
187  if(!queues[idx].empty() &&
188  queues[idx].top().second >= min_priority) {
189  ret_vid = queues[idx].pop().first;
190  const bool get_success = messages.test_and_get(ret_vid, ret_msg);
191  locks[idx].unlock();
192  if(get_success) return sched_status::NEW_TASK;
193  else continue;
194  }
195  locks[idx].unlock();
196  }
197  /* Check all the queues */
198  for(size_t i = 0; i < queues.size(); ++i) {
199  const size_t idx = ++current_queue[cpuid] % queues.size();
200  if(!queues[idx].empty()) { // quick pretest
201  locks[idx].lock();
202  if(!queues[idx].empty() &&
203  queues[idx].top().second >= min_priority) {
204  ret_vid = queues[idx].pop().first;
205  const bool get_success = messages.test_and_get(ret_vid, ret_msg);
206  locks[idx].unlock();
207  if(get_success) return sched_status::NEW_TASK;
208  else continue;
209  }
210  locks[idx].unlock();
211  }
212  }
213  break;
214  }
215  return sched_status::EMPTY;
216  } // end of get_next_task
217 
218 
219  size_t num_joins() const {
220  return messages.num_joins();
221  }
222 
223 
224 
226  get_specific(lvid_type vid,
227  message_type& ret_msg) {
228  bool get_success = messages.test_and_get(vid, ret_msg);
229  if (get_success) return sched_status::NEW_TASK;
230  else return sched_status::EMPTY;
231  }
232 
233  void place(lvid_type vid,
234  const message_type& msg) {
235  messages.add(vid, msg);
236  }
237 
238 
239  static void print_options_help(std::ostream& out) {
240  out << "\t multi = [number of queues per thread. Default = 3].\n"
241  << "min_priority = [double, minimum priority required to receive \n"
242  << "\t a message, default = -inf]\n";
243  }
244 
245  }; // end of class priority scheduler
246 
247 
248 } // end of namespace graphlab
249 #include <graphlab/macros_undef.hpp>
250 
251 #endif
252