GraphLab: Distributed Graph-Parallel API 2.1
sweep_scheduler.hpp
/**
 * Copyright (c) 2009 Carnegie Mellon University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS
 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language
 * governing permissions and limitations under the License.
 *
 * For more about this software visit:
 *
 * http://www.graphlab.ml.cmu.edu
 *
 */

#ifndef GRAPHLAB_SWEEP_SCHEDULER_HPP
#define GRAPHLAB_SWEEP_SCHEDULER_HPP

#include <queue>
#include <cmath>
#include <cassert>

#include <graphlab/scheduler/ischeduler.hpp>
#include <graphlab/parallel/atomic_add_vector2.hpp>
#include <graphlab/graph/graph_basic_types.hpp>

#include <graphlab/scheduler/get_message_priority.hpp>
#include <graphlab/options/graphlab_options.hpp>

#include <graphlab/macros_def.hpp>

namespace graphlab {

  /** \ingroup group_schedulers
   *
   * \brief The sweep scheduler repeatedly sweeps over all vertices in a
   * fixed order (random or ascending), returning any vertex that has a
   * pending message.  It can either run as a strict round robin over the
   * whole vertex set or statically partition the sweep across cpus.
   */
  template<typename Message>
  class sweep_scheduler : public ischeduler<Message> {
  public:
    typedef Message message_type;

  private:

    /// Number of worker threads
    size_t ncpus;

    /// If true, all cpus share one global round-robin sweep
    bool strict_round_robin;
    /// Global position counter used by the strict round-robin sweep
    atomic<size_t> rr_index;
    /// Maximum number of full sweeps (strict round robin only)
    size_t max_iterations;

    /// Permutation of the vertex ids defining the sweep order
    std::vector<lvid_type> vids;
    /// Maps each vertex to the cpu that owns it (non-strict mode)
    std::vector<uint16_t> vid2cpu;
    /// Each cpu's current position in the sweep (non-strict mode)
    std::vector<lvid_type> cpu2index;

    /// Pending messages, one slot per vertex, merged atomically
    atomic_add_vector2<message_type> messages;
    /// Messages below this priority are deferred rather than returned
    double min_priority;
    /// Sweep ordering: "random" or "ascending"
    std::string ordering;

  public:
    sweep_scheduler(size_t num_vertices,
                    const graphlab_options& opts) :
      ncpus(opts.get_ncpus()),
      strict_round_robin(false),
      max_iterations(std::numeric_limits<size_t>::max()),
      vids(num_vertices),
      messages(num_vertices),
      min_priority(-std::numeric_limits<double>::max()) {
      // initialize defaults
      ordering = "random";
      set_options(opts);

      for(size_t i = 0; i < vids.size(); ++i) vids[i] = i;
      if (ordering == "ascending") {
        logstream(LOG_INFO) << "Using an ascending ordering of the vertices." << std::endl;
      } else if(ordering == "random") {
        logstream(LOG_INFO) << "Using a random ordering of the vertices." << std::endl;
        random::shuffle(vids);
      }

      if(strict_round_robin) {
        logstream(LOG_INFO)
          << "Using a strict round robin schedule." << std::endl;
        // Max iterations only applies to strict round robin
        if(max_iterations != std::numeric_limits<size_t>::max()) {
          logstream(LOG_INFO)
            << "Using maximum iterations: " << max_iterations << std::endl;
        }
        rr_index = 0;
      } else {
        // Each cpu is responsible for its own subset of vertices:
        // initialize the cpu2index counters
        cpu2index.resize(ncpus);
        for(size_t i = 0; i < cpu2index.size(); ++i) cpu2index[i] = i;
        // Initialize the reverse map vid2cpu assignment
        vid2cpu.resize(vids.size());
        for(size_t i = 0; i < vids.size(); ++i) vid2cpu[vids[i]] = i % ncpus;
      }

    } // end of constructor


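    // Note (illustrative example, not part of the original source): vertex
    // ownership follows a vertex's *position* in the possibly shuffled sweep
    // order, not its raw id.  For instance, with ncpus = 2 and a shuffled
    // order vids = {3, 0, 2, 1}, positions 0 and 2 belong to cpu 0 and
    // positions 1 and 3 to cpu 1, so:
    //
    //   vid2cpu[3] == 0;  vid2cpu[0] == 1;
    //   vid2cpu[2] == 0;  vid2cpu[1] == 1;
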
    void set_options(const graphlab_options& opts) {
      size_t new_ncpus = opts.get_ncpus();
      if (new_ncpus != ncpus) {
        logstream(LOG_INFO) << "Changing ncpus from " << ncpus << " to " << new_ncpus << std::endl;
        ASSERT_GE(new_ncpus, 1);
        ncpus = new_ncpus;
      }
      std::vector<std::string> keys = opts.get_scheduler_args().get_option_keys();
      foreach(std::string opt, keys) {
        if (opt == "order") {
          opts.get_scheduler_args().get_option("order", ordering);
          ASSERT_TRUE(ordering == "random" || ordering == "ascending");
        } else if (opt == "strict") {
          opts.get_scheduler_args().get_option("strict", strict_round_robin);
        } else if (opt == "max_iterations") {
          opts.get_scheduler_args().get_option("max_iterations", max_iterations);
        } else if (opt == "min_priority") {
          opts.get_scheduler_args().get_option("min_priority", min_priority);
        } else {
          logstream(LOG_FATAL) << "Unexpected Scheduler Option: " << opt << std::endl;
        }
      }
    }

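    // Example (illustrative; the exact option-string syntax is an assumption
    // based on the keys handled above): a sweep scheduler performing at most
    // 10 strict round-robin passes in ascending vertex order could be
    // requested with scheduler options such as
    //
    //   order=ascending,strict=true,max_iterations=10
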
    void start() {
    }

    void schedule(const lvid_type vid,
                  const message_type& msg) {
      messages.add(vid, msg);
    } // end of schedule


    void schedule_from_execution_thread(const size_t cpuid,
                                        const lvid_type vid) {
    } // end of schedule_from_execution_thread

    void schedule_all(const message_type& msg,
                      const std::string& order) {
      for (lvid_type vid = 0; vid < messages.size(); ++vid)
        schedule(vid, msg);
    } // end of schedule_all


    sched_status::status_enum
    get_specific(lvid_type vid,
                 message_type& ret_msg) {
      bool get_success = messages.test_and_get(vid, ret_msg);
      if (get_success) return sched_status::NEW_TASK;
      else return sched_status::EMPTY;
    }
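
    // Note (commentary added for clarity): test_and_get is assumed to
    // atomically remove and return the pending message for vid, so at most
    // one caller can claim a given message; all other callers see EMPTY.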

    void place(lvid_type vid,
               const message_type& msg) {
      messages.add(vid, msg);
    }

    sched_status::status_enum get_next(const size_t cpuid,
                                       lvid_type& ret_vid,
                                       message_type& ret_msg) {
      const size_t nverts = vids.size();
      const size_t max_fails = (nverts/ncpus) + 1;
      // Check to see if max iterations have been reached
      if(strict_round_robin && (rr_index / nverts) >= max_iterations)
        return sched_status::EMPTY;
      // Loop through all vertices that are associated with this
      // processor searching for a vertex with an active task
      for(size_t idx = get_and_inc_index(cpuid), fails = 0;
          fails <= max_fails;
          idx = get_and_inc_index(cpuid), ++fails) {
        // It is possible that get_and_inc_index could return an
        // invalid index if the number of cpus exceeds the number of
        // vertices. In this case we always return empty.
        if(__builtin_expect(idx >= nverts, false)) return sched_status::EMPTY;
        const lvid_type vid = vids[idx];
        bool success = messages.test_and_get(vid, ret_msg);
        while(success) { // Job found; now decide whether to keep it
          if(scheduler_impl::get_message_priority(ret_msg) >= min_priority) {
            ret_vid = vid; return sched_status::NEW_TASK;
          } else {
            // Priority is insufficient, so return the message to the scheduler
            message_type combined_message;
            messages.add(vid, ret_msg, combined_message);
            double ret_priority = scheduler_impl::get_message_priority(combined_message);
            // Adding the job back could boost its priority (a concurrent
            // schedule() may have merged in another message).  If the
            // combined priority is now sufficiently high we have to try to
            // remove it again, and the process may need to repeat.
            if(ret_priority >= min_priority) {
              success = messages.test_and_get(vid, ret_msg);
            } else {
              success = false;
            }
          }
        } // end of while loop over success
      } // end of for loop
      return sched_status::EMPTY;
    } // end of get_next
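
    // Note (commentary added for clarity): max_fails bounds each call to
    // roughly one pass over this cpu's share of the vertices
    // (nverts/ncpus + 1 probes), so a drained scheduler reports EMPTY
    // instead of scanning indefinitely.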


    void completed(const size_t cpuid,
                   const lvid_type vid,
                   const message_type& msg) {
    } // end of completed


    /// Number of message merges performed by the underlying message vector
    size_t num_joins() const {
      return messages.num_joins();
    }


    static void print_options_help(std::ostream &out) {
      out << "order = [string: {random, ascending} default=random]\n"
          << "strict = [bool, use strict round robin schedule, default=false]\n"
          << "min_priority = [double, minimum priority required to receive\n"
          << "\t a message, default = -inf]\n"
          << "max_iterations = [integer, maximum number of iterations\n"
          << "\t (requires strict=true), default = inf]\n";
    } // end of print_options_help


  private:
    inline size_t get_and_inc_index(const size_t cpuid) {
      const size_t nverts = vids.size();
      if (strict_round_robin) {
        return rr_index++ % nverts;
      } else {
        const size_t index = cpu2index[cpuid];
        cpu2index[cpuid] += ncpus;
        // Handle wrap-around: restart this cpu's sweep at its first slot
        if (__builtin_expect(cpu2index[cpuid] >= nverts, false))
          cpu2index[cpuid] = cpuid;
        return index;
      }
    } // end of get_and_inc_index
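
    // Example (illustrative, not part of the original source): with
    // nverts = 10 and ncpus = 4, cpu 1 in non-strict mode visits slot
    // indices 1, 5, 9, then wraps back to 1.  In strict mode every cpu
    // draws from the single shared counter, so indices
    // 0, 1, ..., nverts-1 are handed out globally in order.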

  };
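
  // Minimal usage sketch (illustrative; assumes an engine-like driver and a
  // message type `msg_type` with the priority interface used above):
  //
  //   sweep_scheduler<msg_type> sched(num_vertices, opts);
  //   sched.schedule_all(msg_type(), "");      // seed every vertex
  //   lvid_type vid; msg_type msg;
  //   while (sched.get_next(cpuid, vid, msg) == sched_status::NEW_TASK) {
  //     // apply msg to vid; new work goes back in via sched.schedule(...)
  //   }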


} // end of namespace graphlab
#include <graphlab/macros_undef.hpp>

#endif