RTBKit 0.9
Open-source framework to create real-time ad bidding systems.
/* rcu_lock.h                                                      -*- C++ -*-
   Jeremy Barnes, 20 November 2011
   Copyright (c) 2011 Datacratic.  All rights reserved.

   Garbage collection lock using userspace RCU.
*/

#ifndef __mmap__rcu_lock_h__
#define __mmap__rcu_lock_h__

#include <urcu.h>
#include <urcu-defer.h>   // defer_rcu(), rcu_defer_barrier(), rcu_defer_*_thread()
#include <vector>
#include <boost/function.hpp>
#include "jml/compiler/compiler.h"
#include "jml/utils/exc_assert.h"
#include "jml/arch/rwlock.h"
#include "jml/arch/thread_specific.h"

namespace Datacratic {

struct RcuLock {

    /// Per-lock, per-thread bookkeeping for nested read and write locks.
    struct ThreadGcInfoEntry {
        ThreadGcInfoEntry()
            : readLocked(0), lockedExclusive(false), writeLocked(0)
        {
        }

        int readLocked;
        bool lockedExclusive;
        int writeLocked;
    };

    /// Per-thread state: registers the thread with RCU on construction and
    /// holds one ThreadGcInfoEntry per lock instance.
    struct ThreadGcInfo {
        ThreadGcInfo()
        {
            static int threadInfoNum = 0;
            threadNum = __sync_fetch_and_add(&threadInfoNum, 1);
            rcu_register_thread();
            rcu_defer_register_thread();
        }

        ~ThreadGcInfo()
        {
            rcu_unregister_thread();
            rcu_defer_unregister_thread();
        }

        int threadNum;

        std::vector<ThreadGcInfoEntry> info;

        ThreadGcInfoEntry & operator [] (int index)
        {
            ExcAssertGreaterEqual(index, 0);
            if (info.size() <= size_t(index))
                info.resize(index + 1);
            return info[index];
        }
    };

    static ML::Thread_Specific<ThreadGcInfo> gcInfo;
    bool exclusiveMode;
    ML::RWLock exclusionMutex;
    int index;

    void enterCS()
    {
        rcu_read_lock();
    }

    void exitCS()
    {
        rcu_read_unlock();
    }

    int myEpoch() const
    {
        return -1;
    }

    int currentEpoch() const
    {
        return -1;
    }

    JML_ALWAYS_INLINE ThreadGcInfoEntry & getEntry() const
    {
        ThreadGcInfo & info = *gcInfo;
        return info[index];
    }

    static int currentIndex;

    RcuLock()
        : exclusiveMode(false),
          index(__sync_add_and_fetch(&currentIndex, 1))
    {
    }

    void lockShared()
    {
        ThreadGcInfoEntry & entry = getEntry();

        //cerr << "lockShared: readLocked " << entry.readLocked
        //     << " writeLocked: " << entry.writeLocked
        //     << " exclusive " << exclusiveMode << endl;

        while (!entry.readLocked && !entry.writeLocked) {
            entry.lockedExclusive = exclusiveMode;

            // If something else wanted an exclusive lock then we are in
            // exclusive mode and we have to acquire the RW lock before we
            // can continue
            if (JML_UNLIKELY(entry.lockedExclusive))
                exclusionMutex.lock_shared();

            //cerr << "entering" << endl;
            enterCS();

            // Avoid racing with the update of the exclusive lock...
            // TODO: this needs to be well tested
            if (entry.lockedExclusive != exclusiveMode) {
                //cerr << "reexiting" << endl;
                exitCS();
                continue;
            }

            break;
        }

        ++entry.readLocked;
    }

    void unlockShared()
    {
        ThreadGcInfoEntry & entry = getEntry();

        if (entry.readLocked <= 0)
            throw ML::Exception("bad read lock nesting");
        --entry.readLocked;
        if (!entry.readLocked && !entry.writeLocked) {
            if (entry.lockedExclusive) exclusionMutex.unlock_shared();
            exitCS();
        }
    }

    bool isLockedShared()
    {
        ThreadGcInfoEntry & entry = getEntry();
        return entry.readLocked || entry.writeLocked;
    }

    void lockExclusive()
    {
        ThreadGcInfoEntry & entry = getEntry();

        if (entry.readLocked)
            throw ML::Exception("can't acquire write lock with read lock held");

        if (!entry.writeLocked) {
            exclusionMutex.lock();
            //cerr << "entering exclusive mode numInRead = " << numInRead << endl;
            ExcAssert(!exclusiveMode);
            exclusiveMode = true;
            //ML::memory_barrier();
            visibleBarrier();

            //ExcAssertEqual(numInRead, 0);
        }

        ++entry.writeLocked;
    }

    void unlockExclusive()
    {
        ThreadGcInfoEntry & entry = getEntry();

        if (entry.writeLocked <= 0)
            throw ML::Exception("bad write lock nesting");
        --entry.writeLocked;
        if (!entry.writeLocked) {
            exclusiveMode = false;
            exclusionMutex.unlock();
        }
    }

    bool isLockedExclusive()
    {
        ThreadGcInfoEntry & entry = getEntry();
        return entry.writeLocked;
    }

    void visibleBarrier()
    {
        synchronize_rcu();
    }

    void deferBarrier()
    {
        rcu_defer_barrier();
    }

    static void callFn(void * arg)
    {
        boost::function<void ()> * fn
            = reinterpret_cast<boost::function<void ()> *>(arg);
        try {
            (*fn)();
        } catch (...) {
            delete fn;
            throw;
        }
        delete fn;
    }

    void defer(boost::function<void ()> work)
    {
        defer(callFn, new boost::function<void ()>(work));
    }

    void defer(void (work) (void *), void * arg)
    {
        getEntry();  // make sure thread is registered
        defer_rcu(work, arg);
    }

    void defer(void (work) (void *, void *), void * arg1, void * arg2)
    {
        defer([=] () { work(arg1, arg2); });
    }

    void defer(void (work) (void *, void *, void *), void * arg1, void * arg2, void * arg3)
    {
        defer([=] () { work(arg1, arg2, arg3); });
    }

    void dump()
    {
    }
};

} // namespace Datacratic

#endif /* __mmap__rcu_lock_h__ */
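
The listing above ships without usage notes, so the following is a minimal, hypothetical sketch of how a caller might drive this API: readers bracket access with lockShared()/unlockShared(), a writer publishes a replacement object under lockExclusive()/unlockExclusive(), and the old object is reclaimed through defer() once every reader that could still see it has left its critical section. The Config type, the currentConfig pointer, and the build setup (rcu_lock.h, jml and liburcu on the include path, with the static RcuLock members defined in some translation unit) are assumptions made for illustration and are not part of RTBKit; pointer publication is also shown with plain assignments to keep the sketch short, where production code would typically use liburcu's rcu_assign_pointer()/rcu_dereference().

#include <cstdio>
#include <string>

#include "rcu_lock.h"

// Hypothetical shared object guarded by an RcuLock (not part of RTBKit).
struct Config {
    std::string endpoint;
};

static Datacratic::RcuLock configLock;
static Config * currentConfig = new Config();

std::string readEndpoint()
{
    configLock.lockShared();
    // Safe to dereference: no deferred deletion can run while this thread
    // is inside its read-side critical section.
    std::string result = currentConfig->endpoint;
    configLock.unlockShared();
    return result;
}

void updateEndpoint(const std::string & newEndpoint)
{
    Config * newConfig = new Config();
    newConfig->endpoint = newEndpoint;

    configLock.lockExclusive();
    Config * oldConfig = currentConfig;
    currentConfig = newConfig;           // publish the replacement
    configLock.unlockExclusive();

    // Reclaim the old object only after all readers that might still hold a
    // reference to it have left their critical sections.
    configLock.defer([=] () { delete oldConfig; });
}

int main()
{
    updateEndpoint("http://bidder.example.com");
    std::printf("endpoint: %s\n", readEndpoint().c_str());

    // Flush deferred deletions before exiting.
    configLock.deferBarrier();
    return 0;
}

In this sketch defer() hands the deletion to liburcu's deferred-reclamation queue rather than making the writer wait out a grace period inline; deferBarrier() is only needed when the caller must know the queued work has actually run, for example at shutdown.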