RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/message_loop.cc
00001 
00007 #include "soa/service//message_loop.h"
00008 #include "jml/utils/smart_ptr_utils.h"
00009 #include <boost/make_shared.hpp>
00010 #include <iostream>
00011 #include "jml/arch/exception.h"
00012 #include "jml/arch/timers.h"
00013 #include "jml/arch/demangle.h"
00014 #include <time.h>
00015 #include <limits.h>
00016 #include "jml/arch/futex.h"
00017 #include "soa/types/date.h"
00018 #include <sys/epoll.h>
00019 #include "jml/arch/backtrace.h"
00020 #include <thread>
00021 
00022 
00023 using namespace std;
00024 using namespace ML;
00025 
00026 
00027 namespace Datacratic {
00028 
00029 
00030 /*****************************************************************************/
00031 /* ASYNC EVENT SOURCE                                                        */
00032 /*****************************************************************************/
00033 
00034 
00035 
00036 /*****************************************************************************/
00037 /* MESSAGE LOOP                                                              */
00038 /*****************************************************************************/
00039 
00040 MessageLoop::
00041 MessageLoop(int numThreads, double maxAddedLatency)
00042     : numThreadsCreated(0), totalSleepTime(0.0)
00043 {
00044     init(numThreads, maxAddedLatency);
00045 }
00046 
00047 MessageLoop::
00048 ~MessageLoop()
00049 {
00050     shutdown();
00051 }
00052 
00053 void
00054 MessageLoop::
00055 init(int numThreads, double maxAddedLatency)
00056 {
00057     if (maxAddedLatency == 0)
00058         cerr << "warning: MessageLoop with maxAddedLatency of zero will busy wait" << endl;
00059 
00060     Epoller::init(16384);
00061     this->shutdown_ = false;
00062     this->maxAddedLatency_ = maxAddedLatency;
00063     this->handleEvent = std::bind(&MessageLoop::handleEpollEvent,
00064                                    this,
00065                                    std::placeholders::_1);
00066     debug_ = false;
00067 }
00068 
00069 void
00070 MessageLoop::
00071 start(std::function<void ()> onStop)
00072 {
00073     if (numThreadsCreated)
00074         throw ML::Exception("already have started message loop");
00075 
00076     //cerr << "starting thread from " << this << endl;
00077     //ML::backtrace();
00078 
00079     auto runfn = [=] ()
00080         {
00081             this->runWorkerThread();
00082             if (onStop) onStop();
00083         };
00084 
00085     threads.create_thread(runfn);
00086 
00087     ++numThreadsCreated;
00088 }
00089     
00090 void
00091 MessageLoop::
00092 startSync()
00093 {
00094     if (numThreadsCreated)
00095         throw ML::Exception("already have started message loop");
00096 
00097     ++numThreadsCreated;
00098 
00099     runWorkerThread();
00100 }
00101     
00102 void
00103 MessageLoop::
00104 shutdown()
00105 {
00106     if (shutdown_)
00107         return;
00108 
00109     shutdown_ = true;
00110     futex_wake((int &)shutdown_);
00111 
00112     threads.join_all();
00113 
00114     numThreadsCreated = 0;
00115 }
00116 
00117 void
00118 MessageLoop::
00119 addSource(const std::string & name,
00120           AsyncEventSource & source, int priority)
00121 {
00122     addSource(name, make_unowned_std_sp(source), priority);
00123 }
00124 
00125 void
00126 MessageLoop::
00127 addSource(const std::string & name,
00128           std::shared_ptr<AsyncEventSource> source, int priority)
00129 {
00130     Guard guard(lock);
00131     addSourceImpl(name, source, priority);
00132 }
00133 
00134 void
00135 MessageLoop::
00136 addPeriodic(const std::string & name,
00137             double timePeriodSeconds,
00138             std::function<void (uint64_t)> toRun,
00139             int priority)
00140 {
00141     addSource(name,
00142               std::make_shared<PeriodicEventSource>(timePeriodSeconds,
00143                                                       toRun),
00144               priority);
00145 }
00146 
00147 void
00148 MessageLoop::
00149 addSourceDeferred(const std::string & name,
00150                   AsyncEventSource & source, int priority)
00151 {
00152     //cerr << "addSourceDeferred" << endl;
00153     // TODO: assert that lock is held by this thread
00154     deferredSources.push_back(make_tuple(name, make_unowned_std_sp(source), priority));
00155 }
00156 
00157 void
00158 MessageLoop::
00159 addSourceDeferred(const std::string & name,
00160                   std::shared_ptr<AsyncEventSource> source, int priority)
00161 {
00162     //cerr << "addSourceDeferred" << endl;
00163     // TODO: assert that lock is held by this thread
00164     deferredSources.push_back(make_tuple(name, source, priority));
00165 }
00166 
00167 void
00168 MessageLoop::
00169 addPeriodicDeferred(const std::string & name,
00170                     double timePeriodSeconds,
00171                     std::function<void (uint64_t)> toRun,
00172                     int priority)
00173 {
00174     addSourceDeferred(name,
00175                       std::make_shared<PeriodicEventSource>(timePeriodSeconds,
00176                                                               toRun),
00177                       priority);
00178 }
00179 
00180 void
00181 MessageLoop::
00182 removeSource(AsyncEventSource * source)
00183 {
00184     // When we free the elements in our destructor, they will try to remove themselves.
00185     // we just make it a nop.
00186     if (shutdown_)
00187         return;
00188 
00189     Guard guard(lock);
00190     for (unsigned i = 0;  i < sources.size();  ++i) {
00191         if (sources[i].second.get() != source)
00192             continue;
00193         int fd = source->selectFd();
00194         if (fd != -1) {
00195             removeFd(fd);
00196 
00197             // Make sure that our and our parent's value of needsPoll is up to date
00198             bool sourceNeedsPoll = source->needsPoll;
00199             if (needsPoll && sourceNeedsPoll) {
00200                 bool oldNeedsPoll = needsPoll;
00201                 checkNeedsPoll();
00202                 if (oldNeedsPoll != needsPoll && parent_)
00203                     parent_->checkNeedsPoll();
00204             }
00205         }
00206         sources.erase(sources.begin() + i);
00207         return;
00208     }
00209 
00210     throw ML::Exception("couldn't remove message loop source");
00211 }
00212 
00213 void
00214 MessageLoop::
00215 wakeupMainThread()
00216 {
00217     // TODO: do
00218 }
00219 
00220 void
00221 MessageLoop::
00222 startSubordinateThread(const SubordinateThreadFn & thread)
00223 {
00224     Guard guard(lock);
00225     int64_t id = 0;
00226     threads.create_thread(std::bind(thread, std::ref(shutdown_),
00227                                     id));
00228 }
00229 
00230 void
00231 MessageLoop::
00232 runWorkerThread()
00233 {
00234     Date lastCheck = Date::now();
00235 
00236     ML::Duty_Cycle_Timer duty;
00237 
00238     while (!shutdown_) {
00239 
00240 
00241         Date start = Date::now();
00242 
00243         bool more = true;
00244 
00245         if (debug_) {
00246             cerr << "handling events from " << sources.size()
00247                  << " sources with needsPoll " << needsPoll << endl;
00248             for (unsigned i = 0;  i < sources.size();  ++i)
00249                 cerr << sources[i].first << " " << sources[i].second->needsPoll << endl;
00250         }
00251 
00252         while (more) {
00253             more = processOne();
00254         }
00255         
00256         Date end = Date::now();
00257 
00258         double elapsed = end.secondsSince(start);
00259         double sleepTime = maxAddedLatency_ - elapsed;
00260 
00261         duty.notifyBeforeSleep();
00262         if (sleepTime > 0) {
00263             totalSleepTime += sleepTime;
00264             ML::sleep(sleepTime);
00265         }
00266         duty.notifyAfterSleep();
00267 
00268         if (lastCheck.secondsUntil(end) > 10.0) {
00269             auto stats = duty.stats();
00270             //cerr << "message loop: wakeups " << stats.numWakeups
00271             //     << " duty " << stats.duty_cycle() << endl;
00272             lastCheck = end;
00273             duty.clear();
00274         }
00275     }
00276 }
00277 
00278 bool
00279 MessageLoop::
00280 handleEpollEvent(epoll_event & event)
00281 {
00282     bool debug = false;
00283 
00284     if (debug) {
00285         cerr << "handleEvent" << endl;
00286         int mask = event.events;
00287                 
00288         cerr << "events " 
00289              << (mask & EPOLLIN ? "I" : "")
00290              << (mask & EPOLLOUT ? "O" : "")
00291              << (mask & EPOLLPRI ? "P" : "")
00292              << (mask & EPOLLERR ? "E" : "")
00293              << (mask & EPOLLHUP ? "H" : "")
00294              << (mask & EPOLLRDHUP ? "R" : "")
00295              << endl;
00296     }            
00297     
00298     AsyncEventSource * source
00299         = reinterpret_cast<AsyncEventSource *>(event.data.ptr);
00300     
00301     //cerr << "source = " << source << " of type "
00302     //     << ML::type_name(*source) << endl;
00303 
00304     if (source == 0) return true;  // wakeup for shutdown
00305 
00306     source->processOne();
00307 
00308     return false;
00309 }
00310 
00311 bool
00312 MessageLoop::
00313 poll() const
00314 {
00315     if (needsPoll) {
00316         Guard guard(lock);
00317         for (auto & s: sources)
00318             if (s.second->poll())
00319                 return true;
00320         return false;
00321     }
00322     else return Epoller::poll();
00323 }
00324 
00325 bool
00326 MessageLoop::
00327 processOne()
00328 {
00329     bool more = false;
00330 
00331     if (needsPoll) {
00332         Guard guard(lock);
00333             
00334         for (unsigned i = 0;  i < sources.size();  ++i) {
00335             try {
00336                 bool hasMore = sources[i].second->processOne();
00337                 if (debug_) {
00338                     cerr << "source " << sources[i].first << " has " << hasMore << endl;
00339                 }
00340                 //if (hasMore)
00341                 //    cerr << "source " << sources[i].first << " has more" << endl;
00342                 more = more || hasMore;
00343             } catch (...) {
00344                 cerr << "exception processing source " << sources[i].first
00345                      << endl;
00346                 throw;
00347             }
00348         }
00349     }
00350     else more = Epoller::processOne();
00351 
00352     // Add in any deferred sources that have been queued by the event handlers
00353     if (!deferredSources.empty()) {
00354         // Taking this lock after calling empty() instead of before is dodgy,
00355         // but looking at the g++ STL it should never crash and it saves us
00356         // from a spurious lock on every iteration through.
00357         // TODO: a better scheme, maybe with a boolean for hasDeferredSources
00358         Guard guard(lock);
00359 
00360         //cerr << "adding " << deferredSources.size() << " deferred sources"
00361         //     << endl;
00362 
00363         for (auto s: deferredSources)
00364             addSourceImpl(std::get<0>(s), std::get<1>(s), std::get<2>(s));
00365         
00366         deferredSources.clear();
00367     }
00368 
00369     return more;
00370 }
00371 
00372 void
00373 MessageLoop::
00374 addSourceImpl(const std::string & name,
00375               std::shared_ptr<AsyncEventSource> source, int priority)
00376 {
00377     if (debug_) {
00378         cerr << "adding source " << name << "; we now have " << sources.size()
00379              << " with needsPoll " << needsPoll;
00380         if (parent_)
00381             cerr << " parent needsPoll = " << parent_->needsPoll << endl;
00382         cerr << endl;
00383     }
00384 
00385     if (source->parent_)
00386         throw ML::Exception("adding a source that already has a parent");
00387     source->parent_ = this;
00388 
00389     sources.push_back(make_pair(name, source));
00390     int fd = source->selectFd();
00391     if (fd != -1)
00392         addFd(fd, source.get());
00393     bool oldNeedsPoll = needsPoll;
00394     if (source->needsPoll && !oldNeedsPoll) {
00395         needsPoll = true;
00396 
00397         // Deadlock?
00398         if (parent_)
00399             parent_->checkNeedsPoll();
00400     }
00401 
00402     if (debug_) {
00403         cerr << "finished adding source " << name << "; we now have " << sources.size()
00404              << " with needsPoll " << needsPoll;
00405         if (parent_)
00406             cerr << " parent needsPoll = " << parent_->needsPoll << endl;
00407         cerr << endl;
00408     }
00409 
00410     wakeupMainThread();
00411 }
00412 
00413 void
00414 MessageLoop::
00415 debug(bool debugOn)
00416 {
00417     debug_ = debugOn;
00418     Guard guard(lock);
00419     for (unsigned i = 0;  i < sources.size();  ++i) {
00420         sources[i].second->debug(debugOn);
00421     }
00422 }
00423 
00424 void
00425 MessageLoop::
00426 checkNeedsPoll()
00427 {
00428     Guard guard(lock);
00429     bool newNeedsPoll = false;
00430     for (unsigned i = 0;  i < sources.size() && !newNeedsPoll;  ++i)
00431         newNeedsPoll = sources[i].second->needsPoll;
00432     needsPoll = newNeedsPoll;
00433 }
00434 
00435 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator