GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
thread_pool.cpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 
25 #include <graphlab/parallel/thread_pool.hpp>
26 #include <graphlab/logger/assertions.hpp>
27 
28 namespace graphlab {
29 
30 
31  thread_pool::thread_pool(size_t nthreads, bool affinity) {
32  waiting_on_join = false;
33  tasks_inserted = 0;
34  tasks_completed = 0;
35  cpu_affinity = affinity;
36  pool_size = nthreads;
37  spawn_thread_group();
38  } // end of thread_pool
39 
40 
41  void thread_pool::resize(size_t nthreads) {
42  // if the current pool size does not equal the requested number of
43  // threads shut the pool down and startup with correct number of
44  // threads. \todo: If the pool size is too small just add
45  // additional threads rather than destroying the pool
46  if(nthreads != pool_size) {
47  pool_size = nthreads;
48 
49  // stop the queue from blocking
50  spawn_queue.stop_blocking();
51 
52  // join the threads in the thread group
53  while(true) {
54  try {
55  threads.join(); break;
56  } catch (const char* error_str) {
57  // this should not be possible!
58  logstream(LOG_FATAL)
59  << "Unexpected exception caught in thread pool destructor: "
60  << error_str << std::endl;
61  }
62  }
63  spawn_queue.start_blocking();
64  spawn_thread_group();
65  }
66  } // end of set_nthreads
67 
68 
69  size_t thread_pool::size() const { return pool_size; }
70 
71 
72  /**
73  Creates the thread group
74  */
75  void thread_pool::spawn_thread_group() {
76  size_t ncpus = thread::cpu_count();
77  // start all the threads if CPU affinity is set
78  for (size_t i = 0;i < pool_size; ++i) {
79  if (cpu_affinity) {
80  threads.launch(boost::bind(&thread_pool::wait_for_task, this), i % ncpus);
81  }
82  else {
83  threads.launch(boost::bind(&thread_pool::wait_for_task, this));
84  }
85  }
86  } // end of spawn_thread_group
87 
88 
89  void thread_pool::destroy_all_threads() {
90  // wait for all execution to complete
91  spawn_queue.wait_until_empty();
92  // kill the queue
93  spawn_queue.stop_blocking();
94 
95  // join the threads in the thread group
96  while(1) {
97  try {
98  threads.join();
99  break;
100  }
101  catch (const char* c) {
102  // this should not be possible!
103  logstream(LOG_FATAL)
104  << "Unexpected exception caught in thread pool destructor: "
105  << c << std::endl;
106  ASSERT_TRUE(false);
107  }
108  }
109  } // end of destroy_all_threads
110 
111  void thread_pool::set_cpu_affinity(bool affinity) {
112  if (affinity != cpu_affinity) {
113  cpu_affinity = affinity;
114  // stop the queue from blocking
115  spawn_queue.stop_blocking();
116 
117  // join the threads in the thread group
118  while(1) {
119  try {
120  threads.join(); break;
121  } catch (const char* c) {
122  // this should not be possible!
123  logstream(LOG_FATAL)
124  << "Unexpected exception caught in thread pool destructor: "
125  << c << std::endl;
126  // ASSERT_TRUE(false); // unnecessary
127  }
128  }
129  spawn_queue.start_blocking();
130  spawn_thread_group();
131  }
132  } // end of set_cpu_affinity
133 
134 
135  void thread_pool::wait_for_task() {
136  while(1) {
137  std::pair<std::pair<boost::function<void (void)>, int>, bool> queue_entry;
138  // pop from the queue
139  queue_entry = spawn_queue.dequeue();
140  if (queue_entry.second) {
141  // try to run the function. remember to put it in a try catch
142  try {
143  int virtual_thread_id = queue_entry.first.second;
144  size_t cur_thread_id = thread::thread_id();
145  if (virtual_thread_id != -1) {
146  thread::set_thread_id(virtual_thread_id);
147  }
148  queue_entry.first.first();
149  thread::set_thread_id(cur_thread_id);
150  } catch(const char* ex) {
151  // if an exception was raised, put it in the exception queue
152  mut.lock();
153  exception_queue.push(ex);
154  event_condition.signal();
155  mut.unlock();
156  }
157 
158  mut.lock();
159  tasks_completed++;
160  // the waiting on join flag just prevents me from
161  // signaling every time completed == inserted. Which could be very
162  // very often
163  if (waiting_on_join &&
164  tasks_completed == tasks_inserted) event_condition.signal();
165  mut.unlock();
166  }
167  else {
168  // quit if the queue is dead
169  break;
170  }
171  }
172  } // end of wait_for_task
173 
174  void thread_pool::launch(const boost::function<void (void)> &spawn_function,
175  int thread_id) {
176  mut.lock();
177  tasks_inserted++;
178  spawn_queue.enqueue(std::make_pair(spawn_function, thread_id));
179  mut.unlock();
180  }
181 
183  std::pair<bool, bool> eventret;
184  // first we wait for the queue to empty
185  spawn_queue.wait_until_empty();
186 
187  mut.lock();
188  waiting_on_join = true;
189  while(1) {
190  // check the exception queue.
191  if (!exception_queue.empty()) {
192  // pop an exception
193  const char* ex = exception_queue.front();
194  exception_queue.pop();
195  // unlock and throw the event
196  waiting_on_join = false;
197  mut.unlock();
198  throw(ex);
199  }
200  // nothing to throw, check if all tasks were completed
201  if (tasks_completed == tasks_inserted) {
202  // yup
203  break;
204  }
205  event_condition.wait(mut);
206  }
207  waiting_on_join = false;
208  mut.unlock();
209  }
210 
211 
213  destroy_all_threads();
214  }
215 
216 }