24 #include <graphlab/parallel/pthread_tools.hpp>
25 #include <boost/bind.hpp>
26 #include <graphlab/macros_def.hpp>
33 pthread_key_t GRAPHLAB_TSD_ID;
34 thread_keys() : GRAPHLAB_TSD_ID(0) {
35 pthread_key_create(&GRAPHLAB_TSD_ID,
42 static pthread_key_t get_tsd_id() {
43 static thread_keys
keys;
44 return keys.GRAPHLAB_TSD_ID;
47 static pthread_key_t __unused_init_keys__(get_tsd_id());
66 assert(pthread_getspecific(get_tsd_id()) == NULL);
72 pthread_setspecific(get_tsd_id(), data);
86 (pthread_getspecific(get_tsd_id()));
109 void* thread::invoke(
void *_args) {
111 thread::invoke_args* args =
static_cast<thread::invoke_args*
>(_args);
116 args->spawn_routine();
118 catch (
const char* msg) {
125 thread_destroy_callback();
143 error = pthread_join( other.m_p_thread, &status);
144 if (status != NULL) {
145 const char* strstatus = (
const char*) status;
150 std::cout <<
"Major error in join" << std::endl;
151 std::cout <<
"pthread_join() returned error " << error << std::endl;
162 #if defined __linux__
163 return sysconf(_SC_NPROCESSORS_CONF);
164 #elif defined(__MACH__) && defined(_SC_NPROCESSORS_ONLN)
165 return sysconf (_SC_NPROCESSORS_ONLN);
166 #elif defined(__MACH__) && defined(HW_NCPU)
168 size_t len =
sizeof(ncpus);
169 sysctl((
int[2]) {CTL_HW, HW_NCPU}, 2, &ncpus, &len, NULL, 0);
183 void thread::thread_destroy_callback() {
187 void thread::set_thread_destroy_callback(
void (*callback)()) {
199 ASSERT_FALSE(thread_started);
203 error = pthread_attr_init(&attr);
205 error = pthread_attr_setstacksize(&attr, m_stack_size);
207 error = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
210 pthread_create(&m_p_thread,
213 static_cast<void*>(
new invoke_args(m_thread_id,
215 thread_started =
true;
217 std::cout <<
"Major error in thread_group.launch (pthread_create). Error: "
218 << error << std::endl;
222 error = pthread_attr_destroy(&attr);
235 ASSERT_FALSE(thread_started);
236 if (cpu_id ==
size_t(-1)) {
252 error = pthread_attr_init(&attr);
254 error = pthread_attr_setstacksize(&attr, m_stack_size);
256 error = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
259 #ifdef HAS_SET_AFFINITY
263 CPU_SET(cpu_id % CPU_SETSIZE, &cpu_set);
265 pthread_attr_setaffinity_np(&attr,
sizeof(cpu_set), &cpu_set);
269 error = pthread_create(&m_p_thread,
272 static_cast<void*>(
new invoke_args(m_thread_id,
274 thread_started =
true;
276 std::cout <<
"Major error in thread_group.launch" << std::endl;
277 std::cout <<
"pthread_create() returned error " << error << std::endl;
284 error = pthread_attr_destroy(&attr);
295 void thread_group::invoke(boost::function<
void (
void)> spawn_function,
297 const char* retval = NULL;
301 catch (
const char* c) {
306 group->joinqueue.push(std::make_pair(pthread_self(), retval));
316 thread local_thread(m_thread_counter++);
320 local_thread.
launch(boost::bind(thread_group::invoke, spawn_function,
this));
326 if (cpu_id ==
size_t(-1)) {
331 thread local_thread(m_thread_counter++);
335 local_thread.
launch(boost::bind(thread_group::invoke, spawn_function,
this),
341 while(threads_running > 0) {
343 while (joinqueue.empty()) cond.
wait(mut);
345 std::pair<pthread_t, const char*> joining_thread = joinqueue.front();
349 if(threads_running == 0) m_thread_counter = 0;
352 void *unusedstatus = NULL;
353 pthread_join(joining_thread.first, &unusedstatus);
356 if (joining_thread.second) {
357 throw(joining_thread.second);