23 #ifndef GRAPHLAB_SWEEP_SCHEDULER_HPP
24 #define GRAPHLAB_SWEEP_SCHEDULER_HPP
30 #include <graphlab/scheduler/ischeduler.hpp>
31 #include <graphlab/parallel/atomic_add_vector2.hpp>
32 #include <graphlab/graph/graph_basic_types.hpp>
34 #include <graphlab/scheduler/get_message_priority.hpp>
35 #include <graphlab/options/graphlab_options.hpp>
37 #include <graphlab/macros_def.hpp>
// Scheduler class template parameterized on the Message type. It derives
// publicly from ischeduler<Message> and keeps one pending message slot per
// vertex in the messages member.
43 template<
typename Message>
44 class sweep_scheduler :
public ischeduler<Message> {
46 typedef Message message_type;
// When true, get_and_inc_index draws positions from the shared rr_index
// counter instead of the per-cpu cursors in cpu2index.
52 bool strict_round_robin;
// Shared position counter, used only in strict round robin mode.
53 atomic<size_t> rr_index;
// Limit compared against rr_index / vids.size() in strict round robin mode.
// The constructor initializes it to the largest size_t value.
54 size_t max_iterations;
// Vertex id table. The constructor fills it with vids[i] = i.
// NOTE(review): any reordering for the random order is not visible in this view.
57 std::vector<lvid_type> vids;
// Cpu assigned to each vertex id; the constructor sets vid2cpu[vids[i]] = i % ncpus.
58 std::vector<uint16_t> vid2cpu;
// Per-cpu cursor into vids; entry i starts at i and is advanced by ncpus
// in get_and_inc_index.
59 std::vector<lvid_type> cpu2index;
// Pending messages, constructed with num_vertices entries.
61 atomic_add_vector2<message_type> messages;
// Constructor. Takes the number of vertices and the engine options.
// Defaults set in the initializer list: ncpus from opts.get_ncpus(),
// strict_round_robin = false, max_iterations = largest size_t,
// messages sized to num_vertices, min_priority = most negative finite double.
// The body then logs the chosen configuration and builds the cpu2index and
// vid2cpu tables.
// NOTE(review): several lines of this constructor are not visible in this
// view, including where vids is sized and where ordering gets its value.
66 sweep_scheduler(
size_t num_vertices,
67 const graphlab_options& opts) :
68 ncpus(opts.get_ncpus()),
69 strict_round_robin(false),
70 max_iterations(std::numeric_limits<size_t>::max()),
72 messages(num_vertices),
73 min_priority(-std::numeric_limits<double>::max()) {
// Identity fill: position i holds vertex id i.
78 for(
size_t i = 0; i < vids.size(); ++i) vids[i] = i;
// Report which vertex ordering is in effect.
79 if (ordering ==
"ascending") {
80 logstream(
LOG_INFO) <<
"Using an ascending ordering of the vertices." << std::endl;
81 }
else if(ordering ==
"random") {
82 logstream(
LOG_INFO) <<
"Using a random ordering of the vertices." << std::endl;
// Report strict round robin mode when it is enabled.
86 if(strict_round_robin) {
88 <<
"Using a strict round robin schedule." << std::endl;
// Report the iteration limit only when it differs from the default.
90 if(max_iterations != std::numeric_limits<size_t>::max()) {
92 <<
"Using maximum iterations: " << max_iterations << std::endl;
// One cursor per cpu; cpu i starts at position i.
98 cpu2index.resize(ncpus);
99 for(
size_t i = 0; i < cpu2index.size(); ++i) cpu2index[i] = i;
// Deal positions out to cpus in turn: the vertex at position i goes to
// cpu i % ncpus.
// NOTE(review): vid2cpu stores uint16_t, so this assumes ncpus fits in 16 bits.
101 vid2cpu.resize(vids.size());
102 for(
size_t i = 0; i < vids.size(); ++i) vid2cpu[vids[i]] = i % ncpus;
// Applies the settings carried by opts. It compares the cpu count in opts
// with the current ncpus, then reads the scheduler arguments order, strict,
// max_iterations and min_priority into the matching members.
// NOTE(review): closing braces and at least one branch header are not
// visible in this view.
109 void set_options(
const graphlab_options& opts) {
110 size_t new_ncpus = opts.get_ncpus();
// Cpu count differs from the current one: log the change and require at
// least one cpu.
// NOTE(review): the statement that stores the new value is not visible here.
111 if (new_ncpus != ncpus) {
112 logstream(
LOG_INFO) <<
"Changing ncpus from " << ncpus <<
" to " << new_ncpus << std::endl;
113 ASSERT_GE(new_ncpus, 1);
// Walk every key present in the scheduler arguments.
116 std::vector<std::string>
keys = opts.get_scheduler_args().get_option_keys();
117 foreach(std::string opt, keys) {
118 if (opt ==
"order") {
119 opts.get_scheduler_args().get_option(
"order", ordering);
// Only the two orderings random and ascending are accepted.
120 ASSERT_TRUE(ordering ==
"random" || ordering ==
"ascending");
121 }
else if (opt ==
"strict") {
122 opts.get_scheduler_args().get_option(
"strict", strict_round_robin);
123 }
else if (opt ==
"max_iterations") {
124 opts.get_scheduler_args().get_option(
"max_iterations", max_iterations);
125 }
else if (opt ==
"min_priority") {
126 opts.get_scheduler_args().get_option(
"min_priority", min_priority);
// NOTE(review): the branch header for the next statement is not visible
// here; it appears to be the fallback for keys matched by none of the
// cases above.
128 logstream(
LOG_FATAL) <<
"Unexpected Scheduler Option: " << opt << std::endl;
// NOTE(review): the first line of this signature, including the declaration
// of vid, is not visible in this view.
// The visible body hands msg to messages.add for vertex vid.
137 const message_type& msg) {
138 messages.add(vid, msg);
// Takes a message and an order string and loops over every vertex id from
// 0 up to, but not including, messages.size().
// NOTE(review): the loop body is not visible in this view, so what is done
// per vertex and how order is used cannot be confirmed from here.
147 void schedule_all(
const message_type& msg,
148 const std::string& order) {
149 for (
lvid_type vid = 0; vid < messages.size(); ++vid)
// NOTE(review): the start of this signature, including the declaration of
// vid and the return type, is not visible in this view.
// Calls messages.test_and_get(vid, ret_msg) and keeps its boolean result in
// get_success; ret_msg is the non-const reference passed to that call.
156 message_type& ret_msg) {
157 bool get_success = messages.test_and_get(vid, ret_msg);
// NOTE(review): the first line of this signature, including the declaration
// of vid, is not visible in this view.
// The visible body hands msg to messages.add for vertex vid.
163 const message_type& msg) {
164 messages.add(vid, msg);
// NOTE(review): the start of this signature and several body lines (loop
// condition, the assignment of vid, return statements, closing braces) are
// not visible in this view. The comments below describe only the visible
// statements.
169 message_type& ret_msg) {
170 const size_t nverts = vids.size();
// One more than the number of vertices per cpu (integer division).
// Presumably bounds the fails counter of the loop below; the loop condition
// is not visible here.
171 const size_t max_fails = (nverts/ncpus) + 1;
// Strict round robin only: true once the number of whole passes
// (rr_index / nverts) has reached max_iterations.
// NOTE(review): this divides by nverts; confirm nverts cannot be zero here.
173 if(strict_round_robin && (rr_index / nverts) >= max_iterations)
// Each iteration draws a fresh position from get_and_inc_index for this cpu
// and increments fails.
177 for(
size_t idx = get_and_inc_index(cpuid), fails = 0;
179 idx = get_and_inc_index(cpuid), ++fails) {
// Try to fetch a message for vid into ret_msg.
185 bool success = messages.test_and_get(vid, ret_msg);
// Check whether the fetched message priority is at least min_priority.
187 if(scheduler_impl::get_message_priority(ret_msg) >= min_priority) {
// NOTE(review): the branch header for the statements below is not visible.
// ret_msg is handed back to messages.add together with combined_message,
// presumably an output holding the merged value; confirm against
// atomic_add_vector2. The priority of combined_message is then read.
191 message_type combined_message;
192 messages.add(vid, ret_msg, combined_message);
193 double ret_priority = scheduler_impl::get_message_priority(combined_message);
// If the combined priority meets the threshold, try to fetch again.
199 if(ret_priority >= min_priority) {
200 success = messages.test_and_get(vid, ret_msg);
// Takes the cpu id and a message by const reference.
// NOTE(review): one parameter line and the whole body are not visible in
// this view, so the effect of this method cannot be stated from here.
211 void completed(
const size_t cpuid,
213 const message_type& msg) {
// Const accessor that returns the value of messages.num_joins() unchanged.
217 size_t num_joins()
const {
218 return messages.num_joins();
// Static helper that writes a description of the scheduler arguments to out.
// The text covers order, strict, min_priority and max_iterations, each with
// its type and stated default, as one chained stream insertion.
223 static void print_options_help(std::ostream &out) {
224 out <<
"order = [string: {random, ascending} default=random]\n"
225 <<
"strict = [bool, use strict round robin schedule, default=false]\n"
226 <<
"min_priority = [double, minimum priority required to receive \n"
227 <<
"\t a message, default = -inf]\n"
228 <<
"max_iterations = [integer, maximum number of iterations "
229 <<
" (requires strict=true) \n"
230 <<
"\t default = inf]\n";
// Produces the next position for cpuid and advances the matching cursor.
// NOTE(review): the return statement of the non-strict path and the closing
// braces are not visible in this view; presumably index is returned.
235 inline size_t get_and_inc_index(
const size_t cpuid) {
236 const size_t nverts = vids.size();
// Strict mode: all cpus share rr_index. The value before the increment is
// reduced modulo nverts and returned.
237 if (strict_round_robin) {
238 return rr_index++ % nverts;
// Non-strict mode: remember the current cursor of this cpu, then move the
// cursor forward by ncpus so each cpu strides over its own positions.
240 const size_t index = cpu2index[cpuid];
241 cpu2index[cpuid] += ncpus;
// Cursor ran past the last position (hinted to the compiler as the unlikely
// case): restart at cpuid, the starting position of this cpu.
243 if (__builtin_expect(cpu2index[cpuid] >= nverts,
false))
244 cpu2index[cpuid] = cpuid;
253 #include <graphlab/macros_undef.hpp>