GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
pthread_tools.cpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #include <graphlab/parallel/pthread_tools.hpp>
25 #include <boost/bind.hpp>
26 #include <graphlab/macros_def.hpp>
27 
28 namespace graphlab {
29 
30  // Some magic to ensure that keys are created at program startup =========>
31  void destroy_tls_data(void* ptr);
32  struct thread_keys {
33  pthread_key_t GRAPHLAB_TSD_ID;
34  thread_keys() : GRAPHLAB_TSD_ID(0) {
35  pthread_key_create(&GRAPHLAB_TSD_ID,
37  }
38  };
39  // This function is to be called prior to any thread starting
40  // execution to ensure that the static member keys is constructed
41  // prior to any threads launching
42  static pthread_key_t get_tsd_id() {
43  static thread_keys keys;
44  return keys.GRAPHLAB_TSD_ID;
45  }
46  // This forces get_tsd_id to be called prior to main.
47  static pthread_key_t __unused_init_keys__(get_tsd_id());
48 
49  // the combination of the two mechanisms above will force the
50  // thread local store to be initialized
51  // 1: before main
52  // 2: before any other global variables which spawn threads
53 
54  // END MAGIC =============================================================>
55 
56 // -----------------------------------------------------------------
57 // Thread Object Static Members
58 // -----------------------------------------------------------------
59 
60 
61  /**
62  * Create thread specific data
63  */
64  thread::tls_data* create_tls_data(size_t thread_id = 0) {
65  // Require that the data not yet exist
66  assert(pthread_getspecific(get_tsd_id()) == NULL);
67  // Create the data
68  thread::tls_data* data =
69  new thread::tls_data(thread_id);
70  assert(data != NULL);
71  // Set the data
72  pthread_setspecific(get_tsd_id(), data);
73  // Return the associated tsd
74  return data;
75  } // end create the thread specific data
76 
77  /**
 * This function tries to get the thread specific data. If no
 * thread specific data has been associated with the thread yet, then
 * it is created.
81  */
83  // get the tsd
84  tls_data* tsd =
85  reinterpret_cast<tls_data*>
86  (pthread_getspecific(get_tsd_id()));
87  // If no tsd be has been associated, create one
88  if(tsd == NULL) tsd = create_tls_data();
89  assert(tsd != NULL);
90  return *tsd;
91  } // end of get thread specific data
92 
93 
94  /**
95  * Create thread specific data
96  */
97  void destroy_tls_data(void* ptr) {
98  thread::tls_data* tsd =
99  reinterpret_cast<thread::tls_data*>(ptr);
100  if(tsd != NULL) {
101  delete tsd;
102  }
103  } // end destroy the thread specific data
104 
105 
106 
107 
108  //! Little helper function used to launch threads
109  void* thread::invoke(void *_args) {
110  void* retval = NULL;
111  thread::invoke_args* args = static_cast<thread::invoke_args*>(_args);
112  // Create the graphlab thread specific data
113  create_tls_data(args->m_thread_id);
114  //! Run the users thread code
115  try {
116  args->spawn_routine();
117  }
118  catch (const char* msg) {
119  retval = (void*)msg;
120  }
121  //! Delete the arguments
122  delete args;
123 
124  //! Properly kill the thread
125  thread_destroy_callback();
126  return retval;
127  } // end of invoke
128 
129 
130 
131 
132 
133  /**
134  * This static method joins the invoking thread with the other
135  * thread object. This thread will not return from the join
136  * routine until the other thread complets it run.
137  */
138  void thread::join(thread& other) {
139  void *status = NULL;
140  // joint the first element
141  int error = 0;
142  if(other.active()) {
143  error = pthread_join( other.m_p_thread, &status);
144  if (status != NULL) {
145  const char* strstatus = (const char*) status;
146  throw strstatus;
147  }
148  }
149  if(error) {
150  std::cout << "Major error in join" << std::endl;
151  std::cout << "pthread_join() returned error " << error << std::endl;
152  exit(EXIT_FAILURE);
153  }
154  } // end of join
155 
156 
157  /**
158  * Return the number processing units (individual cores) on this
159  * system
160  */
161  size_t thread::cpu_count() {
162 #if defined __linux__
163  return sysconf(_SC_NPROCESSORS_CONF);
164 #elif defined(__MACH__) && defined(_SC_NPROCESSORS_ONLN)
165  return sysconf (_SC_NPROCESSORS_ONLN);
166 #elif defined(__MACH__) && defined(HW_NCPU)
167  int ncpus = 1;
168  size_t len = sizeof(ncpus);
169  sysctl((int[2]) {CTL_HW, HW_NCPU}, 2, &ncpus, &len, NULL, 0);
170  return ncpus;
171 #else
172  return 0;
173 #endif
174  } // end of cpu count
175 
176  /**
177  * Allow defining a callback when thread is destroyed.
178  * This is needed at least from Java JNI, where we have to detach
179  * thread from JVM before it dies.
180  */
181  void (*__thr_callback)() = NULL;
182 
183  void thread::thread_destroy_callback() {
184  if (__thr_callback != NULL) __thr_callback();
185  }
186 
187  void thread::set_thread_destroy_callback(void (*callback)()) {
188  __thr_callback = callback;
189  }
190 
191 
192 // -----------------------------------------------------------------
193 // Thread Object Public Members
194 // -----------------------------------------------------------------
195 
196 
197  void thread::launch(const boost::function<void (void)> &spawn_routine) {
198  get_tsd_id();
199  ASSERT_FALSE(thread_started);
200  // fill in the thread attributes
201  pthread_attr_t attr;
202  int error = 0;
203  error = pthread_attr_init(&attr);
204  ASSERT_TRUE(!error);
205  error = pthread_attr_setstacksize(&attr, m_stack_size);
206  ASSERT_TRUE(!error);
207  error = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
208  ASSERT_TRUE(!error);
209  error =
210  pthread_create(&m_p_thread,
211  &attr,
212  invoke,
213  static_cast<void*>(new invoke_args(m_thread_id,
214  spawn_routine)) );
215  thread_started = true;
216  if(error) {
217  std::cout << "Major error in thread_group.launch (pthread_create). Error: "
218  << error << std::endl;
219  exit(EXIT_FAILURE);
220  }
221  // destroy the attribute object
222  error = pthread_attr_destroy(&attr);
223  ASSERT_TRUE(!error);
224  }
225 
226  void thread::launch(const boost::function<void (void)> &spawn_routine,
227  size_t cpu_id){
228  get_tsd_id();
229  // if this is not a linux based system simply invoke start and
230  // return;
231 #ifndef __linux__
232  launch(spawn_routine);
233  return;
234 #else
235  ASSERT_FALSE(thread_started);
236  if (cpu_id == size_t(-1)) {
237  launch(spawn_routine);
238  return;
239  }
240  if (cpu_count() > 0) {
241  cpu_id = cpu_id % cpu_count();
242  }
243  else {
244  // unknown CPU count
245  launch(spawn_routine);
246  return;
247  }
248 
249  // fill in the thread attributes
250  pthread_attr_t attr;
251  int error = 0;
252  error = pthread_attr_init(&attr);
253  ASSERT_TRUE(!error);
254  error = pthread_attr_setstacksize(&attr, m_stack_size);
255  ASSERT_TRUE(!error);
256  error = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
257  ASSERT_TRUE(!error);
258 
259 #ifdef HAS_SET_AFFINITY
260  // Set Processor Affinity masks (linux only)
261  cpu_set_t cpu_set;
262  CPU_ZERO(&cpu_set);
263  CPU_SET(cpu_id % CPU_SETSIZE, &cpu_set);
264 
265  pthread_attr_setaffinity_np(&attr, sizeof(cpu_set), &cpu_set);
266 #endif
267 
268  // Launch the thread
269  error = pthread_create(&m_p_thread,
270  &attr,
271  invoke,
272  static_cast<void*>(new invoke_args(m_thread_id,
273  spawn_routine)));
274  thread_started = true;
275  if(error) {
276  std::cout << "Major error in thread_group.launch" << std::endl;
277  std::cout << "pthread_create() returned error " << error << std::endl;
278  exit(EXIT_FAILURE);
279  }
280 
281 
282 
283  // destroy the attribute object
284  error = pthread_attr_destroy(&attr);
285  ASSERT_TRUE(!error);
286 #endif
287  }
288 
289  // -----------------------------------------------------------------
290  // Thread Group Object Public Members
291  // -----------------------------------------------------------------
292  // thread group exception forwarding is a little more complicated
293  // because it has to be able to catch it on a bunch of threads
294 
295  void thread_group::invoke(boost::function<void (void)> spawn_function,
296  thread_group *group) {
297  const char* retval = NULL;
298  try {
299  spawn_function();
300  }
301  catch (const char* c) {
302  // signal the thread group to join this thread
303  retval = c;
304  }
305  group->mut.lock();
306  group->joinqueue.push(std::make_pair(pthread_self(), retval));
307  group->cond.signal();
308  group->mut.unlock();
309 
310  }
311 
312 
313  void thread_group::launch(const boost::function<void (void)> &spawn_function) {
314  // Create a thread object and launch it.
315  // We do not need to keep a copy of the thread around
316  thread local_thread(m_thread_counter++);
317  mut.lock();
318  threads_running++;
319  mut.unlock();
320  local_thread.launch(boost::bind(thread_group::invoke, spawn_function, this));
321  }
322 
323 
324  void thread_group::launch(const boost::function<void (void)> &spawn_function,
325  size_t cpu_id) {
326  if (cpu_id == size_t(-1)) {
327  launch(spawn_function);
328  return;
329  }
330  // Create a thread object
331  thread local_thread(m_thread_counter++);
332  mut.lock();
333  threads_running++;
334  mut.unlock();
335  local_thread.launch(boost::bind(thread_group::invoke, spawn_function, this),
336  cpu_id);
337  } // end of launch
338 
340  mut.lock();
341  while(threads_running > 0) {
342  // if no threads are joining. wait
343  while (joinqueue.empty()) cond.wait(mut);
344  // a thread is joining
345  std::pair<pthread_t, const char*> joining_thread = joinqueue.front();
346  joinqueue.pop();
347  threads_running--;
348  // Reset the thread counter after killing all threads
349  if(threads_running == 0) m_thread_counter = 0;
350  // unlock here since I might be in join for a little while
351  mut.unlock();
352  void *unusedstatus = NULL;
353  pthread_join(joining_thread.first, &unusedstatus);
354  // if there is a return value
355  // throw it. It is safe to throw here since I have the mutex unlocked.
356  if (joining_thread.second) {
357  throw(joining_thread.second);
358  }
359  mut.lock();
360  }
361  mut.unlock();
362  } // end of join
363 
364 
365 } // end of namespace graphlab
366