28 #ifndef GRAPHLAB_PRIORITY_SCHEDULER_HPP
29 #define GRAPHLAB_PRIORITY_SCHEDULER_HPP
36 #include <graphlab/graph/graph_basic_types.hpp>
37 #include <graphlab/parallel/pthread_tools.hpp>
39 #include <graphlab/util/mutable_queue.hpp>
40 #include <graphlab/scheduler/ischeduler.hpp>
41 #include <graphlab/parallel/atomic_add_vector2.hpp>
42 #include <graphlab/scheduler/get_message_priority.hpp>
43 #include <graphlab/options/graphlab_options.hpp>
46 #include <graphlab/macros_def.hpp>
51 template<
typename Message>
52 class priority_scheduler :
public ischeduler<Message> {
55 typedef Message message_type;
57 typedef mutable_queue<lvid_type, double> queue_type;
60 atomic_add_vector2<message_type> messages;
61 std::vector<queue_type> queues;
62 std::vector<spinlock> locks;
64 std::vector<size_t> current_queue;
74 priority_scheduler(
size_t num_vertices,
75 const graphlab_options& opts) :
76 messages(num_vertices), multi(3),
77 current_queue(opts.get_ncpus()),
78 min_priority(-std::numeric_limits<double>::max()) {
82 void set_options(
const graphlab_options& opts) {
83 size_t new_ncpus = opts.get_ncpus();
85 if (new_ncpus != current_queue.size()) {
86 logstream(
LOG_INFO) <<
"Changing ncpus from " << current_queue.size()
87 <<
" to " << new_ncpus << std::endl;
88 ASSERT_GE(new_ncpus, 1);
89 current_queue.resize(new_ncpus);
92 std::vector<std::string>
keys = opts.get_scheduler_args().get_option_keys();
93 foreach(std::string opt, keys) {
95 opts.get_scheduler_args().get_option(
"multi", multi);
96 }
else if (opt ==
"min_priority") {
97 opts.get_scheduler_args().get_option(
"min_priority", min_priority);
99 logstream(
LOG_FATAL) <<
"Unexpected Scheduler Option: " << opt << std::endl;
103 const size_t nqueues = std::max(multi*current_queue.size(), size_t(1));
106 if (nqueues != queues.size()) {
107 std::vector<queue_type> old_queues;
108 std::swap(old_queues, queues);
109 queues.resize(nqueues);
110 locks.resize(nqueues);
113 for (
size_t i = 0;i < old_queues.size(); ++i) {
114 while (!old_queues[i].empty()) {
115 queues[idx].push(old_queues[i].top().first,
116 old_queues[i].top().second);
128 const message_type& msg) {
129 const size_t idx = vid % queues.size();
130 message_type combined_message;
132 messages.add(vid, msg, combined_message);
134 if (scheduler_impl::get_message_priority(combined_message) >= min_priority) {
136 queues[idx].push_or_update(vid,
137 scheduler_impl::get_message_priority(combined_message));
146 const size_t idx = vid % queues.size();
147 message_type combined_message;
149 messages.peek(vid, combined_message);
151 if (scheduler_impl::get_message_priority(combined_message) >= min_priority) {
153 queues[idx].push_or_update(vid,
154 scheduler_impl::get_message_priority(combined_message));
161 void schedule_all(
const message_type& msg,
162 const std::string& order) {
163 if(order ==
"shuffle") {
165 random::permutation<lvid_type>(messages.size());
168 for (
lvid_type vid = 0; vid < messages.size(); ++vid)
173 void completed(
const size_t cpuid,
175 const message_type& msg) { }
181 message_type& ret_msg) {
184 for(
size_t i = 0; i < multi; ++i) {
185 const size_t idx = (++current_queue[cpuid] % multi) + cpuid * multi;
187 if(!queues[idx].empty() &&
188 queues[idx].top().second >= min_priority) {
189 ret_vid = queues[idx].pop().first;
190 const bool get_success = messages.test_and_get(ret_vid, ret_msg);
198 for(
size_t i = 0; i < queues.size(); ++i) {
199 const size_t idx = ++current_queue[cpuid] % queues.size();
200 if(!queues[idx].empty()) {
202 if(!queues[idx].empty() &&
203 queues[idx].top().second >= min_priority) {
204 ret_vid = queues[idx].pop().first;
205 const bool get_success = messages.test_and_get(ret_vid, ret_msg);
219 size_t num_joins()
const {
220 return messages.num_joins();
227 message_type& ret_msg) {
228 bool get_success = messages.test_and_get(vid, ret_msg);
234 const message_type& msg) {
235 messages.add(vid, msg);
/**
 * @brief Print the scheduler-specific options this class understands.
 * @param out stream to write the (tab-indented) help text to.
 */
static void print_options_help(std::ostream& out) {
  // NOTE(review): the second line lacks the leading "\t " the other lines
  // have — likely lost in extraction; strings kept byte-for-byte regardless.
  out << "\t multi = [number of queues per thread. Default = 3].\n"
      << "min_priority = [double, minimum priority required to receive \n"
      << "\t a message, default = -inf]\n";
}
249 #include <graphlab/macros_undef.hpp>