24 #ifndef GRAPHLAB_QUEUED_FIFO_SCHEDULER_HPP
25 #define GRAPHLAB_QUEUED_FIFO_SCHEDULER_HPP
31 #include <graphlab/graph/graph_basic_types.hpp>
32 #include <graphlab/parallel/pthread_tools.hpp>
33 #include <graphlab/parallel/atomic.hpp>
36 #include <graphlab/scheduler/ischeduler.hpp>
37 #include <graphlab/parallel/atomic_add_vector2.hpp>
39 #include <graphlab/scheduler/get_message_priority.hpp>
40 #include <graphlab/options/graphlab_options.hpp>
43 #include <graphlab/macros_def.hpp>
// Queued-FIFO scheduler (fragment): producers append vertex ids to per-CPU
// "in" sub-queues; full sub-queues are published to a shared master queue of
// sub-queues, which consumers later swap into their private "out" queues.
// NOTE(review): the enclosing class header, access specifiers, several data
// members, and the constructor signature are not visible in this chunk --
// the comments below describe only what the visible lines establish.
56 template<
typename Message>
// Message type carried per scheduled vertex.
61 typedef Message message_type;
// A sub-queue of local vertex ids awaiting execution.
63 typedef std::deque<lvid_type> queue_type;
// Shared queue of completed sub-queues; whole sub-queues are swapped in/out.
// Presumably guarded by a master lock declared outside this view -- confirm.
67 std::deque<queue_type> master_queue;
// Size threshold at which a per-CPU in-queue is flushed to the master queue.
69 size_t sub_queue_size;
// Per-CPU producer queues; in_queues[i] is guarded by in_queue_locks[i].
70 std::vector<queue_type> in_queues;
71 std::vector<mutex> in_queue_locks;
// Per-CPU consumer queues; in the visible code each is touched only through
// the calling thread's own cpuid index.
72 std::vector<queue_type> out_queues;
// Constructor initializer-list fragment: message store sized to the vertex
// count, and min_priority defaulting to -inf so every message is accepted.
79 messages(num_vertices),
83 min_priority(-std::numeric_limits<double>::max()){
// set_options fragment: reacts to an ncpus change and parses scheduler
// arguments.  The signature line and several statements (e.g. the master
// queue lock/unlock and the in_queues[i].clear() of the first loop) are not
// visible in this chunk.
90 if (new_ncpus != in_queues.size()) {
91 logstream(
LOG_INFO) <<
"Changing ncpus from " << in_queues.size()
92 <<
" to " << new_ncpus << std::endl;
// At least one worker queue is required.
93 ASSERT_GE(new_ncpus, 1);
// Preserve any pending work: push every existing per-CPU in-queue onto the
// shared master queue before the vectors are resized below.
96 for (
size_t i = 0;i < in_queues.size(); ++i) {
97 master_queue.push_back(in_queues[i]);
// Resize all per-CPU structures to the new worker count.
101 in_queues.resize(new_ncpus);
102 in_queue_locks.resize(new_ncpus);
103 out_queues.resize(new_ncpus);
// Parse the scheduler-specific options.
107 std::vector<std::string>
keys = opts.get_scheduler_args().get_option_keys();
108 foreach(std::string opt, keys) {
// "queuesize": sub-queue flush threshold (see print_options_help).
109 if (opt ==
"queuesize") {
110 opts.get_scheduler_args().
get_option(
"queuesize", sub_queue_size);
111 }
// "min_priority": minimum message priority required to be returned.
else if (opt ==
"min_priority") {
112 opts.get_scheduler_args().
get_option(
"min_priority", min_priority);
// Unknown option names are fatal.  NOTE(review): the final `else` branch
// introducing this LOG_FATAL is not visible in this chunk -- confirm.
114 logstream(
LOG_FATAL) <<
"Unexpected Scheduler Option: " << opt << std::endl;
// Flush each per-CPU in-queue to the master queue and clear it.
121 for (
size_t i = 0;i < in_queues.size(); ++i) {
122 master_queue.push_back(in_queues[i]);
123 in_queues[i].clear();
// schedule fragment (signature line not visible): deliver `msg` to vertex
// `vid` on behalf of thread `cpuid`.
129 const message_type& msg) {
// messages.add returning true means the vertex had no pending message, so
// it must be enqueued exactly once; otherwise the message was merged into
// the existing entry and no new queue slot is needed.
132 if (messages.
add(vid, msg)) {
134 in_queue_locks[cpuid].lock();
135 queue_type& queue = in_queues[cpuid];
136 queue.push_back(vid);
// When the local sub-queue exceeds the threshold, publish it to the master
// queue by swapping it with a fresh empty queue.  NOTE(review): emptyq's
// declaration and the master-queue locking are not visible here -- confirm.
137 if(queue.size() > sub_queue_size) {
140 master_queue.push_back(emptyq);
141 master_queue.back().swap(queue);
144 in_queue_locks[cpuid].unlock();
// Fragment of a re-scheduling path (signature not visible): if the vertex
// still has a pending message, re-enqueue it on the caller's in-queue.
150 if (!messages.empty(vid)) {
151 ASSERT_LT(cpuid, in_queues.size());
152 in_queue_locks[cpuid].lock();
153 queue_type& queue = in_queues[cpuid];
154 queue.push_back(vid);
// Same overflow policy as schedule(): swap the full sub-queue onto the
// master queue.  NOTE(review): emptyq and any master-queue locking are not
// visible in this chunk -- confirm against the full file.
155 if(queue.size() > sub_queue_size) {
158 master_queue.push_back(emptyq);
159 master_queue.back().swap(queue);
162 in_queue_locks[cpuid].unlock();
// schedule_all fragment (signature not visible): enqueue every vertex,
// either in a random shuffled order or in ascending vertex-id order.
167 const std::string& order) {
// "shuffle": build a random permutation of all vertex ids.  NOTE(review):
// the code consuming the permutation is not visible in this chunk.
168 if(order ==
"shuffle") {
170 random::permutation<lvid_type>(messages.size());
// Otherwise iterate vertices in id order.
173 for (
lvid_type vid = 0; vid < messages.size(); ++vid)
// Signature fragments of two methods whose bodies are not visible in this
// chunk (original lines 180 and 186) -- likely message-delivery / retrieval
// hooks; confirm against the full file.
180 const message_type& msg) {
186 message_type& ret_msg) {
// place fragment: merge `msg` into the vertex's pending message without
// touching any queue (the vertex is assumed to be scheduled separately).
193 const message_type& msg) {
194 messages.
add(vid, msg);
// get_next fragment (signature not visible): pop the next runnable vertex
// for thread `cpuid`, refilling the thread's out-queue as needed.
201 message_type& ret_msg) {
// Refill step 1: if the private out-queue is empty, try to grab a whole
// sub-queue from the shared master queue.  NOTE(review): the master-queue
// locking around this access is not visible in this chunk -- confirm.
204 if(out_queues[cpuid].empty()) {
206 if(!master_queue.empty()) {
207 out_queues[cpuid].swap(master_queue.front());
208 master_queue.pop_front();
// Refill step 2: otherwise steal the thread's own in-queue contents.
214 in_queue_locks[cpuid].lock();
215 if(out_queues[cpuid].empty() && !in_queues[cpuid].empty()) {
216 out_queues[cpuid].swap(in_queues[cpuid]);
218 in_queue_locks[cpuid].unlock();
// Take the next candidate vertex from the front of the out-queue.
220 queue_type& queue = out_queues[cpuid];
222 ret_vid = queue.front();
// Fast accept: the retrieved message already meets the priority floor.
225 if (scheduler_impl::get_message_priority(ret_msg) >= min_priority) {
// Otherwise put the message back (merging with anything that arrived in
// the meantime) and re-check the combined priority.
230 message_type combined_message;
231 messages.
add(ret_vid, ret_msg, combined_message);
232 double ret_priority = scheduler_impl::get_message_priority(combined_message);
233 if(ret_priority >= min_priority) {
// Combined message now qualifies: re-enqueue the vertex for a later pass.
236 in_queue_locks[cpuid].lock();
237 in_queues[cpuid].push_back(ret_vid);
238 in_queue_locks[cpuid].unlock();
// Body fragment (signature not visible): expose the number of message
// merges performed by the underlying message store.
250 return messages.num_joins();
// print_options_help fragment: documents the two scheduler options parsed
// in set_options ("queuesize" and "min_priority").
257 out <<
"\t queuesize: [the size at which a subqueue is "
258 <<
"placed in the master queue. default = 100]\n"
259 <<
"min_priority = [double, minimum priority required to receive \n"
260 <<
"\t a message, default = -inf]\n";
268 #include <graphlab/macros_undef.hpp>