GraphLab: Distributed Graph-Parallel API 2.1
pthread_tools.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_PTHREAD_TOOLS_HPP
25 #define GRAPHLAB_PTHREAD_TOOLS_HPP
26 
27 
28 #include <cstdlib>
29 #include <pthread.h>
30 #include <semaphore.h>
31 #include <sched.h>
32 #include <signal.h>
33 #include <sys/time.h>
34 #include <vector>
35 #include <list>
36 #include <queue>
37 #include <iostream>
38 #include <boost/function.hpp>
39 #include <graphlab/logger/assertions.hpp>
40 #include <graphlab/parallel/atomic_ops.hpp>
41 #include <graphlab/util/generics/any.hpp>
42 #include <graphlab/util/branch_hints.hpp>
43 #include <boost/unordered_map.hpp>
44 #undef _POSIX_SPIN_LOCKS
45 #define _POSIX_SPIN_LOCKS -1
46 
47 
48 #include <graphlab/parallel/mutex.hpp>
49 
50 
51 
52 
53 namespace graphlab {
54 
55 
56 
57 
58 
59 #if _POSIX_SPIN_LOCKS >= 0
60  /**
61  * \ingroup util
62  *
63  * Wrapper around pthread's spinlock.
64  *
65  * Before you use, see \ref parallel_object_intricacies.
66  */
67  class spinlock {
68  private:
69  // mutable so the const lock()/unlock() members can modify it
70  mutable pthread_spinlock_t m_spin;
71  public:
72  /// constructs a spinlock
73  spinlock () {
74  int error = pthread_spin_init(&m_spin, PTHREAD_PROCESS_PRIVATE);
75  ASSERT_TRUE(!error);
76  }
77 
78  /** Copy constructor which does not copy. Do not use!
79  Required for compatibility with some STL implementations (LLVM),
80  which use the copy constructor for vector resize
81  rather than the default constructor. */
82  spinlock(const spinlock&) {
83  int error = pthread_spin_init(&m_spin, PTHREAD_PROCESS_PRIVATE);
84  ASSERT_TRUE(!error);
85  }
86 
87  // not copyable
88  void operator=(const spinlock& m) { }
89 
90 
91  /// Acquires a lock on the spinlock
92  inline void lock() const {
93  int error = pthread_spin_lock( &m_spin );
94  ASSERT_TRUE(!error);
95  }
96  /// Releases a lock on the spinlock
97  inline void unlock() const {
98  int error = pthread_spin_unlock( &m_spin );
99  ASSERT_TRUE(!error);
100  }
101  /// Non-blocking attempt to acquire a lock on the spinlock
102  inline bool try_lock() const {
103  return pthread_spin_trylock( &m_spin ) == 0;
104  }
105  ~spinlock(){
106  int error = pthread_spin_destroy( &m_spin );
107  ASSERT_TRUE(!error);
108  }
109  friend class conditional;
110  }; // End of spinlock
111 #define SPINLOCK_SUPPORTED 1
112 #else
113  //! if the spinlock is not supported, it is typedef'd to a mutex.
114  typedef mutex spinlock;
115 #define SPINLOCK_SUPPORTED 0
116 #endif
117 
118 
119  /**
120  * \ingroup util
121  * If the pthread spinlock is not implemented,
122  * this provides a simple alternate spin lock implementation.
123  *
124  * Before you use, see \ref parallel_object_intricacies.
125  */
126  class simple_spinlock {
127  private:
128  // mutable so the const lock()/unlock() members can modify it
129  mutable volatile char spinner;
130  public:
131  /// constructs a spinlock
132  simple_spinlock () {
133  spinner = 0;
134  }
135 
136  /** Copy constructor which does not copy. Do not use!
137  Required for compatibility with some STL implementations (LLVM),
138  which use the copy constructor for vector resize
139  rather than the default constructor. */
140  simple_spinlock(const simple_spinlock&) {
141  spinner = 0;
142  }
143 
144  // not copyable
145  void operator=(const simple_spinlock& m) { }
146 
147 
148  /// Acquires a lock on the spinlock
149  inline void lock() const {
150  while(spinner == 1 || __sync_lock_test_and_set(&spinner, 1));
151  }
152  /// Releases a lock on the spinlock
153  inline void unlock() const {
154  __sync_synchronize();
155  spinner = 0;
156  }
157  /// Non-blocking attempt to acquire a lock on the spinlock
158  inline bool try_lock() const {
159  return (__sync_lock_test_and_set(&spinner, 1) == 0);
160  }
161  ~simple_spinlock(){
162  ASSERT_TRUE(spinner == 0);
163  }
164  };
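 // Usage sketch (editorial addition, not part of the original header): spinlock,
 // simple_spinlock, and mutex all expose the same lock()/unlock()/try_lock()
 // interface, so they can be substituted for one another. The counter below is a
 // hypothetical example.
 //
 //   graphlab::spinlock counter_lock;
 //   size_t counter = 0;
 //
 //   void increment() {
 //     counter_lock.lock();    // spins (busy-waits) rather than sleeping
 //     ++counter;              // keep the critical section short
 //     counter_lock.unlock();
 //   }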
165 
166 
167  /**
168  * \ingroup util
169  * Wrapper around pthread's condition variable
170  *
171  * Before you use, see \ref parallel_object_intricacies.
172  */
173  class conditional {
174  private:
175  mutable pthread_cond_t m_cond;
176 
177  public:
178  conditional() {
179  int error = pthread_cond_init(&m_cond, NULL);
180  ASSERT_TRUE(!error);
181  }
182 
183  /** Copy constructor which does not copy. Do not use!
184  Required for compatibility with some STL implementations (LLVM),
185  which use the copy constructor for vector resize
186  rather than the default constructor. */
187  conditional(const conditional&) {
188  int error = pthread_cond_init(&m_cond, NULL);
189  ASSERT_TRUE(!error);
190  }
191 
192  // not copyable
193  void operator=(const conditional& m) { }
194 
195 
196  /// Waits on condition. The mutex must already be acquired. Caller
197  /// must be careful about spurious wakes.
198  inline void wait(const mutex& mut) const {
199  int error = pthread_cond_wait(&m_cond, &mut.m_mut);
200  ASSERT_TRUE(!error);
201  }
202  /// Like wait() but with a time limit of "sec" seconds
203  inline int timedwait(const mutex& mut, size_t sec) const {
204  struct timespec timeout;
205  struct timeval tv;
206  struct timezone tz;
207  gettimeofday(&tv, &tz);
208  timeout.tv_nsec = tv.tv_usec * 1000;
209  timeout.tv_sec = tv.tv_sec + (time_t)sec;
210  return pthread_cond_timedwait(&m_cond, &mut.m_mut, &timeout);
211  }
212  /// Like wait() but with a time limit of "ms" milliseconds
213  inline int timedwait_ms(const mutex& mut, size_t ms) const {
214  struct timespec timeout;
215  struct timeval tv;
216  gettimeofday(&tv, NULL);
217  // convert ms to s and ns
218  size_t s = ms / 1000;
219  ms = ms % 1000;
220  size_t ns = ms * 1000000;
221  // convert timeval to timespec
222  timeout.tv_nsec = tv.tv_usec * 1000;
223  timeout.tv_sec = tv.tv_sec;
224 
225  // add the time
226  timeout.tv_nsec += (suseconds_t)ns;
227  timeout.tv_sec += (time_t)s;
228  // shift the nsec to sec if overflow
229  if (timeout.tv_nsec >= 1000000000) {
230  timeout.tv_sec ++;
231  timeout.tv_nsec -= 1000000000;
232  }
233  return pthread_cond_timedwait(&m_cond, &mut.m_mut, &timeout);
234  }
235  /// Like wait() but with a time limit of "ns" nanoseconds
236  inline int timedwait_ns(const mutex& mut, size_t ns) const {
237  struct timespec timeout;
238  struct timeval tv;
239  gettimeofday(&tv, NULL);
240  assert(ns > 0);
241  // convert ns to whole seconds and remaining ns (1 s = 1,000,000,000 ns)
242  size_t s = ns / 1000000000;
243  ns = ns % 1000000000;
244 
245  // convert timeval to timespec
246  timeout.tv_nsec = tv.tv_usec * 1000;
247  timeout.tv_sec = tv.tv_sec;
248 
249  // add the time
250  timeout.tv_nsec += (suseconds_t)ns;
251  timeout.tv_sec += (time_t)s;
252  // shift the nsec to sec if overflow
253  if (timeout.tv_nsec >= 1000000000) {
254  timeout.tv_sec ++;
255  timeout.tv_nsec -= 1000000000;
256  }
257  return pthread_cond_timedwait(&m_cond, &mut.m_mut, &timeout);
258  }
259  /// Signals one waiting thread to wake up
260  inline void signal() const {
261  int error = pthread_cond_signal(&m_cond);
262  ASSERT_TRUE(!error);
263  }
264  /// Wakes up all waiting threads
265  inline void broadcast() const {
266  int error = pthread_cond_broadcast(&m_cond);
267  ASSERT_TRUE(!error);
268  }
269  ~conditional() {
270  int error = pthread_cond_destroy(&m_cond);
271  ASSERT_TRUE(!error);
272  }
273  }; // End conditional
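 // Usage sketch (editorial addition, not part of the original header): a conditional
 // is always paired with a mutex, and the predicate must be re-checked in a loop
 // because wait() may wake spuriously. The queue/mutex/conditional names below are
 // hypothetical.
 //
 //   graphlab::mutex qlock;
 //   graphlab::conditional qcond;
 //   std::queue<int> jobs;
 //
 //   void push(int j) {
 //     qlock.lock();
 //     jobs.push(j);
 //     qcond.signal();                          // wake one waiting consumer
 //     qlock.unlock();
 //   }
 //
 //   int pop_blocking() {
 //     qlock.lock();
 //     while (jobs.empty()) qcond.wait(qlock);  // guard against spurious wakes
 //     int j = jobs.front(); jobs.pop();
 //     qlock.unlock();
 //     return j;
 //   }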
274 
275 
276 #ifdef __APPLE__
277  /**
278  * Custom implementation of a semaphore.
279  *
280  * Before you use, see \ref parallel_object_intricacies.
281  */
282  class semaphore {
283  private:
284  conditional cond;
285  mutex mut;
286  mutable volatile size_t semvalue;
287  mutable volatile size_t waitercount;
288 
289  public:
290  semaphore() {
291  semvalue = 0;
292  waitercount = 0;
293  }
294  /** Copy constructor which does not copy. Do not use!
295  Required for compatibility with some STL implementations (LLVM),
296  which use the copy constructor for vector resize
297  rather than the default constructor. */
298  semaphore(const semaphore&) {
299  semvalue = 0;
300  waitercount = 0;
301  }
302 
303  // not copyable
304  void operator=(const semaphore& m) { }
305 
306  inline void post() const {
307  mut.lock();
308  if (waitercount > 0) {
309  cond.signal();
310  }
311  semvalue++;
312  mut.unlock();
313  }
314  inline void wait() const {
315  mut.lock();
316  waitercount++;
317  while (semvalue == 0) {
318  cond.wait(mut);
319  }
320  waitercount--;
321  semvalue--;
322  mut.unlock();
323  }
324  ~semaphore() {
325  ASSERT_TRUE(waitercount == 0);
326  ASSERT_TRUE(semvalue == 0);
327  }
328  }; // End semaphore
329 #else
330  /**
331  * Wrapper around pthread's semaphore
332  *
333  * Before you use, see \ref parallel_object_intricacies.
334  */
335  class semaphore {
336  private:
337  mutable sem_t m_sem;
338 
339  public:
340  semaphore() {
341  int error = sem_init(&m_sem, 0,0);
342  ASSERT_TRUE(!error);
343  }
344 
345  /** Copy constructor which does not copy. Do not use!
346  Required for compatibility with some STL implementations (LLVM),
347  which use the copy constructor for vector resize
348  rather than the default constructor. */
349  semaphore(const semaphore&) {
350  int error = sem_init(&m_sem, 0, 0);
351  ASSERT_TRUE(!error);
352  }
353 
354  // not copyable
355  void operator=(const semaphore& m) { }
356 
357  inline void post() const {
358  int error = sem_post(&m_sem);
359  ASSERT_TRUE(!error);
360  }
361  inline void wait() const {
362  int error = sem_wait(&m_sem);
363  ASSERT_TRUE(!error);
364  }
365  ~semaphore() {
366  int error = sem_destroy(&m_sem);
367  ASSERT_TRUE(!error);
368  }
369  }; // End semaphore
370 #endif
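 // Usage sketch (editorial addition, not part of the original header): both the
 // custom and pthread-backed semaphores start at a count of zero, so wait() blocks
 // until a matching post(). A simple hand-off between two threads (names are
 // hypothetical):
 //
 //   graphlab::semaphore result_ready;
 //
 //   // producer thread:   result = compute(); result_ready.post();
 //   // consumer thread:   result_ready.wait(); use(result);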
371 
372 
373 #define atomic_xadd(P, V) __sync_fetch_and_add((P), (V))
374 #define cmpxchg(P, O, N) __sync_val_compare_and_swap((P), (O), (N))
375 #define atomic_inc(P) __sync_add_and_fetch((P), 1)
376 #define atomic_add(P, V) __sync_add_and_fetch((P), (V))
377 #define atomic_set_bit(P, V) __sync_or_and_fetch((P), 1<<(V))
378 #define cpu_relax() asm volatile("pause\n": : :"memory")
379 
380  /**
381  * \class spinrwlock
382  * rwlock built around "spinning".
383  * Source adapted from http://locklessinc.com/articles/locks/ and
384  * "Scalable Reader-Writer Synchronization for Shared-Memory Multiprocessors",
385  * John Mellor-Crummey and Michael Scott.
386  */
387  class spinrwlock {
388 
389  union rwticket {
390  unsigned u;
391  unsigned short us;
392  __extension__ struct {
393  unsigned char write;
394  unsigned char read;
395  unsigned char users;
396  } s;
397  };
398  mutable bool writing;
399  mutable volatile rwticket l;
400  public:
401  spinrwlock() {
402  memset(const_cast<rwticket*>(&l), 0, sizeof(rwticket));
403  }
404  inline void writelock() const {
405  unsigned me = atomic_xadd(&l.u, (1<<16));
406  unsigned char val = (unsigned char)(me >> 16);
407 
408  while (val != l.s.write) asm volatile("pause\n": : :"memory");
409  writing = true;
410  }
411 
412  inline void wrunlock() const{
413  rwticket t = *const_cast<rwticket*>(&l);
414 
415  t.s.write++;
416  t.s.read++;
417 
418  *(volatile unsigned short *) (&l) = t.us;
419  writing = false;
420  __asm("mfence");
421  }
422 
423  inline void readlock() const {
424  unsigned me = atomic_xadd(&l.u, (1<<16));
425  unsigned char val = (unsigned char)(me >> 16);
426 
427  while (val != l.s.read) asm volatile("pause\n": : :"memory");
428  l.s.read++;
429  }
430 
431  inline void rdunlock() const {
432  atomic_inc(&l.s.write);
433  }
434 
435  inline void unlock() const {
436  if (!writing) rdunlock();
437  else wrunlock();
438  }
439  };
440 
441 
442 
443 #define RW_WAIT_BIT 0
444 #define RW_WRITE_BIT 1
445 #define RW_READ_BIT 2
446 
447 #define RW_WAIT 1
448 #define RW_WRITE 2
449 #define RW_READ 4
450 
451  struct spinrwlock2 {
452  mutable unsigned int l;
453 
454  spinrwlock2():l(0) {}
455  void writelock() const {
456  while (1) {
457  unsigned state = l;
458 
459  /* No readers or writers? */
460  if (state < RW_WRITE)
461  {
462  /* Turn off RW_WAIT, and turn on RW_WRITE */
463  if (cmpxchg(&l, state, RW_WRITE) == state) return;
464 
465  /* Someone else got there... time to wait */
466  state = l;
467  }
468 
469  /* Turn on writer wait bit */
470  if (!(state & RW_WAIT)) atomic_set_bit(&l, RW_WAIT_BIT);
471 
472  /* Wait until can try to take the lock */
473  while (l > RW_WAIT) cpu_relax();
474  }
475  }
476 
477  void wrunlock() const {
478  atomic_add(&l, -RW_WRITE);
479  }
480 
481  void readlock() const {
482  while (1) {
483  /* A writer exists? */
484  while (l & (RW_WAIT | RW_WRITE)) cpu_relax();
485 
486  /* Try to get read lock */
487  if (!(atomic_xadd(&l, RW_READ) & (RW_WAIT | RW_WRITE))) return;
488 
489  /* Undo */
490  atomic_add(&l, -RW_READ);
491  }
492  }
493 
494  void rdunlock() const {
495  atomic_add(&l, -RW_READ);
496  }
497  };
498 
499 #undef atomic_xadd
500 #undef cmpxchg
501 #undef atomic_inc
502 #undef atomic_set_bit
503 #undef atomic_add
504 #undef RW_WAIT_BIT
505 #undef RW_WRITE_BIT
506 #undef RW_READ_BIT
507 #undef RW_WAIT
508 #undef RW_WRITE
509 #undef RW_READ
510 
511 
512  /**
513  * \class rwlock
514  * Wrapper around pthread's rwlock
515  *
516  * Before you use, see \ref parallel_object_intricacies.
517  */
518  class rwlock {
519  private:
520  mutable pthread_rwlock_t m_rwlock;
521  public:
522  rwlock() {
523  int error = pthread_rwlock_init(&m_rwlock, NULL);
524  ASSERT_TRUE(!error);
525  }
526  ~rwlock() {
527  int error = pthread_rwlock_destroy(&m_rwlock);
528  ASSERT_TRUE(!error);
529  }
530 
531  // not copyable
532  void operator=(const rwlock& m) { }
533 
534  /**
535  * \todo: Remove!
536  *
537  * Copy constructor which does not copy. Do not use! Required for
538  * compatibility with some STL implementations (LLVM), which use
539  * the copy constructor for vector resize rather than the
540  * default constructor. */
541  rwlock(const rwlock &) {
542  int error = pthread_rwlock_init(&m_rwlock, NULL);
543  ASSERT_TRUE(!error);
544  }
545 
546  inline void readlock() const {
547  pthread_rwlock_rdlock(&m_rwlock);
548  //ASSERT_TRUE(!error);
549  }
550  inline void writelock() const {
551  pthread_rwlock_wrlock(&m_rwlock);
552  //ASSERT_TRUE(!error);
553  }
554  inline bool try_readlock() const {
555  return pthread_rwlock_tryrdlock(&m_rwlock) == 0;
556  }
557  inline bool try_writelock() const {
558  return pthread_rwlock_trywrlock(&m_rwlock) == 0;
559  }
560  inline void unlock() const {
561  pthread_rwlock_unlock(&m_rwlock);
562  //ASSERT_TRUE(!error);
563  }
564  inline void rdunlock() const {
565  unlock();
566  }
567  inline void wrunlock() const {
568  unlock();
569  }
570  }; // End rwlock
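 // Usage sketch (editorial addition, not part of the original header): rwlock,
 // spinrwlock, and spinrwlock2 all provide readlock()/writelock() with matching
 // rdunlock()/wrunlock(); many readers may hold the lock concurrently while a
 // writer is exclusive. The shared table below is hypothetical.
 //
 //   graphlab::rwlock table_lock;
 //   std::map<int, int> table;
 //
 //   int lookup(int key) {
 //     table_lock.readlock();
 //     std::map<int, int>::const_iterator it = table.find(key);
 //     int value = (it == table.end()) ? -1 : it->second;
 //     table_lock.rdunlock();
 //     return value;
 //   }
 //
 //   void update(int key, int value) {
 //     table_lock.writelock();
 //     table[key] = value;
 //     table_lock.wrunlock();
 //   }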
571 
572 
573 
574 
575 
576  /**
577  * \ingroup util
578  * This is a simple sense-reversing barrier implementation.
579  * In addition to standard barrier functionality, this also
580  * provides a "cancel" function which can be used to destroy
581  * the barrier, releasing all threads stuck in the barrier.
582  *
583  * Before you use, see \ref parallel_object_intricacies.
584  */
585  class cancellable_barrier {
586  private:
587  graphlab::mutex mutex;
588  graphlab::conditional conditional;
589  mutable int needed;
590  mutable int called;
591 
592  mutable bool barrier_sense;
593  mutable bool barrier_release;
594  bool alive;
595 
596  // not copy-constructible
597  cancellable_barrier(const cancellable_barrier&) { }
598 
599 
600  public:
601  /// Construct a barrier which will only fall when numthreads enter
602  cancellable_barrier(size_t numthreads) {
603  needed = numthreads;
604  called = 0;
605  barrier_sense = false;
606  barrier_release = true;
607  alive = true;
608  }
609 
610  // not copyable
611  void operator=(const cancellable_barrier& m) { }
612 
613  void resize_unsafe(size_t numthreads) {
614  needed = numthreads;
615  }
616 
617  /**
618  * \warning This barrier is NOT safely reusable once cancel()
619  * has been called
620  */
621  inline void cancel() {
622  alive = false;
623  conditional.broadcast();
624  }
625  /// Wait on the barrier until numthreads has called wait
626  inline void wait() const {
627  if (!alive) return;
628  mutex.lock();
629  // set waiting;
630  called++;
631  bool listening_on = barrier_sense;
632  if (called == needed) {
633  // if I have reached the required limit, wake everyone up. Set called
634  // to 0 to make sure everyone wakes up
635  called = 0;
636  barrier_release = barrier_sense;
637  barrier_sense = !barrier_sense;
638  // clear all waiting
639  conditional.broadcast();
640  } else {
641  // while no one has broadcasted, sleep
642  while(barrier_release != listening_on && alive) conditional.wait(mutex);
643  }
644  mutex.unlock();
645  }
646  }; // end of cancellable_barrier
647 
648 
649 
650  /**
651  * \class barrier
652  * Wrapper around pthread's barrier
653  *
654  * Before you use, see \ref parallel_object_intricacies.
655  */
656 #ifdef __linux__
657  /**
658  * \ingroup util
659  * Wrapper around pthread's barrier
660  */
661  class barrier {
662  private:
663  mutable pthread_barrier_t m_barrier;
664  // not copyconstructable
665  barrier(const barrier&) { }
666  public:
667  /// Construct a barrier which will only fall when numthreads enter
668  barrier(size_t numthreads) {
669  pthread_barrier_init(&m_barrier, NULL, (unsigned)numthreads); }
670  // not copyable
671  void operator=(const barrier& m) { }
672  void resize_unsafe(size_t numthreads) {
673  pthread_barrier_destroy(&m_barrier);
674  pthread_barrier_init(&m_barrier, NULL, (unsigned)numthreads);
675  }
676  ~barrier() { pthread_barrier_destroy(&m_barrier); }
677  /// Wait on the barrier until numthreads has called wait
678  inline void wait() const { pthread_barrier_wait(&m_barrier); }
679  };
680 
681 #else
682  /* On some systems pthread_barrier is not available, so the
683  cancellable_barrier above is used instead. */
684  typedef cancellable_barrier barrier;
685 #endif
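 // Usage sketch (editorial addition, not part of the original header): every
 // participating thread calls wait(), and none proceeds until all numthreads have
 // arrived. The same code works for the pthread-backed barrier and the
 // cancellable_barrier fallback. nthreads and the phase functions are hypothetical.
 //
 //   graphlab::barrier sync_point(nthreads);
 //
 //   void worker() {
 //     do_local_phase();
 //     sync_point.wait();     // rendezvous before starting the next phase
 //     do_next_phase();
 //   }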
686 
687 
688 
689  inline void prefetch_range(void *addr, size_t len) {
690  char *cp;
691  char *end = (char*)(addr) + len;
692 
693  for (cp = (char*)(addr); cp < end; cp += 64) __builtin_prefetch(cp, 0);
694  }
695  inline void prefetch_range_write(void *addr, size_t len) {
696  char *cp;
697  char *end = (char*)(addr) + len;
698 
699  for (cp = (char*)(addr); cp < end; cp += 64) __builtin_prefetch(cp, 1);
700  }
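 // Usage sketch (editorial addition, not part of the original header): warm a
 // buffer one cache line (64 bytes) at a time before a sequential scan. The
 // vector below is hypothetical.
 //
 //   std::vector<double> values(1 << 20);
 //   prefetch_range(&values[0], values.size() * sizeof(double));
 //   double sum = 0;
 //   for (size_t i = 0; i < values.size(); ++i) sum += values[i];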
701 
702 
703 
704 
705 
706 
707 
708 
709 
710  /**
711  * \ingroup util
712  * A collection of routines for creating and managing threads.
713  *
714  * The thread object performs limited exception forwarding.
715  * Exceptions of type const char* thrown within a thread will be caught
716  * and forwarded to the join() function.
717  * If the call to join() is wrapped by a try-catch block, the exception
718  * will be caught safely and thread cleanup will be completed properly.
719  */
720  class thread {
721  public:
722 
723  /**
724  * This class contains the data unique to each thread. All threads
725  * are guaranteed to have an associated graphlab thread-specific
726  * data. The thread object is copyable.
727  */
728  class tls_data {
729  public:
730  inline tls_data(size_t thread_id) : thread_id_(thread_id) { }
731  inline size_t thread_id() { return thread_id_; }
732  inline void set_thread_id(size_t t) { thread_id_ = t; }
733  any& operator[](const size_t& id) { return local_data[id]; }
734  bool contains(const size_t& id) const {
735  return local_data.find(id) != local_data.end();
736  }
737  size_t erase(const size_t& id) {
738  return local_data.erase(id);
739  }
740  private:
741  size_t thread_id_;
742  boost::unordered_map<size_t, any> local_data;
743  }; // end of thread specific data
744 
745 
746 
747  /// Static helper routines
748  // ===============================================================
749 
750  /**
751  * Get the thread specific data associated with this thread
752  */
753  static tls_data& get_tls_data();
754 
755  /** Get the id of the calling thread. This will typically be the
756  index in the thread group, between 0 and ncpus. */
757  static inline size_t thread_id() { return get_tls_data().thread_id(); }
758 
759  /** Set the id of the calling thread. This will typically be the
760  index in the thread group, between 0 and ncpus. */
761  static inline void set_thread_id(size_t t) { get_tls_data().set_thread_id(t); }
762 
763  /**
764  * Get a reference to an any object
765  */
766  static inline any& get_local(const size_t& id) {
767  return get_tls_data()[id];
768  }
769 
770  /**
771  * Check to see if there is an entry in the local map
772  */
773  static inline bool contains(const size_t& id) {
774  return get_tls_data().contains(id);
775  }
776 
777  /**
778  * Removes the entry from the local map.
779  * @return number of elements erased.
780  */
781  static inline size_t erase(const size_t& id){
782  return get_tls_data().erase(id);
783  }
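 // Usage sketch (editorial addition, not part of the original header): any thread
 // can stash values in its thread-local map, keyed by a size_t id. The key 0 and
 // the stored int are hypothetical, and as<int>() assumes the accessor provided by
 // graphlab::any.
 //
 //   thread::get_local(0) = int(42);             // store a value in this thread's map
 //   if (thread::contains(0)) {
 //     int v = thread::get_local(0).as<int>();   // read it back
 //     thread::erase(0);                         // and remove the entry
 //   }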
784 
785  /**
786  * This static method joins the invoking thread with the other
787  * thread object. This thread will not return from the join
788  * routine until the other thread completes its run.
789  */
790  static void join(thread& other);
791 
792  // Called just before thread exits. Can be used
793  // to do special cleanup... (need for Java JNI)
794  static void thread_destroy_callback();
795  static void set_thread_destroy_callback(void (*callback)());
796 
797 
798  /**
799  * Return the number of processing units (individual cores) on this
800  * system
801  */
802  static size_t cpu_count();
803 
804 
805  private:
806 
807  struct invoke_args{
808  size_t m_thread_id;
809  boost::function<void(void)> spawn_routine;
810  invoke_args(size_t m_thread_id, const boost::function<void(void)> &spawn_routine)
811  : m_thread_id(m_thread_id), spawn_routine(spawn_routine) { };
812  };
813 
814  //! Little helper function used to launch threads
815  static void* invoke(void *_args);
816 
817  public:
818 
819  /**
820  * Creates a thread with a user-defined associated thread ID
821  */
822  inline thread(size_t thread_id = 0) :
823  m_stack_size(0),
824  m_p_thread(0),
825  m_thread_id(thread_id),
826  thread_started(false){
827  // Calculate the stack size in bytes;
828  const int BYTES_PER_MB = 1048576;
829  const int DEFAULT_SIZE_IN_MB = 8;
830  m_stack_size = DEFAULT_SIZE_IN_MB * BYTES_PER_MB;
831  }
832 
833  /**
834  * Execute this function to spawn a new thread running the
835  * spawn_routine
836  */
837  void launch(const boost::function<void (void)> &spawn_routine);
838 
839  /**
840  * Same as launch() except that you can specify a CPU on which to
841  * run the thread. This is currently only supported on Linux; if
842  * invoked on a non-Linux based system this will be equivalent to
843  * launch() without affinity.
844  */
845  void launch(const boost::function<void (void)> &spawn_routine, size_t cpu_id);
846 
847 
848  /**
849  * Join the calling thread with this thread.
850  * const char* exceptions
851  * thrown by the thread are forwarded to the join() function.
852  */
853  inline void join() {
854  if(this == NULL) {
855  std::cout << "Failure on join()" << std::endl;
856  exit(EXIT_FAILURE);
857  }
858  join(*this);
859  }
860 
861  /// Returns true if the thread is still running
862  inline bool active() const {
863  return thread_started;
864  }
865 
866  inline ~thread() { }
867 
868  /// Returns the pthread thread id
869  inline pthread_t pthreadid() {
870  return m_p_thread;
871  }
872  private:
873 
874 
875  //! The size of the internal stack for this thread
876  size_t m_stack_size;
877 
878  //! The internal pthread object
879  pthread_t m_p_thread;
880 
881  //! the threads id
882  size_t m_thread_id;
883 
884  bool thread_started;
885  }; // End of class thread
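 // Usage sketch (editorial addition, not part of the original header): launching a
 // thread and catching a const char* exception forwarded to join(). The worker
 // function is hypothetical.
 //
 //   void work() {
 //     if (something_failed()) throw "worker failed";
 //   }
 //
 //   graphlab::thread t(0);
 //   t.launch(work);
 //   try {
 //     t.join();
 //   } catch (const char* msg) {
 //     std::cerr << msg << std::endl;   // exception re-thrown by join()
 //   }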
886 
887 
888 
889 
890 
891  /**
892  * \ingroup util
893  * Manages a collection of threads.
894  *
895  * The thread_group object performs limited exception forwarding.
896  * Exceptions of type const char* thrown within a thread will be caught
897  * and forwarded to the join() function.
898  * If the call to join() is wrapped by a try-catch block, the exception
899  * will be caught safely and thread cleanup will be completed properly.
900  *
901  * If multiple threads are running in the thread-group, the master should
902  * test if running_threads() is > 0, and retry the join().
903  */
904  class thread_group {
905  private:
906  size_t m_thread_counter;
907  size_t threads_running;
908  mutex mut;
909  conditional cond;
910  std::queue<std::pair<pthread_t, const char*> > joinqueue;
911  // not implemented
912  thread_group& operator=(const thread_group &thrgrp);
913  thread_group(const thread_group&);
914  static void invoke(boost::function<void (void)> spawn_function, thread_group *group);
915  public:
916  /**
917  * Initializes a thread group.
918  */
919  thread_group() : m_thread_counter(0), threads_running(0) { }
920 
921  /**
922  * Launch a single thread which calls spawn_function. No CPU affinity is
923  * set, so which core it runs on is up to the OS scheduler.
924  */
925  void launch(const boost::function<void (void)> &spawn_function);
926 
927  /**
928  * Launch a single thread which calls spawn_function. Also sets CPU
929  * affinity.
930  */
931  void launch(const boost::function<void (void)> &spawn_function, size_t cpu_id);
932 
933  /** Waits for all threads to complete execution. const char* exceptions
934  thrown by threads are forwarded to the join() function.
935  */
936  void join();
937 
938  /// Returns the number of running threads.
939  inline size_t running_threads() {
940  return threads_running;
941  }
942  //! Destructor. Waits for all threads to complete execution
943  inline ~thread_group(){ join(); }
944 
945  }; // End of thread group
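 // Usage sketch (editorial addition, not part of the original header): launching
 // one pinned worker per core and waiting for all of them (assumes
 // <boost/bind.hpp>; the worker function is hypothetical).
 //
 //   void worker(size_t id) { /* ... */ }
 //
 //   graphlab::thread_group workers;
 //   for (size_t i = 0; i < graphlab::thread::cpu_count(); ++i) {
 //     workers.launch(boost::bind(worker, i), i);   // pin worker i to cpu i
 //   }
 //   workers.join();   // forwards any const char* thrown inside a worker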
946 
947 
948  /// Runs f in a new thread. Convenience function for creating a new thread quickly.
949  inline thread launch_in_new_thread(const boost::function<void (void)> &f,
950  size_t cpuid = size_t(-1)) {
951  thread thr;
952  if (cpuid != size_t(-1)) thr.launch(f, cpuid);
953  else thr.launch(f);
954  return thr;
955  }
956 
957  /// an integer value padded to 64 bytes
958  struct padded_integer {
959  size_t val;
960  char __pad__[64 - sizeof(size_t)];
961  };
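 // Usage sketch (editorial addition, not part of the original header): per-thread
 // counters padded to 64 bytes so that increments from different threads do not
 // share a cache line (avoiding false sharing).
 //
 //   std::vector<padded_integer> counts(thread::cpu_count());
 //   // inside a worker:  counts[thread::thread_id()].val++;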
962 }; // End Namespace
963 
964 #endif
965