RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/gc/rcu_lock.h
00001 /* rcu_lock.h                                                      -*- C++ -*-
00002    Jeremy Barnes, 20 November 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Garbage collection lock using userspace RCU.
00006 */
00007 
00008 #ifndef __mmap__rcu_lock_h__
00009 #define __mmap__rcu_lock_h__
00010 
00011 #include <urcu.h>
00012 #include <vector>
00013 #include <boost/function.hpp>
00014 #include "jml/compiler/compiler.h"
00015 #include "jml/utils/exc_assert.h"
00016 #include "jml/arch/rwlock.h"
00017 #include "jml/arch/thread_specific.h"
00018 
00019 namespace Datacratic {
00020 
00021 struct RcuLock {
00022 
00024     struct ThreadGcInfoEntry {
00025         ThreadGcInfoEntry()
00026             : readLocked(0), lockedExclusive(false), writeLocked(0)
00027         {
00028         }
00029 
00030         int readLocked;
00031         bool lockedExclusive;
00032         int writeLocked;
00033     };
00034 
00036     struct ThreadGcInfo {
00037         ThreadGcInfo()
00038         {
00039             static int threadInfoNum = 0;
00040             threadNum = __sync_fetch_and_add(&threadInfoNum, 1);
00041             rcu_register_thread();
00042             rcu_defer_register_thread();
00043         }
00044 
00045         ~ThreadGcInfo()
00046         {
00047             rcu_unregister_thread();
00048             rcu_defer_unregister_thread();
00049         }
00050 
00051         int threadNum;
00052 
00053         std::vector<ThreadGcInfoEntry> info;
00054 
00055         ThreadGcInfoEntry & operator [] (int index)
00056         {
00057             ExcAssertGreaterEqual(index, 0);
00058             if (info.size() <= index)
00059                 info.resize(index + 1);
00060             return info[index];
00061         }
00062     };
00063 
00064     static ML::Thread_Specific<ThreadGcInfo> gcInfo;  
00065     bool exclusiveMode;  
00066     ML::RWLock exclusionMutex;  
00067     int index;     
00068 
00069     void enterCS()
00070     {
00071         rcu_read_lock();
00072     }
00073 
00074     void exitCS()
00075     {
00076         rcu_read_unlock();
00077     }
00078 
00079     int myEpoch() const
00080     {
00081         return -1;
00082     }
00083 
00084     int currentEpoch() const
00085     {
00086         return -1;
00087     }
00088 
00089     JML_ALWAYS_INLINE ThreadGcInfoEntry & getEntry() const
00090     {
00091         ThreadGcInfo & info = *gcInfo;
00092         return info[index];
00093     }
00094 
00095     static int currentIndex;
00096 
00097     RcuLock()
00098         : exclusiveMode(false),
00099           index(currentIndex + 1)
00100     {
00101     }
00102 
00103     void lockShared()
00104     {
00105         ThreadGcInfoEntry & entry = getEntry();
00106 
00107         //cerr << "lockShared: readLocked " << entry.readLocked
00108         //     << " writeLocked: " << entry.writeLocked
00109         //     << " exclusive " << exclusiveMode << endl;
00110 
00111         while (!entry.readLocked && !entry.writeLocked) {
00112             entry.lockedExclusive = exclusiveMode;
00113 
00114             // If something else wanted an exclusive lock then we are in
00115             // exclusive mode and we have to acquire the RW lock beore we
00116             // can continue
00117             if (JML_UNLIKELY(entry.lockedExclusive))
00118                 exclusionMutex.lock_shared();
00119 
00120             //cerr << "entering" << endl;
00121             enterCS();
00122 
00123             // Avoid racing with the update of the exlusive lock...
00124             // TODO: this needs to be well tested
00125             if (entry.lockedExclusive != exclusiveMode) {
00126                 //cerr << "reexiting" << endl;
00127                 exitCS();
00128                 continue;
00129             }
00130             
00131             break;
00132         }
00133 
00134         ++entry.readLocked;
00135     }
00136 
00137     void unlockShared()
00138     {
00139         ThreadGcInfoEntry & entry = getEntry();
00140 
00141         if (entry.readLocked <= 0)
00142             throw ML::Exception("bad read lock nesting");
00143         --entry.readLocked;
00144         if (!entry.readLocked && !entry.writeLocked) {
00145             if (entry.lockedExclusive) exclusionMutex.unlock_shared();
00146             exitCS();
00147         }
00148     }
00149 
00150     bool isLockedShared()
00151     {
00152         ThreadGcInfoEntry & entry = getEntry();
00153         return entry.readLocked || entry.writeLocked;
00154     }
00155 
00156     void lockExclusive()
00157     {
00158         ThreadGcInfoEntry & entry = getEntry();
00159 
00160         if (entry.readLocked)
00161             throw ML::Exception("can't acquire write lock with read lock held");
00162 
00163         if (!entry.writeLocked) {
00164             exclusionMutex.lock();
00165             //cerr << "entering exclusive mode numInRead = " << numInRead << endl;
00166             ExcAssert(!exclusiveMode);
00167             exclusiveMode = true;
00168             //ML::memory_barrier();
00169             visibleBarrier();
00170 
00171             //ExcAssertEqual(numInRead, 0);
00172         }
00173 
00174         ++entry.writeLocked;
00175     }
00176 
00177     void unlockExclusive()
00178     {
00179         ThreadGcInfoEntry & entry = getEntry();
00180 
00181         if (entry.writeLocked <= 0)
00182             throw ML::Exception("bad write lock nesting");
00183         --entry.writeLocked;
00184         if (!entry.writeLocked) {
00185             exclusiveMode = false;
00186             exclusionMutex.unlock();
00187         }
00188     }
00189 
00190     bool isLockedExclusive()
00191     {
00192         ThreadGcInfoEntry & entry = getEntry();
00193         return entry.writeLocked;
00194     }
00195 
00196     void visibleBarrier()
00197     {
00198         synchronize_rcu();
00199     }
00200 
00201     void deferBarrier()
00202     {
00203         rcu_defer_barrier();
00204     }
00205 
00206     static void callFn(void * arg)
00207     {
00208         boost::function<void ()> * fn
00209             = reinterpret_cast<boost::function<void ()> *>(arg);
00210         try {
00211             (*fn)();
00212         } catch (...) {
00213             delete fn;
00214             throw;
00215         }
00216         delete fn;
00217     }
00218 
00219     void defer(boost::function<void ()> work)
00220     {
00221         defer(callFn, new boost::function<void ()>(work));
00222     }
00223 
00224     void defer(void (work) (void *), void * arg)
00225     {
00226         getEntry();  // make sure thread is registered
00227         defer_rcu(work, arg);
00228     }
00229 
00230     void defer(void (work) (void *, void *), void * arg1, void * arg2)
00231     {
00232         defer([=] () { work(arg1, arg2); });
00233     }
00234 
00235     void defer(void (work) (void *, void *, void *), void * arg1, void * arg2, void * arg3)
00236     {
00237         defer([=] () { work(arg1, arg2, arg3); });
00238     }
00239 
00240     void dump()
00241     {
00242     }
00243 };
00244 
00245 } // namespace Datacratic
00246 
00247 
00248 #endif /* __mmap__rcu_lock_h__ */
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator