RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* gc_lock.cc 00002 Jeremy Barnes, 19 November 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 */ 00005 00006 #include "soa/gc/gc_lock.h" 00007 #include "jml/arch/tick_counter.h" 00008 #include "jml/arch/spinlock.h" 00009 #include "jml/arch/futex.h" 00010 #include "jml/utils/exc_check.h" 00011 #include "jml/utils/guard.h" 00012 00013 #include <boost/interprocess/sync/named_mutex.hpp> 00014 #include <boost/interprocess/sync/scoped_lock.hpp> 00015 #include <boost/static_assert.hpp> 00016 #include <sys/mman.h> 00017 #include <fcntl.h> 00018 #include <unistd.h> 00019 #include <iostream> 00020 00021 using namespace std; 00022 using namespace ML; 00023 namespace ipc = boost::interprocess; 00024 00025 namespace Datacratic { 00026 00027 00028 /*****************************************************************************/ 00029 /* Utility */ 00030 /*****************************************************************************/ 00031 00035 int32_t gcLockStartingEpoch = 0; 00036 00040 template<typename T, size_t Bits = sizeof(T)*8> 00041 static inline 00042 int 00043 compareEpochs (T a, T b) 00044 { 00045 BOOST_STATIC_ASSERT(Bits >= 2); 00046 00047 if (a == b) return 0; 00048 00049 enum { MASK = (3ULL << (Bits - 2)) }; 00050 00051 // We use the last 2 bits for overflow detection. 00052 // We don't use T to avoid problems with the sign bit. 00053 const uint64_t aMasked = a & MASK; 00054 const uint64_t bMasked = b & MASK; 00055 00056 // Normal case. 00057 if (aMasked == bMasked) return a < b ? -1 : 1; 00058 00059 // Check for overflow. 00060 else if (aMasked == 0) return bMasked == MASK ? 1 : -1; 00061 else if (bMasked == 0) return aMasked == MASK ? -1 : 1; 00062 00063 // No overflow so just compare the masks. 00064 return aMasked < bMasked ? -1 : 1; 00065 } 00066 00067 00068 /*****************************************************************************/ 00069 /* GC LOCK BASE */ 00070 /*****************************************************************************/ 00071 00072 struct DeferredEntry1 { 00073 DeferredEntry1(void (fn) (void *) = 0, void * data = 0) 00074 : fn(fn), data(data) 00075 { 00076 } 00077 00078 void run() 00079 { 00080 fn(data); 00081 } 00082 00083 void (*fn) (void *); 00084 void * data; 00085 }; 00086 00087 struct DeferredEntry2 { 00088 DeferredEntry2(void (fn) (void *, void *) = 0, void * data1 = 0, 00089 void * data2 = 0) 00090 : fn(fn), data1(data1), data2(data2) 00091 { 00092 } 00093 00094 void run() 00095 { 00096 fn(data1, data2); 00097 } 00098 00099 00100 void (*fn) (void *, void *); 00101 void * data1; 00102 void * data2; 00103 }; 00104 00105 struct DeferredEntry3 { 00106 DeferredEntry3(void (fn) (void *, void *, void *) = 0, void * data1 = 0, 00107 void * data2 = 0, void * data3 = 0) 00108 : fn(fn), data1(data1), data2(data2), data3(data3) 00109 { 00110 } 00111 00112 void run() 00113 { 00114 fn(data1, data2, data3); 00115 } 00116 00117 00118 void (*fn) (void *, void *, void *); 00119 void * data1; 00120 void * data2; 00121 void * data3; 00122 }; 00123 00124 00126 struct GcLockBase::DeferredList { 00127 DeferredList() 00128 { 00129 } 00130 00131 ~DeferredList() 00132 { 00133 //if (lock.locked()) 00134 // throw ML::Exception("deleting deferred in locked condition"); 00135 if (size() != 0) { 00136 cerr << "deleting non-empty deferred with " << size() 00137 << " entries" << endl; 00138 //throw ML::Exception("deleting non-empty deferred"); 00139 } 00140 } 00141 00142 void swap(DeferredList & other) 00143 { 00144 //ExcAssertEqual(lock.locked(), 0); 00145 //ExcAssertEqual(other.lock.locked(), 0); 00146 00147 //boost::lock_guard<ML::Spinlock> guard(lock); 00148 //boost::lock_guard<ML::Spinlock> guard2(other.lock); 00149 00150 deferred1.swap(other.deferred1); 00151 deferred2.swap(other.deferred2); 00152 deferred3.swap(other.deferred3); 00153 } 00154 00155 std::vector<DeferredEntry1> deferred1; 00156 std::vector<DeferredEntry2> deferred2; 00157 std::vector<DeferredEntry3> deferred3; 00158 //mutable ML::Spinlock lock; 00159 00160 bool addDeferred(int forEpoch, void (fn) (void *), void * data) 00161 { 00162 //boost::lock_guard<ML::Spinlock> guard(lock); 00163 deferred1.push_back(DeferredEntry1(fn, data)); 00164 return true; 00165 } 00166 00167 bool addDeferred(int forEpoch, void (fn) (void *, void *), 00168 void * data1, void * data2) 00169 { 00170 //boost::lock_guard<ML::Spinlock> guard(lock); 00171 deferred2.push_back(DeferredEntry2(fn, data1, data2)); 00172 return true; 00173 } 00174 00175 bool addDeferred(int forEpoch, void (fn) (void *, void *, void *), 00176 void * data1, void * data2, void * data3) 00177 { 00178 //boost::lock_guard<ML::Spinlock> guard(lock); 00179 deferred3.push_back(DeferredEntry3(fn, data1, data2, data3)); 00180 return true; 00181 } 00182 00183 size_t size() const 00184 { 00185 //boost::lock_guard<ML::Spinlock> guard(lock); 00186 return deferred1.size() + deferred2.size() + deferred3.size(); 00187 } 00188 00189 void runAll() 00190 { 00191 // Spinlock should be unnecessary... 00192 //boost::lock_guard<ML::Spinlock> guard(lock); 00193 00194 for (unsigned i = 0; i < deferred1.size(); ++i) { 00195 try { 00196 deferred1[i].run(); 00197 } catch (...) { 00198 } 00199 } 00200 00201 deferred1.clear(); 00202 00203 for (unsigned i = 0; i < deferred2.size(); ++i) { 00204 try { 00205 deferred2[i].run(); 00206 } catch (...) { 00207 } 00208 } 00209 00210 deferred2.clear(); 00211 00212 for (unsigned i = 0; i < deferred3.size(); ++i) { 00213 try { 00214 deferred3[i].run(); 00215 } catch (...) { 00216 } 00217 } 00218 00219 deferred3.clear(); 00220 } 00221 }; 00222 00223 struct GcLockBase::Deferred { 00224 mutable ML::Spinlock lock; 00225 std::map<int32_t, DeferredList *> entries; 00226 std::vector<DeferredList *> spares; 00227 00228 bool empty() const 00229 { 00230 boost::lock_guard<ML::Spinlock> guard(lock); 00231 return entries.empty(); 00232 } 00233 }; 00234 00235 std::string 00236 GcLockBase::ThreadGcInfoEntry:: 00237 print() const 00238 { 00239 return ML::format("inEpoch: %d, readLocked: %d, writeLocked: %d", 00240 inEpoch, readLocked, writeLocked); 00241 } 00242 00243 inline GcLockBase::Data:: 00244 Data() : 00245 bits(0), bits2(0) 00246 { 00247 epoch = gcLockStartingEpoch; // makes it easier to test overflows. 00248 visibleEpoch = epoch; 00249 } 00250 00251 inline GcLockBase::Data:: 00252 Data(const Data & other) 00253 { 00254 //ML::ticks(); 00255 q = other.q; 00256 //ML::ticks(); 00257 } 00258 00259 inline GcLockBase::Data & 00260 GcLockBase::Data:: 00261 operator = (const Data & other) 00262 { 00263 //ML::ticks(); 00264 this->q = other.q; 00265 //ML::ticks(); 00266 return *this; 00267 } 00268 00269 void 00270 GcLockBase::Data:: 00271 validate() const 00272 { 00273 try { 00274 // Visible is at most 2 behind current 00275 ExcAssertGreaterEqual(compareEpochs(visibleEpoch, epoch - 2), 0); 00276 00277 // If nothing is in a critical section then only the current is 00278 // visible 00279 if (inOld() == 0 && inCurrent() == 0) 00280 ExcAssertEqual(visibleEpoch, epoch); 00281 00282 // If nothing is in the old critical section then it's not visible 00283 else if (inOld() == 0) 00284 ExcAssertEqual(visibleEpoch, epoch - 1); 00285 00286 else ExcAssertEqual(visibleEpoch, epoch - 2); 00287 } catch (const std::exception & exc) { 00288 cerr << "exception validating GcLock: " << exc.what() << endl; 00289 cerr << "current: " << print() << endl; 00290 throw; 00291 } 00292 } 00293 00294 inline bool 00295 GcLockBase::Data:: 00296 calcVisibleEpoch() 00297 { 00298 Data old = *this; 00299 00300 int oldValue = visibleEpoch; 00301 00302 // Set the visible epoch 00303 if (inCurrent() == 0 && inOld() == 0) 00304 visibleEpoch = epoch; 00305 else if (inOld() == 0) 00306 visibleEpoch = epoch - 1; 00307 else visibleEpoch = epoch - 2; 00308 00309 if (compareEpochs(visibleEpoch, oldValue) < 0) { 00310 cerr << "old = " << old.print() << endl; 00311 cerr << "new = " << print() << endl; 00312 } 00313 00314 // Visible epoch must be monotonic increasing 00315 ExcAssertGreaterEqual(compareEpochs(visibleEpoch, oldValue), 0); 00316 00317 return oldValue != visibleEpoch; 00318 } 00319 00320 std::string 00321 GcLockBase::Data:: 00322 print() const 00323 { 00324 return ML::format("epoch: %d, in: %d, in-1: %d, visible: %d, exclusive: %d", 00325 epoch, inCurrent(), inOld(), visibleEpoch, exclusive); 00326 } 00327 00328 GcLockBase:: 00329 GcLockBase() 00330 { 00331 deferred = new Deferred(); 00332 } 00333 00334 GcLockBase:: 00335 ~GcLockBase() 00336 { 00337 if (!deferred->empty()) { 00338 dump(); 00339 } 00340 00341 delete deferred; 00342 } 00343 00344 bool 00345 GcLockBase:: 00346 updateData(Data & oldValue, Data & newValue) 00347 { 00348 bool wake; 00349 try { 00350 ExcAssertGreaterEqual(compareEpochs(newValue.epoch, oldValue.epoch), 0); 00351 wake = newValue.calcVisibleEpoch(); 00352 } catch (...) { 00353 cerr << "update: oldValue = " << oldValue.print() << endl; 00354 cerr << "newValue = " << newValue.print() << endl; 00355 throw; 00356 } 00357 00358 newValue.validate(); 00359 00360 #if 0 00361 // Do an extra check before we assert lock 00362 Data upToDate = *data; 00363 if (upToDate != oldValue) { 00364 oldValue = upToDate; 00365 return false; 00366 } 00367 #endif 00368 00369 if (!ML::cmp_xchg(data->q, oldValue.q, newValue.q)) return false; 00370 00371 if (wake) { 00372 // We updated the current visible epoch. We can now wake up 00373 // anything that was waiting for it to be visible and run any 00374 // deferred handlers. 00375 futex_wake(data->visibleEpoch); 00376 runDefers(); 00377 } 00378 00379 return true; 00380 } 00381 00382 void 00383 GcLockBase:: 00384 runDefers() 00385 { 00386 std::vector<DeferredList *> toRun; 00387 { 00388 boost::lock_guard<ML::Spinlock> guard(deferred->lock); 00389 toRun = checkDefers(); 00390 } 00391 00392 for (unsigned i = 0; i < toRun.size(); ++i) { 00393 toRun[i]->runAll(); 00394 delete toRun[i]; 00395 } 00396 } 00397 00398 std::vector<GcLockBase::DeferredList *> 00399 GcLockBase:: 00400 checkDefers() 00401 { 00402 std::vector<DeferredList *> result; 00403 00404 while (!deferred->entries.empty() && 00405 compareEpochs( 00406 deferred->entries.begin()->first, 00407 data->visibleEpoch) <= 0) 00408 { 00409 result.reserve(deferred->entries.size()); 00410 00411 for (auto it = deferred->entries.begin(), 00412 end = deferred->entries.end(); 00413 it != end; /* no inc */) { 00414 00415 if (compareEpochs(it->first, data->visibleEpoch) > 0) 00416 break; // still visible 00417 00418 ExcAssert(it->second); 00419 result.push_back(it->second); 00420 //it->second->runAll(); 00421 auto toDelete = it; 00422 it = boost::next(it); 00423 deferred->entries.erase(toDelete); 00424 } 00425 } 00426 00427 return result; 00428 } 00429 00430 void 00431 GcLockBase:: 00432 enterCS(ThreadGcInfoEntry * entry) 00433 { 00434 if (!entry) entry = &getEntry(); 00435 00436 ExcAssertEqual(entry->inEpoch, -1); 00437 00438 #if 0 // later... 00439 // Be optimistic... 00440 int optimisticEpoch = data->epoch; 00441 if (__sync_add_and_fetch(data->in + (optimisticEpoch & 1), 1) > 1 00442 && data->epoch == optimisticEpoch) { 00443 entry->inEpoch = optimisticEpoch & 1; 00444 return; 00445 } 00446 00447 // undo optimism 00448 __sync_add_and_fetch(data->in + (optimisticEpoch & 1), -1); 00449 #endif // optimistic 00450 00451 Data current = *data; 00452 00453 for (;;) { 00454 Data newValue = current; 00455 00456 if (newValue.exclusive) { 00457 futex_wait(data->exclusive, 1); 00458 current = *data; 00459 continue; 00460 } 00461 00462 if (newValue.inOld() == 0) { 00463 // We're entering a new epoch 00464 newValue.epoch += 1; 00465 newValue.setIn(newValue.epoch, 1); 00466 } 00467 else { 00468 // No new epoch as the old one isn't finished yet 00469 newValue.addIn(newValue.epoch, 1); 00470 } 00471 00472 entry->inEpoch = newValue.epoch & 1; 00473 00474 if (updateData(current, newValue)) break; 00475 } 00476 } 00477 00478 void 00479 GcLockBase:: 00480 exitCS(ThreadGcInfoEntry * entry) 00481 { 00482 if (!entry) entry = &getEntry(); 00483 00484 if (entry->inEpoch == -1) 00485 throw ML::Exception("not in a CS"); 00486 00487 // Fast path 00488 if (__sync_fetch_and_add(data->in + (entry->inEpoch & 1), -1) > 1) { 00489 entry->inEpoch = -1; 00490 return; 00491 } 00492 00493 // Slow path; an epoch may have come to an end 00494 00495 Data current = *data; 00496 00497 for (;;) { 00498 Data newValue = current; 00499 00500 //newValue.addIn(entry->inEpoch, -1); 00501 00502 if (updateData(current, newValue)) break; 00503 } 00504 00505 entry->inEpoch = -1; 00506 } 00507 00508 void 00509 GcLockBase:: 00510 enterCSExclusive(ThreadGcInfoEntry * entry) 00511 { 00512 if (!entry) entry = &getEntry(); 00513 00514 ExcAssertEqual(entry->inEpoch, -1); 00515 00516 Data current = *data, newValue; 00517 00518 for (;;) { 00519 if (current.exclusive) { 00520 futex_wait(data->exclusive, 1); 00521 current = *data; 00522 continue; 00523 } 00524 00525 ExcAssertEqual(current.exclusive, 0); 00526 00527 // TODO: single cmp/xchg on just exclusive rather than the whole lot? 00528 //int old = 0; 00529 //if (!ML::cmp_xchg(data->exclusive, old, 1)) 00530 // continue; 00531 00532 newValue = current; 00533 newValue.exclusive = 1; 00534 if (updateData(current, newValue)) { 00535 current = newValue; 00536 break; 00537 } 00538 } 00539 00540 ExcAssertEqual(data->exclusive, 1); 00541 00542 // At this point, we have exclusive access... now wait for everything else 00543 // to exit. This is kind of a critical section barrier. 00544 int startEpoch = current.epoch; 00545 00546 #if 1 00547 visibleBarrier(); 00548 #else 00549 00550 for (unsigned i = 0; ; ++i, current = *data) { 00551 00552 if (current.visibleEpoch == current.epoch 00553 && current.inCurrent() == 0 && current.inOld() == 0) 00554 break; 00555 00556 long res = futex_wait(data->visibleEpoch, current.visibleEpoch); 00557 if (res == -1) { 00558 if (errno == EAGAIN) continue; 00559 throw ML::Exception(errno, "futex_wait"); 00560 } 00561 } 00562 #endif 00563 00564 ExcAssertEqual(data->epoch, startEpoch); 00565 00566 00567 #if 0 00568 // Testing 00569 for (unsigned i = 0; i < 100; ++i) { 00570 Data current = *data; 00571 00572 try { 00573 ExcAssertEqual(current.exclusive, 1); 00574 ExcAssertEqual(current.inCurrent(), 0); 00575 ExcAssertEqual(current.inOld(), 0); 00576 } catch (...) { 00577 ThreadGcInfoEntry & entry = getEntry(); 00578 cerr << "entry->inEpoch = " << entry->inEpoch << endl; 00579 cerr << "entry->readLocked = " << entry->readLocked << endl; 00580 cerr << "entry->writeLocked = " << entry->writeLocked << endl; 00581 cerr << "current: " << current.print() << endl; 00582 cerr << "data: " << data->print() << endl; 00583 throw; 00584 } 00585 } 00586 #endif 00587 00588 ExcAssertEqual(data->epoch, startEpoch); 00589 00590 entry->inEpoch = startEpoch & 1; 00591 } 00592 00593 void 00594 GcLockBase:: 00595 exitCSExclusive(ThreadGcInfoEntry * entry) 00596 { 00597 if (!entry) entry = &getEntry(); 00598 #if 0 00599 Data current = *data; 00600 00601 try { 00602 ExcAssertEqual(current.exclusive, 1); 00603 ExcAssertEqual(current.inCurrent(), 0); 00604 ExcAssertEqual(current.inOld(), 0); 00605 } catch (...) { 00606 cerr << "entry->inEpoch = " << entry->inEpoch << endl; 00607 cerr << "entry->readLocked = " << entry->readLocked << endl; 00608 cerr << "entry->writeLocked = " << entry->writeLocked << endl; 00609 cerr << "current: " << current.print() << endl; 00610 cerr << "data: " << data->print() << endl; 00611 throw; 00612 } 00613 #endif 00614 00615 ML::memory_barrier(); 00616 00617 int old = 1; 00618 if (!ML::cmp_xchg(data->exclusive, old, 0)) 00619 throw ML::Exception("error exiting exclusive mode"); 00620 00621 // Wake everything waiting on the exclusive lock 00622 futex_wake(data->exclusive); 00623 00624 entry->inEpoch = -1; 00625 } 00626 00627 void 00628 GcLockBase:: 00629 visibleBarrier() 00630 { 00631 ML::memory_barrier(); 00632 00633 ThreadGcInfoEntry & entry = getEntry(); 00634 00635 if (entry.inEpoch != -1) 00636 throw ML::Exception("visibleBarrier called in critical section will " 00637 "deadlock"); 00638 00639 Data current = *data; 00640 int startEpoch = data->epoch; 00641 //int startVisible = data.visibleEpoch; 00642 00643 // Spin until we're visible 00644 for (unsigned i = 0; ; ++i, current = *data) { 00645 00646 //int i = startEpoch & 1; 00647 00648 // Have we moved on? If we're 2 epochs ahead we're surely not visible 00649 if (current.epoch != startEpoch && current.epoch != startEpoch + 1) { 00650 //cerr << "epoch moved on" << endl; 00651 return; 00652 } 00653 00654 // If there's nothing in a critical section then we're OK 00655 if (current.inCurrent() == 0 && current.inOld() == 0) 00656 return; 00657 00658 if (current.visibleEpoch == startEpoch) 00659 return; 00660 00661 if (i % 128 == 127 || true) { 00662 long res = futex_wait(data->visibleEpoch, current.visibleEpoch); 00663 if (res == -1) { 00664 if (errno == EAGAIN) continue; 00665 throw ML::Exception(errno, "futex_wait"); 00666 } 00667 } 00668 } 00669 } 00670 00671 void 00672 GcLockBase:: 00673 deferBarrier() 00674 { 00675 // TODO: what is the interaction between a defer barrier and an exclusive 00676 // lock? 00677 00678 ThreadGcInfoEntry & entry = getEntry(); 00679 00680 visibleBarrier(); 00681 00682 // Do it twice to make sure that everything is cycled over two different 00683 // epochs 00684 for (unsigned i = 0; i < 2; ++i) { 00685 00686 // If we're in a critical section, we'll wait forever... 00687 ExcAssertEqual(entry.inEpoch, -1); 00688 00689 // What does "defer barrier" mean? It means that we wait until everything 00690 // that is currently enqueued to be deferred is finished. 00691 00692 // TODO: this is a very inefficient implementation... we could do a lot 00693 // better especially in the non-contended case 00694 00695 int lock = 0; 00696 00697 defer(futex_unlock, &lock); 00698 00699 ML::atomic_add(lock, -1); 00700 00701 futex_wait(lock, -1); 00702 } 00703 00704 // If certain threads aren't allowed to execute deferred work 00705 // then it's possible that not all deferred work will have been executed. 00706 // To be sure, we run any leftover work. 00707 runDefers(); 00708 } 00709 00711 static void callFn(void * arg) 00712 { 00713 boost::function<void ()> * fn 00714 = reinterpret_cast<boost::function<void ()> *>(arg); 00715 try { 00716 (*fn)(); 00717 } catch (...) { 00718 delete fn; 00719 throw; 00720 } 00721 delete fn; 00722 } 00723 00724 void 00725 GcLockBase:: 00726 defer(boost::function<void ()> work) 00727 { 00728 defer(callFn, new boost::function<void ()>(work)); 00729 } 00730 00731 template<typename... Args> 00732 void 00733 GcLockBase:: 00734 doDefer(void (fn) (Args...), Args... args) 00735 { 00736 // INVARIANT 00737 // If there is another thread that is provably in a critical section at 00738 // this moment, then the function will only be run when all such threads 00739 // have exited the critical section. 00740 // 00741 // If there are no threads in either the current or the old epoch, then 00742 // we can run it straight away. 00743 // 00744 // If there are threads in the old epoch but not the current epoch, then 00745 // we need to wait until all threads have exited the old epoch. In other 00746 // words, it goes on the old epoch's defer queue. 00747 // 00748 // If there are threads in the current epoch (irrespective of the old 00749 // epoch) then we need to wait until the current epoch is done. 00750 00751 Data current = *data; 00752 00753 int32_t newestVisibleEpoch = current.epoch; 00754 if (current.inCurrent() == 0) --newestVisibleEpoch; 00755 00756 #if 1 00757 // Nothing is in a critical section; we can run it inline 00758 if (current.inCurrent() + current.inOld() == 0) { 00759 fn(std::forward<Args>(args)...); 00760 return; 00761 } 00762 #endif 00763 00764 for (int i = 0; i == 0; ++i) { 00765 // Lock the deferred structure 00766 boost::lock_guard<ML::Spinlock> guard(deferred->lock); 00767 00768 #if 1 00769 // Get back to current again 00770 current = *data; 00771 00772 // Find the oldest live epoch 00773 int oldestLiveEpoch = -1; 00774 if (current.inOld() > 0) 00775 oldestLiveEpoch = current.epoch - 1; 00776 else if (current.inCurrent() > 0) 00777 oldestLiveEpoch = current.epoch; 00778 00779 if (oldestLiveEpoch == -1 || 00780 compareEpochs(oldestLiveEpoch, newestVisibleEpoch) > 0) 00781 { 00782 // Nothing in a critical section so we can run it now and exit 00783 break; 00784 } 00785 00786 // Nothing is in a critical section; we can run it inline 00787 if (current.inCurrent() + current.inOld() == 0) 00788 break; 00789 #endif 00790 00791 // OK, get the deferred list 00792 auto epochIt 00793 = deferred->entries.insert 00794 (make_pair(newestVisibleEpoch, (DeferredList *)0)).first; 00795 if (epochIt->second == 0) { 00796 // Create a new list 00797 epochIt->second = new DeferredList(); 00798 } 00799 00800 DeferredList & list = *epochIt->second; 00801 list.addDeferred(newestVisibleEpoch, fn, std::forward<Args>(args)...); 00802 00803 // TODO: we only need to do this if the newestVisibleEpoch has 00804 // changed since we last calculated it... 00805 //checkDefers(); 00806 00807 return; 00808 } 00809 00810 // If we got here we can run it straight away 00811 fn(std::forward<Args>(args)...); 00812 return; 00813 } 00814 00815 void 00816 GcLockBase:: 00817 defer(void (work) (void *), void * arg) 00818 { 00819 doDefer(work, arg); 00820 } 00821 00822 void 00823 GcLockBase:: 00824 defer(void (work) (void *, void *), void * arg1, void * arg2) 00825 { 00826 doDefer(work, arg1, arg2); 00827 } 00828 00829 void 00830 GcLockBase:: 00831 defer(void (work) (void *, void *, void *), void * arg1, void * arg2, void * arg3) 00832 { 00833 doDefer(work, arg1, arg2, arg3); 00834 } 00835 00836 void 00837 GcLockBase:: 00838 dump() 00839 { 00840 Data current = *data; 00841 cerr << "epoch " << current.epoch << " in " << current.inCurrent() 00842 << " in-1 " << current.inOld() << " vis " << current.visibleEpoch 00843 << " excl " << current.exclusive << endl; 00844 cerr << "deferred: "; 00845 { 00846 boost::lock_guard<ML::Spinlock> guard(deferred->lock); 00847 cerr << deferred->entries.size() << " epochs: "; 00848 00849 for (auto it = deferred->entries.begin(), end = deferred->entries.end(); 00850 it != end; ++it) { 00851 cerr << " " << it->first << " (" << it->second->size() 00852 << " entries)"; 00853 } 00854 } 00855 cerr << endl; 00856 } 00857 00858 00859 /*****************************************************************************/ 00860 /* GC LOCK */ 00861 /*****************************************************************************/ 00862 00863 GcLock:: 00864 GcLock() 00865 { 00866 data = &localData; 00867 } 00868 00869 GcLock:: 00870 ~GcLock() 00871 { 00872 // Nothing to cleanup. 00873 } 00874 00875 void 00876 GcLock:: 00877 unlink() 00878 { 00879 // Nothing to cleanup. 00880 } 00881 00882 00883 /*****************************************************************************/ 00884 /* SHARED GC LOCK */ 00885 /*****************************************************************************/ 00886 00887 // We want to mmap the file so it has to be the size of a page. 00888 00889 namespace { size_t GcLockFileSize = 1ULL << 12; } 00890 00891 00892 GcCreate GC_CREATE; 00893 GcOpen GC_OPEN; 00894 00895 void 00896 SharedGcLock:: 00897 doOpen(bool create) 00898 { 00899 int flags = O_RDWR | O_CREAT; 00900 if (create) flags |= O_EXCL; 00901 00902 ipc::named_mutex mutex(ipc::open_or_create, name.c_str()); 00903 { 00904 // Lock is used to create and truncate the file atomically. 00905 ipc::scoped_lock<ipc::named_mutex> lock(mutex); 00906 00907 // We don't want the locks to be persisted so an shm_open will do fine. 00908 fd = shm_open(name.c_str(), flags, 0644); 00909 ExcCheckErrno(fd >= 0, "shm_open failed"); 00910 00911 struct stat stats; 00912 int res = fstat(fd, &stats); 00913 ExcCheckErrno(!res, "failed to get the file size"); 00914 00915 if (stats.st_size != GcLockFileSize) { 00916 int res = ftruncate(fd, GcLockFileSize); 00917 ExcCheckErrno(!res, "failed to resize the file."); 00918 } 00919 } 00920 00921 // Map the region so that all the processes can see the writes. 00922 addr = mmap(0, GcLockFileSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 00923 ExcCheckErrno(addr != MAP_FAILED, "failed to map the shm file"); 00924 00925 // Initialize and set the member used by GcLockBase. 00926 if (create) new (addr) Data(); 00927 data = reinterpret_cast<Data*>(addr); 00928 } 00929 00930 SharedGcLock:: 00931 SharedGcLock(GcCreate, const string& name) : 00932 name("gc." + name) 00933 { 00934 doOpen(true); 00935 } 00936 00937 SharedGcLock:: 00938 SharedGcLock(GcOpen, const string& name) : 00939 name("gc." + name) 00940 { 00941 doOpen(false); 00942 } 00943 00944 SharedGcLock:: 00945 ~SharedGcLock() 00946 { 00947 munmap(addr, GcLockFileSize); 00948 close(fd); 00949 } 00950 00951 void 00952 SharedGcLock:: 00953 unlink() 00954 { 00955 shm_unlink(name.c_str()); 00956 (void) ipc::named_mutex::remove(name.c_str()); 00957 } 00958 00959 00960 } // namespace Datacratic