24 #ifndef GRAPHLAB_FIFO_SCHEDULER_HPP
25 #define GRAPHLAB_FIFO_SCHEDULER_HPP
30 #include <graphlab/graph/graph_basic_types.hpp>
31 #include <graphlab/parallel/pthread_tools.hpp>
32 #include <graphlab/parallel/atomic.hpp>
34 #include <graphlab/util/random.hpp>
35 #include <graphlab/scheduler/ischeduler.hpp>
36 #include <graphlab/parallel/atomic_add_vector2.hpp>
38 #include <graphlab/scheduler/get_message_priority.hpp>
39 #include <graphlab/options/graphlab_options.hpp>
41 #include <graphlab/macros_def.hpp>
54 template<
typename Message>
59 typedef Message message_type;
62 typedef std::deque<lvid_type> queue_type;
67 std::vector<queue_type> queues;
68 std::vector<spinlock> locks;
70 std::vector<size_t> current_queue;
79 messages(num_vertices), multi(3),
81 min_priority(-std::numeric_limits<double>::max()) {
89 if (new_ncpus != current_queue.size()) {
90 logstream(
LOG_INFO) <<
"Changing ncpus from " << current_queue.size()
91 <<
" to " << new_ncpus << std::endl;
92 ASSERT_GE(new_ncpus, 1);
93 current_queue.resize(new_ncpus);
96 std::vector<std::string>
keys = opts.get_scheduler_args().get_option_keys();
97 foreach(std::string opt, keys) {
99 opts.get_scheduler_args().
get_option(
"multi", multi);
100 }
else if (opt ==
"min_priority") {
101 opts.get_scheduler_args().
get_option(
"min_priority", min_priority);
103 logstream(
LOG_FATAL) <<
"Unexpected Scheduler Option: " << opt << std::endl;
107 const size_t nqueues = std::max(multi*current_queue.size(), size_t(1));
110 if (nqueues != queues.size()) {
111 std::vector<queue_type> old_queues;
112 std::swap(old_queues, queues);
113 queues.resize(nqueues);
114 locks.resize(nqueues);
117 for (
size_t i = 0;i < old_queues.size(); ++i) {
118 while (!old_queues[i].empty()) {
119 queues[idx].push_back(old_queues[i].front());
120 old_queues[i].pop_front();
133 const message_type& msg) {
136 if (messages.
add(vid, msg)) {
146 if(queues.size() > 1) {
147 const uint32_t prod =
149 uint32_t(queues.size() * queues.size() - 1));
150 const uint32_t r1 = prod / queues.size();
151 const uint32_t r2 = prod % queues.size();
152 idx = (queues[r1].size() < queues[r2].size()) ? r1 : r2;
154 locks[idx].lock(); queues[idx].push_back(vid); locks[idx].unlock();
162 if(queues.size() > 1) {
163 const uint32_t prod =
165 uint32_t(queues.size() * queues.size() - 1));
166 const uint32_t r1 = prod / queues.size();
167 const uint32_t r2 = prod % queues.size();
168 idx = (queues[r1].size() < queues[r2].size()) ? r1 : r2;
170 locks[idx].lock(); queues[idx].push_back(vid); locks[idx].unlock();
173 const std::string& order) {
174 if(order ==
"shuffle") {
177 random::permutation<lvid_type>(messages.size());
179 if(messages.
add(vid,msg)) {
180 const size_t idx = vid % queues.size();
181 locks[idx].lock(); queues[idx].push_back(vid); locks[idx].unlock();
186 for (
lvid_type vid = 0; vid < messages.size(); ++vid) {
187 if(messages.
add(vid,msg)) {
188 const size_t idx = vid % queues.size();
189 locks[idx].lock(); queues[idx].push_back(vid); locks[idx].unlock();
197 const message_type& msg) { }
203 message_type& ret_msg) {
206 for(
size_t i = 0; i < multi; ++i) {
207 const size_t idx = (++current_queue[cpuid] % multi) + cpuid * multi;
209 if(!queues[idx].empty()) {
210 ret_vid = queues[idx].front();
211 queues[idx].pop_front();
213 const bool get_success = messages.
test_and_get(ret_vid, ret_msg);
217 if (scheduler_impl::get_message_priority(ret_msg) >= min_priority) {
222 message_type combined_message;
223 messages.
add(ret_vid, ret_msg, combined_message);
224 double ret_priority = scheduler_impl::get_message_priority(combined_message);
225 if(ret_priority >= min_priority) {
227 queues[idx].push_back(ret_vid);
239 for(
size_t i = 0; i < queues.size(); ++i) {
240 const size_t idx = ++current_queue[cpuid] % queues.size();
241 if(!queues[idx].empty()) {
243 if(!queues[idx].empty()) {
244 ret_vid = queues[idx].front();
245 queues[idx].pop_front();
247 const bool get_success = messages.
test_and_get(ret_vid, ret_msg);
250 if (scheduler_impl::get_message_priority(ret_msg) >= min_priority) {
255 message_type combined_message;
256 messages.
add(ret_vid, ret_msg, combined_message);
257 double ret_priority = scheduler_impl::get_message_priority(combined_message);
258 if(ret_priority >= min_priority) {
260 queues[idx].push_back(ret_vid);
280 message_type& ret_msg) {
287 const message_type& msg) {
288 messages.
add(vid, msg);
293 return messages.num_joins();
/// \brief Print the options this scheduler understands.
/// (Output strings are preserved byte-for-byte from the original.)
static void print_options_help(std::ostream& out) {
  out << "\t multi = [number of queues per thread. Default = 3].\n"
      << "min_priority = [double, minimum priority required to receive \n"
      << "\t a message, default = -inf]\n";
}
307 #include <graphlab/macros_undef.hpp>