RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/gc/gc.cc
00001 /* gc.cc
00002    Jeremy Barnes, 26 September 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005 */
00006 
00007 #include "gc.h"
00008 #include <urcu.h>
00009 #include <boost/thread.hpp>
00010 #include <iostream>
00011 #include "jml/utils/exc_assert.h"
00012 #include "jml/arch/atomic_ops.h"
00013 #include "jml/arch/rwlock.h"
00014 #include "jml/arch/thread_specific.h"
00015 
00016 using namespace std;
00017 
00018 
00019 namespace Datacratic {
00020 namespace MMap {
00021 
00022 
00023 struct doInit {
00024     doInit()
00025     {
00026         rcu_init();
00027     }
00028 } init;
00029 
/** Register the calling thread with RCU: first via rcu_register_thread(),
    then via rcu_defer_register_thread().  unregisterThread() undoes both
    in the opposite order. */
00030 void registerThread()
00031 {
00032     rcu_register_thread();
00033     rcu_defer_register_thread();
00034 }
00035 
/** Undo registerThread(): drop the rcu_defer registration first, then the
    plain RCU registration (reverse of the registration order). */
00036 void unregisterThread()
00037 {
00038     rcu_defer_unregister_thread();
00039     rcu_unregister_thread();
00040 }
00041 
// True while some thread holds the write lock: set in ThreadData::writeLock()
// and cleared in ThreadData::writeUnlock(); sampled by ThreadData::readLock().
// NOTE(review): `volatile` by itself gives no atomicity or ordering guarantee
// in C++; readLock() relies on rcu_read_lock() acting as a memory barrier
// (per its own comment) -- confirm this is sufficient.
00044 static volatile bool exclusiveMode = false;
00045 
// Taken exclusively by writers for the duration of the write lock, and in
// shared mode by readers that observed exclusiveMode == true.
00047 //static boost::shared_mutex exclusionMutex;
00048 static ML::RWLock exclusionMutex;
00049 
// Disabled (#if 0), never compiled: a small owner object holding a ThreadData
// pointer, kept in a boost::thread_specific_ptr.  Its destructor is defined
// in a second disabled section further down in this file.
00050 #if 0
00051 
00058 struct ThreadData;
00059 
00060 struct ThreadDataDeleter {
00061     ThreadDataDeleter(ThreadData * data)
00062         : data(data)
00063     {
00064     }
00065 
00066     ThreadData * data;
00067 
00068     ~ThreadDataDeleter();
00069 };
00070 
00071 boost::thread_specific_ptr<ThreadDataDeleter> threadDataDeleter;
00072 
00073 #endif
00074 
00075 struct ThreadData {
00076 
00077     static int numInRead;
00078 
00079     ThreadData()
00080         : readLocked(0), writeLocked(0)
00081     {
00082         lockedExclusive = false;
00083         registerThread();
00084         initialized = true;
00085         threadNum = random();
00086 
00087         //threadDataDeleter.reset(new ThreadDataDeleter(this));
00088 
00089         //cerr << "initialized thread " << threadNum << " at " << this << endl;
00090     }
00091 
00092     ~ThreadData()
00093     {
00094         //cerr << "destroyThread " << threadNum << " at " << this << endl;
00095 
00096         if (writeLocked) {
00097             cerr << "warning: thread exiting with write lock held" << endl;
00098         }
00099 
00100         if (readLocked) {
00101             cerr << "warning: thread exiting with read lock held" << endl;
00102         }
00103         
00104         if (initialized)
00105             unregisterThread();
00106         initialized = false;
00107     }
00108     
00109     void readLock()
00110     {
00111         //cerr << "readLock" << " " << this << " readLocked " << readLocked << " writeLocked "
00112         //     << writeLocked << " excl " << exclusiveMode << " lexcl " << lockedExclusive
00113         //     << endl;
00114         while (!readLocked && !writeLocked) {
00115             lockedExclusive = exclusiveMode;
00116 
00117             if (lockedExclusive)
00118                 exclusionMutex.lock_shared();
00119 
00120             rcu_read_lock();  // this also does a memory barrier...
00121 
00122             // Avoid racing with the update of the exlusive lock...
00123             // TODO: this needs to be well tested
00124             if (lockedExclusive != exclusiveMode) {
00125                 rcu_read_unlock();
00126                 continue;
00127             }
00128 
00129             //ML::atomic_inc(numInRead);
00130 
00131             break;
00132         }
00133 
00134         ++readLocked;
00135 
00136     }
00137 
00138     bool isReadLocked()
00139     {
00140         return readLocked || writeLocked;
00141     }
00142     
00143     void readUnlock()
00144     {
00145         //cerr << "readUnlock" << " " << this << " readLocked " << readLocked << " writeLocked "
00146         //     << writeLocked << " excl " << exclusiveMode << " lexcl " << lockedExclusive
00147         //     << endl;
00148 
00149         if (readLocked <= 0)
00150             throw ML::Exception("bad read lock nesting");
00151         --readLocked;
00152         if (!readLocked && !writeLocked) {
00153             if (lockedExclusive) exclusionMutex.unlock_shared();
00154             //ML::atomic_dec(numInRead);
00155             rcu_read_unlock();
00156         }
00157     }
00158     
00159     void writeLock()
00160     {
00161         //cerr << "writeLock" << " " << this << " readLocked " << readLocked << " writeLocked "
00162         //     << writeLocked << " excl " << exclusiveMode << " lexcl " << lockedExclusive
00163         //     << endl;
00164         if (readLocked)
00165             throw ML::Exception("can't acquire write lock with read lock held");
00166 
00167         if (!writeLocked) {
00168             exclusionMutex.lock();
00169             //cerr << "entering exclusive mode numInRead = " << numInRead << endl;
00170             ExcAssert(!exclusiveMode);
00171             exclusiveMode = true;
00172             //ML::memory_barrier();
00173             synchronize_rcu();  // wait for all readers to stop and block on lock
00174 
00175             //ExcAssertEqual(numInRead, 0);
00176         }
00177 
00178         ++writeLocked;
00179     }
00180 
00181     void writeUnlock()
00182     {
00183         //cerr << "writeUnlock" << " " << this << " readLocked " << readLocked << " writeLocked "
00184         //     << writeLocked << " excl " << exclusiveMode << " lexcl " << lockedExclusive
00185         //     << endl;
00186 
00187         if (writeLocked <= 0)
00188             throw ML::Exception("bad write lock nesting");
00189         --writeLocked;
00190         if (!writeLocked) {
00191             exclusiveMode = false;
00192             exclusionMutex.unlock();
00193         }
00194     }
00195 
00196     bool isWriteLocked()
00197     {
00198         return writeLocked;
00199     }
00200 
00201     void stopTheWorld()
00202     {
00203         writeLock();
00204         rcu_defer_barrier();
00205     }
00206     
00207     void restartTheWorld()
00208     {
00209         writeUnlock();
00210     }
00211 
00212     bool initialized;
00213     int readLocked;
00214     bool lockedExclusive;
00215     int writeLocked;
00216     int threadNum;
00217 
00218     pthread_key_t tssKey;
00219 };
00220 
// Storage for the static counter declared in ThreadData.  In the code shown
// here it is only referenced from commented-out debugging statements.
00221 int ThreadData::numInRead = 0;
00222 
// Disabled (#if 0), never compiled: the destructor of the disabled
// ThreadDataDeleter helper (it deletes the ThreadData it points to) and a
// raw __thread pointer alternative that the original author marked as leaked.
00223 #if 0
00224 ThreadDataDeleter::
00225 ~ThreadDataDeleter()
00226 {
00227     delete data;
00228 }
00229 
00230 // Will be leaked...
00231  __thread ThreadData * threadData = 0;
00232 
00233 
00234 #endif
00235 
// Holder for the ThreadData used by every free function below; accessed only
// through getThreadData().
// NOTE(review): presumably ML::Thread_Specific creates one ThreadData per
// thread on first dereference -- confirm in jml/arch/thread_specific.h.
00236 ML::Thread_Specific<ThreadData> threadData;
00237 
/** Return the ThreadData obtained by dereferencing the `threadData` holder
    (presumably the calling thread's own instance -- see ML::Thread_Specific).

    The #if 0 section after the return is an older lazy-initialization
    variant; it is never compiled. */
00238 ThreadData & getThreadData()
00239 {
00240     return *threadData;
00241 
00242 #if 0
00243     if (!threadData.get())
00244         threadData.reset(new ThreadData());
00245     return *threadData;
00246 #endif
00247 }
00248 
00249 int getThreadNum()
00250 {
00251     return getThreadData().threadNum;
00252 }
00253 
00254 void readLock()
00255 {
00256     getThreadData().readLock();
00257 }
00258 
00259 void readUnlock()
00260 {
00261     getThreadData().readUnlock();
00262 }
00263 
00264 bool isReadLocked()
00265 {
00266     return getThreadData().isReadLocked();
00267 }
00268 
/** Call rcu_defer_barrier(), which waits for deferred callbacks queued so
    far (see deferGC()) to be executed. */
00269 void waitForGC()
00270 {
00271     rcu_defer_barrier();
00272 }
00273 
00274 // These are stubs at the moment but will have to become something better...
00275 
00276 void writeLock()
00277 {
00278     getThreadData().writeLock();
00279 }
00280 
00281 void writeUnlock()
00282 {
00283     getThreadData().writeUnlock();
00284 }
00285 
00286 bool isWriteLocked()
00287 {
00288     return getThreadData().isWriteLocked();
00289 }
00290 
00291 void stopTheWorld()
00292 {
00293     getThreadData().stopTheWorld();
00294 }
00295 
00296 void restartTheWorld()
00297 {
00298     getThreadData().restartTheWorld();
00299 }
00300 
00301 void doDeferredGc(void * workBlock)
00302 {
00303     boost::function<void ()> * work
00304         = reinterpret_cast<boost::function<void ()> *>(workBlock);
00305     try {
00306         (*work)();
00307     } catch (...) {
00308         delete work;
00309         throw;
00310     }
00311     
00312     delete work;
00313 }
00314 
00315 void deferGC(const boost::function<void ()> & work)
00316 {
00317     defer_rcu(&doDeferredGc,
00318               new boost::function<void ()>(work));
00319 }
00320 
00321 
00322 //    boost::lock_guard<boost::mutex> lock(m);
00323 
00324 
00325 } // namespace MMap
00326 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator