RTBKit 0.9 — Open-source framework to create real-time ad bidding systems.
00001 /* gc.cc 00002 Jeremy Barnes, 26 September 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 */ 00006 00007 #include "gc.h" 00008 #include <urcu.h> 00009 #include <boost/thread.hpp> 00010 #include <iostream> 00011 #include "jml/utils/exc_assert.h" 00012 #include "jml/arch/atomic_ops.h" 00013 #include "jml/arch/rwlock.h" 00014 #include "jml/arch/thread_specific.h" 00015 00016 using namespace std; 00017 00018 00019 namespace Datacratic { 00020 namespace MMap { 00021 00022 00023 struct doInit { 00024 doInit() 00025 { 00026 rcu_init(); 00027 } 00028 } init; 00029 00030 void registerThread() 00031 { 00032 rcu_register_thread(); 00033 rcu_defer_register_thread(); 00034 } 00035 00036 void unregisterThread() 00037 { 00038 rcu_defer_unregister_thread(); 00039 rcu_unregister_thread(); 00040 } 00041 00044 static volatile bool exclusiveMode = false; 00045 00047 //static boost::shared_mutex exclusionMutex; 00048 static ML::RWLock exclusionMutex; 00049 00050 #if 0 00051 00058 struct ThreadData; 00059 00060 struct ThreadDataDeleter { 00061 ThreadDataDeleter(ThreadData * data) 00062 : data(data) 00063 { 00064 } 00065 00066 ThreadData * data; 00067 00068 ~ThreadDataDeleter(); 00069 }; 00070 00071 boost::thread_specific_ptr<ThreadDataDeleter> threadDataDeleter; 00072 00073 #endif 00074 00075 struct ThreadData { 00076 00077 static int numInRead; 00078 00079 ThreadData() 00080 : readLocked(0), writeLocked(0) 00081 { 00082 lockedExclusive = false; 00083 registerThread(); 00084 initialized = true; 00085 threadNum = random(); 00086 00087 //threadDataDeleter.reset(new ThreadDataDeleter(this)); 00088 00089 //cerr << "initialized thread " << threadNum << " at " << this << endl; 00090 } 00091 00092 ~ThreadData() 00093 { 00094 //cerr << "destroyThread " << threadNum << " at " << this << endl; 00095 00096 if (writeLocked) { 00097 cerr << "warning: thread exiting with write lock held" << endl; 00098 } 00099 00100 if (readLocked) { 00101 cerr << "warning: thread 
exiting with read lock held" << endl; 00102 } 00103 00104 if (initialized) 00105 unregisterThread(); 00106 initialized = false; 00107 } 00108 00109 void readLock() 00110 { 00111 //cerr << "readLock" << " " << this << " readLocked " << readLocked << " writeLocked " 00112 // << writeLocked << " excl " << exclusiveMode << " lexcl " << lockedExclusive 00113 // << endl; 00114 while (!readLocked && !writeLocked) { 00115 lockedExclusive = exclusiveMode; 00116 00117 if (lockedExclusive) 00118 exclusionMutex.lock_shared(); 00119 00120 rcu_read_lock(); // this also does a memory barrier... 00121 00122 // Avoid racing with the update of the exlusive lock... 00123 // TODO: this needs to be well tested 00124 if (lockedExclusive != exclusiveMode) { 00125 rcu_read_unlock(); 00126 continue; 00127 } 00128 00129 //ML::atomic_inc(numInRead); 00130 00131 break; 00132 } 00133 00134 ++readLocked; 00135 00136 } 00137 00138 bool isReadLocked() 00139 { 00140 return readLocked || writeLocked; 00141 } 00142 00143 void readUnlock() 00144 { 00145 //cerr << "readUnlock" << " " << this << " readLocked " << readLocked << " writeLocked " 00146 // << writeLocked << " excl " << exclusiveMode << " lexcl " << lockedExclusive 00147 // << endl; 00148 00149 if (readLocked <= 0) 00150 throw ML::Exception("bad read lock nesting"); 00151 --readLocked; 00152 if (!readLocked && !writeLocked) { 00153 if (lockedExclusive) exclusionMutex.unlock_shared(); 00154 //ML::atomic_dec(numInRead); 00155 rcu_read_unlock(); 00156 } 00157 } 00158 00159 void writeLock() 00160 { 00161 //cerr << "writeLock" << " " << this << " readLocked " << readLocked << " writeLocked " 00162 // << writeLocked << " excl " << exclusiveMode << " lexcl " << lockedExclusive 00163 // << endl; 00164 if (readLocked) 00165 throw ML::Exception("can't acquire write lock with read lock held"); 00166 00167 if (!writeLocked) { 00168 exclusionMutex.lock(); 00169 //cerr << "entering exclusive mode numInRead = " << numInRead << endl; 00170 
ExcAssert(!exclusiveMode); 00171 exclusiveMode = true; 00172 //ML::memory_barrier(); 00173 synchronize_rcu(); // wait for all readers to stop and block on lock 00174 00175 //ExcAssertEqual(numInRead, 0); 00176 } 00177 00178 ++writeLocked; 00179 } 00180 00181 void writeUnlock() 00182 { 00183 //cerr << "writeUnlock" << " " << this << " readLocked " << readLocked << " writeLocked " 00184 // << writeLocked << " excl " << exclusiveMode << " lexcl " << lockedExclusive 00185 // << endl; 00186 00187 if (writeLocked <= 0) 00188 throw ML::Exception("bad write lock nesting"); 00189 --writeLocked; 00190 if (!writeLocked) { 00191 exclusiveMode = false; 00192 exclusionMutex.unlock(); 00193 } 00194 } 00195 00196 bool isWriteLocked() 00197 { 00198 return writeLocked; 00199 } 00200 00201 void stopTheWorld() 00202 { 00203 writeLock(); 00204 rcu_defer_barrier(); 00205 } 00206 00207 void restartTheWorld() 00208 { 00209 writeUnlock(); 00210 } 00211 00212 bool initialized; 00213 int readLocked; 00214 bool lockedExclusive; 00215 int writeLocked; 00216 int threadNum; 00217 00218 pthread_key_t tssKey; 00219 }; 00220 00221 int ThreadData::numInRead = 0; 00222 00223 #if 0 00224 ThreadDataDeleter:: 00225 ~ThreadDataDeleter() 00226 { 00227 delete data; 00228 } 00229 00230 // Will be leaked... 
00231 __thread ThreadData * threadData = 0; 00232 00233 00234 #endif 00235 00236 ML::Thread_Specific<ThreadData> threadData; 00237 00238 ThreadData & getThreadData() 00239 { 00240 return *threadData; 00241 00242 #if 0 00243 if (!threadData.get()) 00244 threadData.reset(new ThreadData()); 00245 return *threadData; 00246 #endif 00247 } 00248 00249 int getThreadNum() 00250 { 00251 return getThreadData().threadNum; 00252 } 00253 00254 void readLock() 00255 { 00256 getThreadData().readLock(); 00257 } 00258 00259 void readUnlock() 00260 { 00261 getThreadData().readUnlock(); 00262 } 00263 00264 bool isReadLocked() 00265 { 00266 return getThreadData().isReadLocked(); 00267 } 00268 00269 void waitForGC() 00270 { 00271 rcu_defer_barrier(); 00272 } 00273 00274 // These are stubs at the moment but will have to become something better... 00275 00276 void writeLock() 00277 { 00278 getThreadData().writeLock(); 00279 } 00280 00281 void writeUnlock() 00282 { 00283 getThreadData().writeUnlock(); 00284 } 00285 00286 bool isWriteLocked() 00287 { 00288 return getThreadData().isWriteLocked(); 00289 } 00290 00291 void stopTheWorld() 00292 { 00293 getThreadData().stopTheWorld(); 00294 } 00295 00296 void restartTheWorld() 00297 { 00298 getThreadData().restartTheWorld(); 00299 } 00300 00301 void doDeferredGc(void * workBlock) 00302 { 00303 boost::function<void ()> * work 00304 = reinterpret_cast<boost::function<void ()> *>(workBlock); 00305 try { 00306 (*work)(); 00307 } catch (...) { 00308 delete work; 00309 throw; 00310 } 00311 00312 delete work; 00313 } 00314 00315 void deferGC(const boost::function<void ()> & work) 00316 { 00317 defer_rcu(&doDeferredGc, 00318 new boost::function<void ()>(work)); 00319 } 00320 00321 00322 // boost::lock_guard<boost::mutex> lock(m); 00323 00324 00325 } // namespace MMap 00326 } // namespace Datacratic