25 #include <graphlab/parallel/thread_pool.hpp>
26 #include <graphlab/logger/assertions.hpp>
31 thread_pool::thread_pool(
size_t nthreads,
bool affinity) {
32 waiting_on_join =
false;
35 cpu_affinity = affinity;
46 if(nthreads != pool_size) {
55 threads.
join();
break;
56 }
catch (
const char* error_str) {
59 <<
"Unexpected exception caught in thread pool destructor: "
60 << error_str << std::endl;
75 void thread_pool::spawn_thread_group() {
78 for (
size_t i = 0;i < pool_size; ++i) {
80 threads.
launch(boost::bind(&thread_pool::wait_for_task,
this), i % ncpus);
83 threads.
launch(boost::bind(&thread_pool::wait_for_task,
this));
89 void thread_pool::destroy_all_threads() {
101 catch (
const char* c) {
104 <<
"Unexpected exception caught in thread pool destructor: "
112 if (affinity != cpu_affinity) {
113 cpu_affinity = affinity;
120 threads.
join();
break;
121 }
catch (
const char* c) {
124 <<
"Unexpected exception caught in thread pool destructor: "
130 spawn_thread_group();
135 void thread_pool::wait_for_task() {
137 std::pair<std::pair<boost::function<void (void)>,
int>,
bool> queue_entry;
139 queue_entry = spawn_queue.
dequeue();
140 if (queue_entry.second) {
143 int virtual_thread_id = queue_entry.first.second;
145 if (virtual_thread_id != -1) {
148 queue_entry.first.first();
150 }
catch(
const char* ex) {
153 exception_queue.push(ex);
163 if (waiting_on_join &&
164 tasks_completed == tasks_inserted) event_condition.
signal();
178 spawn_queue.
enqueue(std::make_pair(spawn_function, thread_id));
183 std::pair<bool, bool> eventret;
188 waiting_on_join =
true;
191 if (!exception_queue.empty()) {
193 const char* ex = exception_queue.front();
194 exception_queue.pop();
196 waiting_on_join =
false;
201 if (tasks_completed == tasks_inserted) {
205 event_condition.
wait(mut);
207 waiting_on_join =
false;
213 destroy_all_threads();