24 #ifndef GRAPHLAB_PTHREAD_TOOLS_HPP
25 #define GRAPHLAB_PTHREAD_TOOLS_HPP
30 #include <semaphore.h>
38 #include <boost/function.hpp>
39 #include <graphlab/logger/assertions.hpp>
40 #include <graphlab/parallel/atomic_ops.hpp>
41 #include <graphlab/util/generics/any.hpp>
42 #include <graphlab/util/branch_hints.hpp>
43 #include <boost/unordered_map.hpp>
44 #undef _POSIX_SPIN_LOCKS
45 #define _POSIX_SPIN_LOCKS -1
48 #include <graphlab/parallel/mutex.hpp>
// --- spinlock: POSIX pthread_spinlock_t backend (non-contiguous fragment) ---
// NOTE(review): many original lines are missing between the visible ones
// (class header, braces, error checks); comments below describe only what
// the visible lines establish.
59 #if _POSIX_SPIN_LOCKS >= 0
// Underlying OS spinlock. `mutable` so the const-qualified lock()/unlock()
// members below can still mutate it.
70 mutable pthread_spinlock_t m_spin;
// PTHREAD_PROCESS_PRIVATE: usable only by threads of this process.
74 int error = pthread_spin_init(&m_spin, PTHREAD_PROCESS_PRIVATE);
// Copy path re-initializes a fresh lock instead of copying lock state;
// assignment is deliberately a no-op (a pthread spinlock is not copyable).
83 int error = pthread_spin_init(&m_spin, PTHREAD_PROCESS_PRIVATE);
88 void operator=(
const spinlock& m) { }
// Busy-wait acquire.
92 inline void lock()
const {
93 int error = pthread_spin_lock( &m_spin );
// Release.
97 inline void unlock()
const {
98 int error = pthread_spin_unlock( &m_spin );
// Non-blocking acquire; true iff the lock was taken.
103 return pthread_spin_trylock( &m_spin ) == 0;
// Destructor path: free the OS spinlock resources.
106 int error = pthread_spin_destroy( &m_spin );
// presumably grants conditional access to internals -- TODO confirm
// against the full file.
109 friend class conditional;
111 #define SPINLOCK_SUPPORTED 1
// Fallback branch: no POSIX spin locks available on this platform.
115 #define SPINLOCK_SUPPORTED 0
// --- fallback spinlock built on GCC __sync builtins (fragment) ---
// One byte serves as the lock flag; pre-C++11 idiom using volatile plus
// __sync builtins for ordering.
129 mutable volatile char spinner;
// Spin until the flag reads 0 AND we win the test-and-set (acquire).
150 while(spinner == 1 || __sync_lock_test_and_set(&spinner, 1));
// Full memory barrier before the (not-visible-here) flag clear -- release.
154 __sync_synchronize();
// try-lock: success iff the previous flag value was 0.
159 return (__sync_lock_test_and_set(&spinner, 1) == 0);
// Sanity check on teardown: lock must not be held.
162 ASSERT_TRUE(spinner == 0);
// --- conditional: pthread condition-variable wrapper (fragment) ---
// `mutable` so const members can wait/signal.
175 mutable pthread_cond_t m_cond;
// Default attributes; copy path also re-initializes rather than copies.
179 int error = pthread_cond_init(&m_cond, NULL);
188 int error = pthread_cond_init(&m_cond, NULL);
// Blocking wait; caller must hold mut (accesses its m_mut member directly).
199 int error = pthread_cond_wait(&m_cond, &mut.m_mut);
// Timed wait with a whole-seconds relative timeout, converted to the
// absolute deadline pthread_cond_timedwait requires.
204 struct timespec timeout;
207 gettimeofday(&tv, &tz);
// tv_usec < 1e6, so tv_usec*1000 < 1e9 -- tv_nsec is always in range here.
208 timeout.tv_nsec = tv.tv_usec * 1000;
209 timeout.tv_sec = tv.tv_sec + (time_t)sec;
210 return pthread_cond_timedwait(&m_cond, &mut.m_mut, &timeout);
// Timed wait with a millisecond relative timeout (fragment).
214 struct timespec timeout;
216 gettimeofday(&tv, NULL);
// Whole seconds...
218 size_t s = ms / 1000;
// ...and the sub-second remainder in nanoseconds. NOTE(review): the line
// reducing `ms` modulo 1000 is not visible in this fragment; presumably it
// exists between the visible lines -- TODO confirm against the full file.
220 size_t ns = ms * 1000000;
// Build the absolute deadline from "now".
222 timeout.tv_nsec = tv.tv_usec * 1000;
223 timeout.tv_sec = tv.tv_sec;
226 timeout.tv_nsec += (suseconds_t)ns;
227 timeout.tv_sec += (time_t)s;
// FIX: POSIX requires tv_nsec in [0, 1e9). The old strict `>` comparison
// let tv_nsec == 1000000000 through (reachable, e.g. tv_usec == 500000 and
// ns == 500000000), making pthread_cond_timedwait fail with EINVAL.
229 if (timeout.tv_nsec >= 1000000000) {
231 timeout.tv_nsec -= 1000000000;
233 return pthread_cond_timedwait(&m_cond, &mut.m_mut, &timeout);
// Timed wait with a nanosecond relative timeout (fragment).
237 struct timespec timeout;
239 gettimeofday(&tv, NULL);
// NOTE(review): `ns / 1000000` as the seconds component looks off by 1000x
// (nanoseconds per second is 1e9); a reducing line is not visible in this
// fragment -- TODO verify against the full file before changing.
242 size_t s = ns / 1000000;
// Build the absolute deadline from "now".
246 timeout.tv_nsec = tv.tv_usec * 1000;
247 timeout.tv_sec = tv.tv_sec;
250 timeout.tv_nsec += (suseconds_t)ns;
251 timeout.tv_sec += (time_t)s;
// FIX: POSIX requires tv_nsec in [0, 1e9). The old strict `>` comparison
// let tv_nsec == 1000000000 through, making pthread_cond_timedwait fail
// with EINVAL.
253 if (timeout.tv_nsec >= 1000000000) {
255 timeout.tv_nsec -= 1000000000;
257 return pthread_cond_timedwait(&m_cond, &mut.m_mut, &timeout);
// Wake one waiter.
261 int error = pthread_cond_signal(&m_cond);
// Wake all waiters.
266 int error = pthread_cond_broadcast(&m_cond);
// Destructor path: free the condition variable.
270 int error = pthread_cond_destroy(&m_cond);
// --- semaphore built on a mutex/condvar pair (fragment) ---
// Counter of available posts and count of threads currently blocked in
// wait(); both mutated from const members, hence `mutable`.
286 mutable volatile size_t semvalue;
287 mutable volatile size_t waitercount;
// Non-copyable in effect: copy ctor/assignment do not copy state.
298 semaphore(
const semaphore&) {
304 void operator=(
const semaphore& m) { }
// post(): increment and (on the non-visible lines) presumably signal when
// someone is waiting -- TODO confirm.
306 inline void post()
const {
308 if (waitercount > 0) {
// wait(): block until semvalue becomes nonzero.
314 inline void wait()
const {
317 while (semvalue == 0) {
// Teardown sanity checks: no waiters, no outstanding posts.
325 ASSERT_TRUE(waitercount == 0);
326 ASSERT_TRUE(semvalue == 0);
// --- semaphore backed by POSIX sem_t (fragment) ---
// sem_init(sem, pshared=0, value=0): thread-shared only, starts at 0.
// NOTE(review): unnamed POSIX semaphores are deprecated/unimplemented on
// macOS -- presumably the missing lines select a backend per platform.
341 int error = sem_init(&m_sem, 0,0);
350 int error = sem_init(&m_sem, 0,0);
// Increment the semaphore, waking a waiter if any.
357 inline void post()
const {
358 int error = sem_post(&m_sem);
// Block until the semaphore is positive, then decrement.
361 inline void wait()
const {
362 int error = sem_wait(&m_sem);
// Destructor path: release the semaphore.
366 int error = sem_destroy(&m_sem);
// Thin macros over GCC __sync builtins used by the spin rw-locks below.
// atomic_xadd returns the OLD value (fetch-and-add); atomic_inc/atomic_add
// return the NEW value (add-and-fetch).
373 #define atomic_xadd(P, V) __sync_fetch_and_add((P), (V))
374 #define cmpxchg(P, O, N) __sync_val_compare_and_swap((P), (O), (N))
375 #define atomic_inc(P) __sync_add_and_fetch((P), 1)
376 #define atomic_add(P, V) __sync_add_and_fetch((P), (V))
377 #define atomic_set_bit(P, V) __sync_or_and_fetch((P), 1<<(V))
// x86 `pause` hint inside spin loops; the "memory" clobber acts as a
// compiler barrier. Non-portable to other ISAs.
378 #define cpu_relax() asm volatile("pause\n": : :"memory")
// --- ticket-based reader/writer spinlock (fragment) ---
// Anonymous union member view (the union wrapper is on non-visible lines);
// `u` / `us` / `s.{write,read,...}` presumably alias one 32-bit word --
// TODO confirm the union layout against the full file.
392 __extension__
struct {
// Set when the current holder is a writer, so unlock() can dispatch.
398 mutable bool writing;
399 mutable volatile rwticket l;
// Zero all ticket counters on construction.
402 memset(const_cast<rwticket*>(&l), 0,
sizeof(rwticket));
// writelock(): take a ticket (old value of the users field, bits 16+),
// then spin until the write-serving counter reaches our ticket.
404 inline void writelock()
const {
405 unsigned me = atomic_xadd(&l.u, (1<<16));
406 unsigned char val = (
unsigned char)(me >> 16);
408 while (val != l.s.write)
asm volatile(
"pause\n": : :
"memory");
// wrunlock(): bump the serving counters with a single 16-bit store
// (computed on non-visible lines into t.us).
412 inline void wrunlock()
const{
413 rwticket t = *
const_cast<rwticket*
>(&l);
418 *(
volatile unsigned short *) (&l) = t.us;
// readlock(): same ticket scheme, but spin on the read-serving counter.
423 inline void readlock()
const {
424 unsigned me = atomic_xadd(&l.u, (1<<16));
425 unsigned char val = (
unsigned char)(me >> 16);
427 while (val != l.s.read)
asm volatile(
"pause\n": : :
"memory");
// rdunlock(): a finishing reader advances the write-serving counter so a
// queued writer can proceed -- presumably by design of this ticket scheme;
// TODO confirm against the full file.
431 inline void rdunlock()
const {
432 atomic_inc(&l.s.write);
// unlock(): dispatch on the recorded holder kind.
435 inline void unlock()
const {
436 if (!writing) rdunlock();
// --- spinrwlock2: writer-preferring rw spinlock in one unsigned int ---
// Bit 0 = a writer is waiting, bit 1 = writer holds the lock, bits 2+ =
// reader count (readers add RW_READ == 4). Fragment; the RW_WAIT/RW_WRITE/
// RW_READ value macros themselves are on non-visible lines.
443 #define RW_WAIT_BIT 0
444 #define RW_WRITE_BIT 1
445 #define RW_READ_BIT 2
// Whole lock state; mutated from const members via the atomic macros.
452 mutable unsigned int l;
454 spinrwlock2():l(0) {}
// writelock(): fast path -- if no readers/writer ahead of us, CAS straight
// to the WRITE state.
455 void writelock()
const {
460 if (state < RW_WRITE)
463 if (cmpxchg(&l, state, RW_WRITE) == state)
return;
// Slow path: advertise a waiting writer, then spin until only the WAIT
// bit (at most) remains before retrying.
470 if (!(state & RW_WAIT)) atomic_set_bit(&l, RW_WAIT_BIT);
473 while (l > RW_WAIT) cpu_relax();
// Drop the WRITE bit.
477 void wrunlock()
const {
478 atomic_add(&l, -RW_WRITE);
// readlock(): wait while a writer holds or waits (writer preference),
// optimistically add a reader, then back out and retry if we raced.
481 void readlock()
const {
484 while (l & (RW_WAIT | RW_WRITE)) cpu_relax();
487 if (!(atomic_xadd(&l, RW_READ) & (RW_WAIT | RW_WRITE)))
return;
490 atomic_add(&l, -RW_READ);
// Remove this reader's count.
494 void rdunlock()
const {
495 atomic_add(&l, -RW_READ);
// Keep the helper macros from leaking out of this header section.
502 #undef atomic_set_bit
// --- rwlock: thin wrapper over pthread_rwlock_t (fragment) ---
520 mutable pthread_rwlock_t m_rwlock;
// Construct/destroy the OS lock with default attributes.
523 int error = pthread_rwlock_init(&m_rwlock, NULL);
527 int error = pthread_rwlock_destroy(&m_rwlock);
// Assignment is a no-op; the copy path re-initializes a fresh lock
// (a pthread rwlock cannot be copied).
532 void operator=(
const rwlock& m) { }
542 int error = pthread_rwlock_init(&m_rwlock, NULL);
// Shared (reader) acquire.
546 inline void readlock()
const {
547 pthread_rwlock_rdlock(&m_rwlock);
// Exclusive (writer) acquire.
550 inline void writelock()
const {
551 pthread_rwlock_wrlock(&m_rwlock);
// Non-blocking variants; true iff acquired.
554 inline bool try_readlock()
const {
555 return pthread_rwlock_tryrdlock(&m_rwlock) == 0;
557 inline bool try_writelock()
const {
558 return pthread_rwlock_trywrlock(&m_rwlock) == 0;
// pthread has a single unlock for both modes ...
560 inline void unlock()
const {
561 pthread_rwlock_unlock(&m_rwlock);
// ... so rdunlock/wrunlock presumably just forward to unlock(); bodies on
// non-visible lines.
564 inline void rdunlock()
const {
567 inline void wrunlock()
const {
// --- cancellable_barrier: sense-reversing barrier (fragment) ---
// Classic sense-reversal state: waiters watch `barrier_release` flip to
// their captured sense -- presumably; only the flips are visible here.
592 mutable bool barrier_sense;
593 mutable bool barrier_release;
// Initial state.
605 barrier_sense =
false;
606 barrier_release =
true;
// Change the participant count. "unsafe": caller must guarantee no thread
// is concurrently inside wait().
613 void resize_unsafe(
size_t numthreads) {
// wait() fragment: capture the current sense, and when the last thread
// arrives, release the waiters and flip the sense for the next round.
631 bool listening_on = barrier_sense;
632 if (called == needed) {
636 barrier_release = barrier_sense;
637 barrier_sense = !barrier_sense;
// --- barrier: thin wrapper over pthread_barrier_t (fragment) ---
663 mutable pthread_barrier_t m_barrier;
// Construct for `numthreads` participants.
669 pthread_barrier_init(&m_barrier, NULL, (
unsigned)numthreads); }
// Not copyable in any meaningful way.
671 void operator=(
const barrier& m) { }
// Re-create the barrier with a new participant count. "unsafe": no thread
// may be waiting while this runs.
672 void resize_unsafe(
size_t numthreads) {
673 pthread_barrier_destroy(&m_barrier);
674 pthread_barrier_init(&m_barrier, NULL, (
unsigned)numthreads);
676 ~
barrier() { pthread_barrier_destroy(&m_barrier); }
// Block until all participants arrive.
678 inline void wait()
const { pthread_barrier_wait(&m_barrier); }
// Platforms without pthread barriers fall back to the sense-reversing
// implementation above (the #if/#else lines are not visible here).
684 typedef cancellable_barrier
barrier;
// Prefetch [addr, addr+len) for reading, one touch per 64 bytes
// (presumably the cache-line size; `cp` is declared on a non-visible line).
// __builtin_prefetch's second argument: 0 = read, 1 = write.
689 inline void prefetch_range(
void *addr,
size_t len) {
691 char *end = (
char*)(addr) + len;
693 for (cp = (
char*)(addr); cp < end; cp += 64) __builtin_prefetch(cp, 0);
// Same, but hint that the range is about to be written.
695 inline void prefetch_range_write(
void *addr,
size_t len) {
697 char *end = (
char*)(addr) + len;
699 for (cp = (
char*)(addr); cp < end; cp += 64) __builtin_prefetch(cp, 1);
// --- tls_data: per-thread storage (fragment) ---
// Wraps a thread id plus an id -> any map of thread-local values.
730 inline tls_data(
size_t thread_id) : thread_id_(thread_id) { }
// Accessors for the owning thread's id.
731 inline size_t thread_id() {
return thread_id_; }
732 inline void set_thread_id(
size_t t) { thread_id_ = t; }
// operator[] inserts a default-constructed `any` when the key is absent
// (standard unordered_map semantics).
733 any& operator[](
const size_t&
id) {
return local_data[id]; }
// True iff a value is stored under `id`.
734 bool contains(
const size_t&
id)
const {
735 return local_data.find(
id) != local_data.end();
// Remove the value under `id`; returns the number of elements erased (0/1).
737 size_t erase(
const size_t&
id) {
738 return local_data.erase(
id);
// Backing store for the per-thread values.
742 boost::unordered_map<size_t, any> local_data;
// Static TLS helper fragment: erase the calling thread's value under `id`
// (body on non-visible lines; presumably forwards to tls_data::erase).
781 static inline size_t erase(
const size_t&
id){
// Hook invoked when a thread exits, plus its setter (declarations only;
// definitions live elsewhere).
794 static void thread_destroy_callback();
795 static void set_thread_destroy_callback(
void (*callback)());
// --- invoke_args: heap-passed bundle handed to the pthread entry point ---
// The routine the new thread should run.
809 boost::function<void(void)> spawn_routine;
810 invoke_args(
size_t m_thread_id,
const boost::function<
void(
void)> &spawn_routine)
811 : m_thread_id(m_thread_id), spawn_routine(spawn_routine) { };
// C-style trampoline with the pthread_create signature; unpacks _args.
815 static void* invoke(
void *_args);
// thread constructor fragment: default stack size is 8 MB.
826 thread_started(false){
828 const int BYTES_PER_MB = 1048576;
829 const int DEFAULT_SIZE_IN_MB = 8;
830 m_stack_size = DEFAULT_SIZE_IN_MB * BYTES_PER_MB;
// --- thread: launch/join interface fragment ---
// Start the thread running spawn_routine.
837 void launch(
const boost::function<
void (
void)> &spawn_routine);
// Overload that also pins the thread to a CPU.
845 void launch(
const boost::function<
void (
void)> &spawn_routine,
size_t cpu_id);
// join() error path (diagnostic only; surrounding handling not visible).
855 std::cout <<
"Failure on join()" << std::endl;
// True once launch() has actually started the thread.
863 return thread_started;
// Underlying pthread handle.
879 pthread_t m_p_thread;
// --- thread_group bookkeeping (fragment) ---
// Monotonic id source and count of live threads -- presumably guarded by a
// mutex on non-visible lines; TODO confirm.
906 size_t m_thread_counter;
907 size_t threads_running;
// Threads finished and awaiting join, paired with an exception/status
// string -- presumably; semantics of the const char* not visible here.
910 std::queue<std::pair<pthread_t, const char*> > joinqueue;
925 void launch(
const boost::function<
void (
void)> &spawn_function);
931 void launch(
const boost::function<
void (
void)> &spawn_function,
size_t cpu_id);
940 return threads_running;
950 size_t cpuid =
size_t(-1)) {
952 if (cpuid !=
size_t(-1)) thr.
launch(f, cpuid);
960 char __pad__[64 -
sizeof(size_t)];