GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
thread_pool.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 
25 
26 
27 #ifndef GRAPHLAB_THREAD_POOL_HPP
28 #define GRAPHLAB_THREAD_POOL_HPP
29 
30 #include <boost/bind.hpp>
31 #include <graphlab/parallel/pthread_tools.hpp>
32 #include <graphlab/util/blocking_queue.hpp>
33 
34 namespace graphlab {
35 
36  /**
37  * \ingroup util
38  * Manages a pool of threads.
39  *
40  * The interface is nearly identical to the \ref thread_group.
41  * The key difference is internal behavior. The thread pool preallocates a
42  * collection of threads which it keeps asleep. When tasks are issued
43  * through the "launch" function, threads are woken up to perform the
44  * tasks.
45  *
46  * The thread_pool object performs limited exception forwarding.
47  * exception throws within a thread of type const char* will be caught
48  * and forwarded to the join() function.
49  * If the call to join() is wrapped by a try-catch block, the exception
50  * will be caught safely and thread cleanup will be completed properly.
51  *
52  * If multiple threads are running in the thread-group, the master should
53  * test if running_threads() is > 0, and retry the join().
54  *
55  */
56  class thread_pool {
57  private:
58 
59  thread_group threads;
61  size_t pool_size;
62 
63  // protects the exception queue, and the task counters
64  mutex mut;
65  conditional event_condition; // to wake up the joining thread
66  std::queue<const char*> exception_queue;
67  size_t tasks_inserted;
68  size_t tasks_completed;
69  bool waiting_on_join; // true if a thread is waiting in join
70 
71  bool cpu_affinity;
72  // not implemented
73  thread_pool& operator=(const thread_pool &thrgrp);
74  thread_pool(const thread_pool&);
75 
76  /**
77  Called by each thread. Loops around a queue of tasks.
78  */
79  void wait_for_task();
80 
81  /**
82  Creates all the threads in the thread pool.
83  Resets the task and exception queue
84  */
85  void spawn_thread_group();
86 
87  /**
88  Destroys the thread pool.
89  Also destroys the task queue
90  */
91  void destroy_all_threads();
92  public:
93 
94  /* Initializes a thread pool with nthreads.
95  * If affinity is set, the nthreads will by default stripe across
96  * the available cores on the system.
97  */
98  thread_pool(size_t nthreads = 2, bool affinity = false);
99 
100  /**
101  * Set the number of threads in the queue
102  */
103  void resize(size_t nthreads);
104 
105  /**
106  * Get the number of threads
107  */
108  size_t size() const;
109 
110 
111  /**
112  * Changes the CPU affinity. Note that pthread does not provide
113  * a way to change CPU affinity on a currently started thread.
114  * This function therefore waits for all threads in the pool
115  * to finish their current task, and destroy all the threads. Then
116  * new threads are created with the new affinity setting.
117  */
118  void set_cpu_affinity(bool affinity);
119 
120  /**
121  Gets the CPU affinity.
122  */
123  bool get_cpu_affinity() { return cpu_affinity; };
124 
125  /**
126  * Launch a single thread which calls spawn_function. If affinity
127  * is set on construction of the thread_pool, the thread handling the
128  * function will be locked on to one particular CPU.
129  *
130  * If virtual_threadid is set, the target thread will appear to have
131  * thread ID equal to the requested thread ID
132  */
133  void launch(const boost::function<void (void)> &spawn_function,
134  int virtual_threadid = -1);
135 
136 
137  /** Waits for all threads to become free. const char* exceptions
138  thrown by threads are forwarded to the join() function.
139  Once this function returns normally, the queue is empty.
140 
141  Note that this function may not return if producers continually insert
142  tasks through launch.
143  */
144  void join();
145 
146  //! Destructor. Cleans up all threads
147  ~thread_pool();
148  };
149 
150 }
151 #endif