RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/gc/gc_lock.cc
00001 /* gc_lock.cc
00002    Jeremy Barnes, 19 November 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 */
00005 
00006 #include "soa/gc/gc_lock.h"
00007 #include "jml/arch/tick_counter.h"
00008 #include "jml/arch/spinlock.h"
00009 #include "jml/arch/futex.h"
00010 #include "jml/utils/exc_check.h"
00011 #include "jml/utils/guard.h"
00012 
00013 #include <boost/interprocess/sync/named_mutex.hpp>
00014 #include <boost/interprocess/sync/scoped_lock.hpp>
00015 #include <boost/static_assert.hpp>
00016 #include <sys/mman.h>
00017 #include <fcntl.h>
00018 #include <unistd.h>
00019 #include <iostream>
00020 
00021 using namespace std;
00022 using namespace ML;
00023 namespace ipc = boost::interprocess;
00024 
00025 namespace Datacratic {
00026 
00027 
00028 /*****************************************************************************/
00029 /* Utility                                                                   */
00030 /*****************************************************************************/
00031 
/** Value that GcLockBase::Data's default constructor loads into its epoch.
    It is a mutable global so that it can be moved close to the wrap-around
    point, which makes epoch overflow easier to test.
*/
int32_t gcLockStartingEpoch(0);
00036 
/** Three-way comparison of two epoch counters that tolerates wrap-around.

    The two most significant of the low \p Bits bits of each value act as a
    "quadrant" tag.  Values in the same quadrant are ordered with a plain
    comparison.  A value in the lowest quadrant is treated as newer than a
    value in the highest quadrant, since the counter is assumed to have
    wrapped.  Any other pair is ordered by quadrant.

    @tparam T    integral epoch type.
    @tparam Bits number of significant bits in the epoch; must be at least 2.
    @return -1 if \p a is older than \p b, 0 if they are equal, 1 if \p a is
            newer than \p b.
*/
template<typename T, size_t Bits = sizeof(T)*8>
static inline
int
compareEpochs (T a, T b)
{
    static_assert(Bits >= 2, "compareEpochs needs at least 2 bits");

    if (a == b) return 0;

    // We use the top 2 bits for overflow detection.  The mask is kept in an
    // unsigned 64 bit value rather than in T to avoid problems with the
    // sign bit.
    const uint64_t MASK = 3ULL << (Bits - 2);

    const uint64_t aMasked = a & MASK;
    const uint64_t bMasked = b & MASK;

    // Normal case: same quadrant, so a direct comparison is meaningful.
    if (aMasked == bMasked) return a < b ? -1 : 1;

    // One side is in the lowest quadrant: if the other is in the highest
    // quadrant then the low one has wrapped around and is the newer value.
    if (aMasked == 0) return bMasked == MASK ? 1 : -1;
    if (bMasked == 0) return aMasked == MASK ? -1 : 1;

    // No overflow so just compare the quadrants.
    return aMasked < bMasked ? -1 : 1;
}
00066 
00067 
00068 /*****************************************************************************/
00069 /* GC LOCK BASE                                                              */
00070 /*****************************************************************************/
00071 
/** A queued callback that takes one opaque argument. */
struct DeferredEntry1 {
    typedef void (*Handler) (void *);

    Handler fn;      ///< Function to invoke
    void * data;     ///< Argument handed to fn

    DeferredEntry1(Handler fn = nullptr, void * data = nullptr)
        : fn(fn), data(data)
    {
    }

    /** Invoke the stored function with the stored argument. */
    void run()
    {
        (*fn)(data);
    }
};
00086 
/** A queued callback that takes two opaque arguments. */
struct DeferredEntry2 {
    typedef void (*Handler) (void *, void *);

    Handler fn;      ///< Function to invoke
    void * data1;    ///< First argument handed to fn
    void * data2;    ///< Second argument handed to fn

    DeferredEntry2(Handler fn = nullptr, void * data1 = nullptr,
                   void * data2 = nullptr)
        : fn(fn), data1(data1), data2(data2)
    {
    }

    /** Invoke the stored function with the stored arguments. */
    void run()
    {
        (*fn)(data1, data2);
    }
};
00104 
/** A queued callback that takes three opaque arguments. */
struct DeferredEntry3 {
    typedef void (*Handler) (void *, void *, void *);

    Handler fn;      ///< Function to invoke
    void * data1;    ///< First argument handed to fn
    void * data2;    ///< Second argument handed to fn
    void * data3;    ///< Third argument handed to fn

    DeferredEntry3(Handler fn = nullptr, void * data1 = nullptr,
                   void * data2 = nullptr, void * data3 = nullptr)
        : fn(fn), data1(data1), data2(data2), data3(data3)
    {
    }

    /** Invoke the stored function with the stored arguments. */
    void run()
    {
        (*fn)(data1, data2, data3);
    }
};
00123 
00124 
00126 struct GcLockBase::DeferredList {
00127     DeferredList()
00128     {
00129     }
00130 
00131     ~DeferredList()
00132     {
00133         //if (lock.locked())
00134         //    throw ML::Exception("deleting deferred in locked condition");
00135         if (size() != 0) {
00136             cerr << "deleting non-empty deferred with " << size()
00137                  << " entries" << endl;
00138             //throw ML::Exception("deleting non-empty deferred");
00139         }
00140     }
00141 
00142     void swap(DeferredList & other)
00143     {
00144         //ExcAssertEqual(lock.locked(), 0);
00145         //ExcAssertEqual(other.lock.locked(), 0);
00146 
00147         //boost::lock_guard<ML::Spinlock> guard(lock);
00148         //boost::lock_guard<ML::Spinlock> guard2(other.lock);
00149 
00150         deferred1.swap(other.deferred1);
00151         deferred2.swap(other.deferred2);
00152         deferred3.swap(other.deferred3);
00153     }
00154 
00155     std::vector<DeferredEntry1> deferred1;
00156     std::vector<DeferredEntry2> deferred2;
00157     std::vector<DeferredEntry3> deferred3;
00158     //mutable ML::Spinlock lock;
00159 
00160     bool addDeferred(int forEpoch, void (fn) (void *), void * data)
00161     {
00162         //boost::lock_guard<ML::Spinlock> guard(lock);
00163         deferred1.push_back(DeferredEntry1(fn, data));
00164         return true;
00165     }
00166 
00167     bool addDeferred(int forEpoch, void (fn) (void *, void *),
00168                      void * data1, void * data2)
00169     {
00170         //boost::lock_guard<ML::Spinlock> guard(lock);
00171         deferred2.push_back(DeferredEntry2(fn, data1, data2));
00172         return true;
00173     }
00174 
00175     bool addDeferred(int forEpoch, void (fn) (void *, void *, void *),
00176                      void * data1, void * data2, void * data3)
00177     {
00178         //boost::lock_guard<ML::Spinlock> guard(lock);
00179         deferred3.push_back(DeferredEntry3(fn, data1, data2, data3));
00180         return true;
00181     }
00182         
00183     size_t size() const
00184     {
00185         //boost::lock_guard<ML::Spinlock> guard(lock);
00186         return deferred1.size() + deferred2.size() + deferred3.size();
00187     }
00188 
00189     void runAll()
00190     {
00191         // Spinlock should be unnecessary...
00192         //boost::lock_guard<ML::Spinlock> guard(lock);
00193 
00194         for (unsigned i = 0;  i < deferred1.size();  ++i) {
00195             try {
00196                 deferred1[i].run();
00197             } catch (...) {
00198             }
00199         }
00200 
00201         deferred1.clear();
00202 
00203         for (unsigned i = 0;  i < deferred2.size();  ++i) {
00204             try {
00205                 deferred2[i].run();
00206             } catch (...) {
00207             }
00208         }
00209 
00210         deferred2.clear();
00211 
00212         for (unsigned i = 0;  i < deferred3.size();  ++i) {
00213             try {
00214                 deferred3[i].run();
00215             } catch (...) {
00216             }
00217         }
00218 
00219         deferred3.clear();
00220     }
00221 };
00222 
00223 struct GcLockBase::Deferred {
00224     mutable ML::Spinlock lock;
00225     std::map<int32_t, DeferredList *> entries;
00226     std::vector<DeferredList *> spares;
00227 
00228     bool empty() const
00229     {
00230         boost::lock_guard<ML::Spinlock> guard(lock);
00231         return entries.empty();
00232     }
00233 };
00234 
00235 std::string
00236 GcLockBase::ThreadGcInfoEntry::
00237 print() const
00238 {
00239     return ML::format("inEpoch: %d, readLocked: %d, writeLocked: %d",
00240                       inEpoch, readLocked, writeLocked);
00241 }
00242 
00243 inline GcLockBase::Data::
00244 Data() :
00245     bits(0), bits2(0)
00246 {
00247     epoch = gcLockStartingEpoch; // makes it easier to test overflows.
00248     visibleEpoch = epoch;
00249 }
00250 
00251 inline GcLockBase::Data::
00252 Data(const Data & other)
00253 {
00254     //ML::ticks();
00255     q = other.q;
00256     //ML::ticks();
00257 }
00258 
00259 inline GcLockBase::Data &
00260 GcLockBase::Data::
00261 operator = (const Data & other)
00262 {
00263     //ML::ticks();
00264     this->q = other.q;
00265     //ML::ticks();
00266     return *this;
00267 }
00268 
00269 void
00270 GcLockBase::Data::
00271 validate() const
00272 {
00273     try {
00274         // Visible is at most 2 behind current
00275         ExcAssertGreaterEqual(compareEpochs(visibleEpoch, epoch - 2), 0);
00276 
00277         // If nothing is in a critical section then only the current is
00278         // visible
00279         if (inOld() == 0 && inCurrent() == 0)
00280             ExcAssertEqual(visibleEpoch, epoch);
00281 
00282         // If nothing is in the old critical section then it's not visible
00283         else if (inOld() == 0)
00284             ExcAssertEqual(visibleEpoch, epoch - 1);
00285 
00286         else ExcAssertEqual(visibleEpoch, epoch - 2);
00287     } catch (const std::exception & exc) {
00288         cerr << "exception validating GcLock: " << exc.what() << endl;
00289         cerr << "current: " << print() << endl;
00290         throw;
00291     }
00292 }
00293 
00294 inline bool
00295 GcLockBase::Data::
00296 calcVisibleEpoch()
00297 {
00298     Data old = *this;
00299 
00300     int oldValue = visibleEpoch;
00301 
00302     // Set the visible epoch
00303     if (inCurrent() == 0 && inOld() == 0)
00304         visibleEpoch = epoch;
00305     else if (inOld() == 0)
00306         visibleEpoch = epoch - 1;
00307     else visibleEpoch = epoch - 2;
00308 
00309     if (compareEpochs(visibleEpoch, oldValue) < 0) {
00310         cerr << "old = " << old.print() << endl;
00311         cerr << "new = " << print() << endl;
00312     }
00313 
00314     // Visible epoch must be monotonic increasing
00315     ExcAssertGreaterEqual(compareEpochs(visibleEpoch, oldValue), 0);
00316 
00317     return oldValue != visibleEpoch;
00318 }
00319         
00320 std::string
00321 GcLockBase::Data::
00322 print() const
00323 {
00324     return ML::format("epoch: %d, in: %d, in-1: %d, visible: %d, exclusive: %d",
00325                       epoch, inCurrent(), inOld(), visibleEpoch, exclusive);
00326 }
00327 
00328 GcLockBase::
00329 GcLockBase()
00330 {
00331     deferred = new Deferred();
00332 }
00333 
00334 GcLockBase::
00335 ~GcLockBase()
00336 {
00337     if (!deferred->empty()) {
00338         dump();
00339     }
00340 
00341     delete deferred;
00342 }
00343 
00344 bool
00345 GcLockBase::
00346 updateData(Data & oldValue, Data & newValue)
00347 {
00348     bool wake;
00349     try {
00350         ExcAssertGreaterEqual(compareEpochs(newValue.epoch, oldValue.epoch), 0);
00351         wake = newValue.calcVisibleEpoch();
00352     } catch (...) {
00353         cerr << "update: oldValue = " << oldValue.print() << endl;
00354         cerr << "newValue = " << newValue.print() << endl;
00355         throw;
00356     }
00357 
00358     newValue.validate();
00359 
00360 #if 0
00361     // Do an extra check before we assert lock
00362     Data upToDate = *data;
00363     if (upToDate != oldValue) {
00364         oldValue = upToDate;
00365         return false;
00366     }
00367 #endif
00368 
00369     if (!ML::cmp_xchg(data->q, oldValue.q, newValue.q)) return false;
00370 
00371     if (wake) {
00372         // We updated the current visible epoch.  We can now wake up
00373         // anything that was waiting for it to be visible and run any
00374         // deferred handlers.
00375         futex_wake(data->visibleEpoch);
00376         runDefers();
00377     }
00378 
00379     return true;
00380 }
00381 
00382 void
00383 GcLockBase::
00384 runDefers()
00385 {
00386     std::vector<DeferredList *> toRun;
00387     {
00388         boost::lock_guard<ML::Spinlock> guard(deferred->lock);
00389         toRun = checkDefers();
00390     }
00391 
00392     for (unsigned i = 0;  i < toRun.size();  ++i) {
00393         toRun[i]->runAll();
00394         delete toRun[i];
00395     }
00396 }
00397 
00398 std::vector<GcLockBase::DeferredList *>
00399 GcLockBase::
00400 checkDefers()
00401 {
00402     std::vector<DeferredList *> result;
00403 
00404     while (!deferred->entries.empty() &&
00405             compareEpochs(
00406                     deferred->entries.begin()->first,
00407                     data->visibleEpoch) <= 0)
00408     {
00409         result.reserve(deferred->entries.size());
00410 
00411         for (auto it = deferred->entries.begin(),
00412                  end = deferred->entries.end();
00413              it != end;  /* no inc */) {
00414 
00415             if (compareEpochs(it->first, data->visibleEpoch) > 0)
00416                 break;  // still visible
00417 
00418             ExcAssert(it->second);
00419             result.push_back(it->second);
00420             //it->second->runAll();
00421             auto toDelete = it;
00422             it = boost::next(it);
00423             deferred->entries.erase(toDelete);
00424         }
00425     }
00426 
00427     return result;
00428 }
00429 
00430 void
00431 GcLockBase::
00432 enterCS(ThreadGcInfoEntry * entry)
00433 {
00434     if (!entry) entry = &getEntry();
00435         
00436     ExcAssertEqual(entry->inEpoch, -1);
00437 
00438 #if 0 // later...
00439     // Be optimistic...
00440     int optimisticEpoch = data->epoch;
00441     if (__sync_add_and_fetch(data->in + (optimisticEpoch & 1), 1) > 1
00442         && data->epoch == optimisticEpoch) {
00443         entry->inEpoch = optimisticEpoch & 1;
00444         return;
00445     }
00446 
00447     // undo optimism
00448     __sync_add_and_fetch(data->in + (optimisticEpoch & 1), -1);
00449 #endif // optimistic
00450 
00451     Data current = *data;
00452 
00453     for (;;) {
00454         Data newValue = current;
00455 
00456         if (newValue.exclusive) {
00457             futex_wait(data->exclusive, 1);
00458             current = *data;
00459             continue;
00460         }
00461 
00462         if (newValue.inOld() == 0) {
00463             // We're entering a new epoch
00464             newValue.epoch += 1;
00465             newValue.setIn(newValue.epoch, 1);
00466         }
00467         else {
00468             // No new epoch as the old one isn't finished yet
00469             newValue.addIn(newValue.epoch, 1);
00470         }
00471 
00472         entry->inEpoch = newValue.epoch & 1;
00473             
00474         if (updateData(current, newValue)) break;
00475     }
00476 }
00477 
00478 void
00479 GcLockBase::
00480 exitCS(ThreadGcInfoEntry * entry)
00481 {
00482     if (!entry) entry = &getEntry();
00483 
00484     if (entry->inEpoch == -1)
00485         throw ML::Exception("not in a CS");
00486 
00487     // Fast path
00488     if (__sync_fetch_and_add(data->in + (entry->inEpoch & 1), -1) > 1) {
00489         entry->inEpoch = -1;
00490         return;
00491     }
00492         
00493     // Slow path; an epoch may have come to an end
00494     
00495     Data current = *data;
00496 
00497     for (;;) {
00498         Data newValue = current;
00499 
00500         //newValue.addIn(entry->inEpoch, -1);
00501 
00502         if (updateData(current, newValue)) break;
00503     }
00504 
00505     entry->inEpoch = -1;
00506 }
00507 
00508 void
00509 GcLockBase::
00510 enterCSExclusive(ThreadGcInfoEntry * entry)
00511 {
00512     if (!entry) entry = &getEntry();
00513         
00514     ExcAssertEqual(entry->inEpoch, -1);
00515 
00516     Data current = *data, newValue;
00517 
00518     for (;;) {
00519         if (current.exclusive) {
00520             futex_wait(data->exclusive, 1);
00521             current = *data;
00522             continue;
00523         }
00524 
00525         ExcAssertEqual(current.exclusive, 0);
00526 
00527         // TODO: single cmp/xchg on just exclusive rather than the whole lot?
00528         //int old = 0;
00529         //if (!ML::cmp_xchg(data->exclusive, old, 1))
00530         //    continue;
00531 
00532         newValue = current;
00533         newValue.exclusive = 1;
00534         if (updateData(current, newValue)) {
00535             current = newValue;
00536             break;
00537         }
00538     }
00539 
00540     ExcAssertEqual(data->exclusive, 1);
00541 
00542     // At this point, we have exclusive access... now wait for everything else
00543     // to exit.  This is kind of a critical section barrier.
00544     int startEpoch = current.epoch;
00545     
00546 #if 1
00547     visibleBarrier();
00548 #else
00549 
00550     for (unsigned i = 0;  ;  ++i, current = *data) {
00551 
00552         if (current.visibleEpoch == current.epoch
00553             && current.inCurrent() == 0 && current.inOld() == 0)
00554             break;
00555         
00556         long res = futex_wait(data->visibleEpoch, current.visibleEpoch);
00557         if (res == -1) {
00558             if (errno == EAGAIN) continue;
00559             throw ML::Exception(errno, "futex_wait");
00560         }
00561     }
00562 #endif
00563     
00564     ExcAssertEqual(data->epoch, startEpoch);
00565 
00566 
00567 #if 0
00568     // Testing
00569     for (unsigned i = 0;  i < 100;  ++i) {
00570         Data current = *data;
00571 
00572         try {
00573             ExcAssertEqual(current.exclusive, 1);
00574             ExcAssertEqual(current.inCurrent(), 0);
00575             ExcAssertEqual(current.inOld(), 0);
00576         } catch (...) {
00577             ThreadGcInfoEntry & entry = getEntry();
00578             cerr << "entry->inEpoch = " << entry->inEpoch << endl;
00579             cerr << "entry->readLocked = " << entry->readLocked << endl;
00580             cerr << "entry->writeLocked = " << entry->writeLocked << endl;
00581             cerr << "current: " << current.print() << endl;
00582             cerr << "data: " << data->print() << endl;
00583             throw;
00584         }
00585     }
00586 #endif
00587 
00588     ExcAssertEqual(data->epoch, startEpoch);
00589 
00590     entry->inEpoch = startEpoch & 1;
00591 }
00592 
00593 void
00594 GcLockBase::
00595 exitCSExclusive(ThreadGcInfoEntry * entry)
00596 {
00597     if (!entry) entry = &getEntry();
00598 #if 0
00599     Data current = *data;
00600 
00601     try {
00602         ExcAssertEqual(current.exclusive, 1);
00603         ExcAssertEqual(current.inCurrent(), 0);
00604         ExcAssertEqual(current.inOld(), 0);
00605     } catch (...) {
00606         cerr << "entry->inEpoch = " << entry->inEpoch << endl;
00607         cerr << "entry->readLocked = " << entry->readLocked << endl;
00608         cerr << "entry->writeLocked = " << entry->writeLocked << endl;
00609         cerr << "current: " << current.print() << endl;
00610         cerr << "data: " << data->print() << endl;
00611         throw;
00612     }
00613 #endif
00614 
00615     ML::memory_barrier();
00616 
00617     int old = 1;
00618     if (!ML::cmp_xchg(data->exclusive, old, 0))
00619         throw ML::Exception("error exiting exclusive mode");
00620 
00621     // Wake everything waiting on the exclusive lock
00622     futex_wake(data->exclusive);
00623 
00624     entry->inEpoch = -1;
00625 }
00626 
00627 void
00628 GcLockBase::
00629 visibleBarrier()
00630 {
00631     ML::memory_barrier();
00632     
00633     ThreadGcInfoEntry & entry = getEntry();
00634 
00635     if (entry.inEpoch != -1)
00636         throw ML::Exception("visibleBarrier called in critical section will "
00637                             "deadlock");
00638 
00639     Data current = *data;
00640     int startEpoch = data->epoch;
00641     //int startVisible = data.visibleEpoch;
00642     
00643     // Spin until we're visible
00644     for (unsigned i = 0;  ;  ++i, current = *data) {
00645         
00646         //int i = startEpoch & 1;
00647 
00648         // Have we moved on?  If we're 2 epochs ahead we're surely not visible
00649         if (current.epoch != startEpoch && current.epoch != startEpoch + 1) {
00650             //cerr << "epoch moved on" << endl;
00651             return;
00652         }
00653 
00654         // If there's nothing in a critical section then we're OK
00655         if (current.inCurrent() == 0 && current.inOld() == 0)
00656             return;
00657 
00658         if (current.visibleEpoch == startEpoch)
00659             return;
00660 
00661         if (i % 128 == 127 || true) {
00662             long res = futex_wait(data->visibleEpoch, current.visibleEpoch);
00663             if (res == -1) {
00664                 if (errno == EAGAIN) continue;
00665                 throw ML::Exception(errno, "futex_wait");
00666             }
00667         }
00668     }
00669 }
00670 
00671 void
00672 GcLockBase::
00673 deferBarrier()
00674 {
00675     // TODO: what is the interaction between a defer barrier and an exclusive
00676     // lock?
00677 
00678     ThreadGcInfoEntry & entry = getEntry();
00679 
00680     visibleBarrier();
00681 
00682     // Do it twice to make sure that everything is cycled over two different
00683     // epochs
00684     for (unsigned i = 0;  i < 2;  ++i) {
00685         
00686         // If we're in a critical section, we'll wait forever...
00687         ExcAssertEqual(entry.inEpoch, -1);
00688         
00689         // What does "defer barrier" mean?  It means that we wait until everything
00690         // that is currently enqueued to be deferred is finished.
00691         
00692         // TODO: this is a very inefficient implementation... we could do a lot
00693         // better especially in the non-contended case
00694         
00695         int lock = 0;
00696         
00697         defer(futex_unlock, &lock);
00698         
00699         ML::atomic_add(lock, -1);
00700         
00701         futex_wait(lock, -1);
00702     }
00703 
00704     // If certain threads aren't allowed to execute deferred work
00705     // then it's possible that not all deferred work will have been executed.
00706     // To be sure, we run any leftover work.
00707     runDefers();
00708 }
00709 
00711 static void callFn(void * arg)
00712 {
00713     boost::function<void ()> * fn
00714         = reinterpret_cast<boost::function<void ()> *>(arg);
00715     try {
00716         (*fn)();
00717     } catch (...) {
00718         delete fn;
00719         throw;
00720     }
00721     delete fn;
00722 }
00723 
00724 void
00725 GcLockBase::
00726 defer(boost::function<void ()> work)
00727 {
00728     defer(callFn, new boost::function<void ()>(work));
00729 }
00730 
00731 template<typename... Args>
00732 void
00733 GcLockBase::
00734 doDefer(void (fn) (Args...), Args... args)
00735 {
00736     // INVARIANT
00737     // If there is another thread that is provably in a critical section at
00738     // this moment, then the function will only be run when all such threads
00739     // have exited the critical section.
00740     //
00741     // If there are no threads in either the current or the old epoch, then
00742     // we can run it straight away.
00743     //
00744     // If there are threads in the old epoch but not the current epoch, then
00745     // we need to wait until all threads have exited the old epoch.  In other
00746     // words, it goes on the old epoch's defer queue.
00747     //
00748     // If there are threads in the current epoch (irrespective of the old
00749     // epoch) then we need to wait until the current epoch is done.
00750 
00751     Data current = *data;
00752 
00753     int32_t newestVisibleEpoch = current.epoch;
00754     if (current.inCurrent() == 0) --newestVisibleEpoch;
00755 
00756 #if 1
00757     // Nothing is in a critical section; we can run it inline
00758     if (current.inCurrent() + current.inOld() == 0) {
00759         fn(std::forward<Args>(args)...);
00760         return;
00761     }
00762 #endif
00763 
00764     for (int i = 0; i == 0; ++i) {
00765         // Lock the deferred structure
00766         boost::lock_guard<ML::Spinlock> guard(deferred->lock);
00767 
00768 #if 1
00769         // Get back to current again
00770         current = *data;
00771 
00772         // Find the oldest live epoch
00773         int oldestLiveEpoch = -1;
00774         if (current.inOld() > 0)
00775             oldestLiveEpoch = current.epoch - 1;
00776         else if (current.inCurrent() > 0)
00777             oldestLiveEpoch = current.epoch;
00778     
00779         if (oldestLiveEpoch == -1 || 
00780                 compareEpochs(oldestLiveEpoch, newestVisibleEpoch) > 0)
00781         {
00782             // Nothing in a critical section so we can run it now and exit
00783             break;
00784         }
00785     
00786         // Nothing is in a critical section; we can run it inline
00787         if (current.inCurrent() + current.inOld() == 0)
00788             break;
00789 #endif
00790 
00791         // OK, get the deferred list
00792         auto epochIt
00793             = deferred->entries.insert
00794             (make_pair(newestVisibleEpoch, (DeferredList *)0)).first;
00795         if (epochIt->second == 0) {
00796             // Create a new list
00797             epochIt->second = new DeferredList();
00798         }
00799         
00800         DeferredList & list = *epochIt->second;
00801         list.addDeferred(newestVisibleEpoch, fn, std::forward<Args>(args)...);
00802 
00803         // TODO: we only need to do this if the newestVisibleEpoch has
00804         // changed since we last calculated it...
00805         //checkDefers();
00806 
00807         return;
00808     }
00809     
00810     // If we got here we can run it straight away
00811     fn(std::forward<Args>(args)...);
00812     return;
00813 }
00814 
00815 void
00816 GcLockBase::
00817 defer(void (work) (void *), void * arg)
00818 {
00819     doDefer(work, arg);
00820 }
00821 
00822 void
00823 GcLockBase::
00824 defer(void (work) (void *, void *), void * arg1, void * arg2)
00825 {
00826     doDefer(work, arg1, arg2);
00827 }
00828 
00829 void
00830 GcLockBase::
00831 defer(void (work) (void *, void *, void *), void * arg1, void * arg2, void * arg3)
00832 {
00833     doDefer(work, arg1, arg2, arg3);
00834 }
00835 
00836 void
00837 GcLockBase::
00838 dump()
00839 {
00840     Data current = *data;
00841     cerr << "epoch " << current.epoch << " in " << current.inCurrent()
00842          << " in-1 " << current.inOld() << " vis " << current.visibleEpoch
00843          << " excl " << current.exclusive << endl;
00844     cerr << "deferred: ";
00845     {
00846         boost::lock_guard<ML::Spinlock> guard(deferred->lock);
00847         cerr << deferred->entries.size() << " epochs: ";
00848         
00849         for (auto it = deferred->entries.begin(), end = deferred->entries.end();
00850              it != end;  ++it) {
00851             cerr << " " << it->first << " (" << it->second->size()
00852                  << " entries)";
00853         }
00854     }
00855     cerr << endl;
00856 }
00857 
00858 
00859 /*****************************************************************************/
00860 /* GC LOCK                                                                   */
00861 /*****************************************************************************/
00862 
00863 GcLock::
00864 GcLock()
00865 {
00866     data = &localData;
00867 }
00868 
00869 GcLock::
00870 ~GcLock()
00871 {
00872     // Nothing to cleanup.
00873 }
00874 
00875 void
00876 GcLock::
00877 unlink()
00878 {
00879     // Nothing to cleanup.
00880 }
00881 
00882 
00883 /*****************************************************************************/
00884 /* SHARED GC LOCK                                                            */
00885 /*****************************************************************************/
00886 
00887 // We want to mmap the file so it has to be the size of a page.
00888 
00889 namespace { size_t GcLockFileSize = 1ULL << 12; }
00890 
00891 
00892 GcCreate GC_CREATE; 
00893 GcOpen GC_OPEN;     
00894 
00895 void
00896 SharedGcLock::
00897 doOpen(bool create)
00898 {
00899     int flags = O_RDWR | O_CREAT;
00900     if (create) flags |= O_EXCL;
00901 
00902     ipc::named_mutex mutex(ipc::open_or_create, name.c_str());
00903     {
00904         // Lock is used to create and truncate the file atomically.
00905         ipc::scoped_lock<ipc::named_mutex> lock(mutex);
00906 
00907         // We don't want the locks to be persisted so an shm_open will do fine.
00908         fd = shm_open(name.c_str(), flags, 0644);
00909         ExcCheckErrno(fd >= 0, "shm_open failed");
00910 
00911         struct stat stats;
00912         int res = fstat(fd, &stats);
00913         ExcCheckErrno(!res, "failed to get the file size");
00914 
00915         if (stats.st_size != GcLockFileSize) {
00916             int res = ftruncate(fd, GcLockFileSize);
00917             ExcCheckErrno(!res, "failed to resize the file.");
00918         }
00919     }
00920 
00921     // Map the region so that all the processes can see the writes.
00922     addr = mmap(0, GcLockFileSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
00923     ExcCheckErrno(addr != MAP_FAILED, "failed to map the shm file");
00924 
00925     // Initialize and set the member used by GcLockBase.
00926     if (create) new (addr) Data();
00927     data = reinterpret_cast<Data*>(addr);
00928 }
00929 
00930 SharedGcLock::
00931 SharedGcLock(GcCreate, const string& name) :
00932     name("gc." + name)
00933 {
00934     doOpen(true);
00935 }
00936 
00937 SharedGcLock::
00938 SharedGcLock(GcOpen, const string& name) :
00939     name("gc." + name)
00940 {
00941     doOpen(false);
00942 }
00943 
00944 SharedGcLock::
00945 ~SharedGcLock()
00946 {
00947     munmap(addr, GcLockFileSize);
00948     close(fd);
00949 }
00950 
00951 void
00952 SharedGcLock::
00953 unlink()
00954 {
00955     shm_unlink(name.c_str());
00956     (void) ipc::named_mutex::remove(name.c_str());
00957 }
00958 
00959 
00960 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator