RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/async_event_source.cc
00001 /* async_event_source.cc
00002    Jeremy Barnes, 9 November 2012
00003 */
00004 
00005 #include "async_event_source.h"
00006 #include <sys/timerfd.h>
00007 #include "jml/arch/exception.h"
00008 #include <iostream>
00009 #include "message_loop.h"
00010 
00011 
00012 using namespace std;
00013 
00014 namespace Datacratic {
00015 
00016 /*****************************************************************************/
00017 /* ASYNC EVENT SOURCE                                                        */
00018 /*****************************************************************************/
00019 
00020 void
00021 AsyncEventSource::
00022 disconnect()
00023 {
00024     if (!parent_)
00025         return;
00026     parent_->removeSource(this);
00027     parent_ = nullptr;
00028 }
00029 
00030 
00031 /*****************************************************************************/
00032 /* PERIODIC EVENT SOURCE                                                     */
00033 /*****************************************************************************/
00034 
00035 PeriodicEventSource::
00036 PeriodicEventSource()
00037     : timerFd(-1),
00038       timePeriodSeconds(0),
00039       singleThreaded_(true)
00040       
00041 {
00042 }
00043 
00044 PeriodicEventSource::
00045 PeriodicEventSource(double timePeriodSeconds,
00046                     std::function<void (uint64_t)> onTimeout,
00047                     bool singleThreaded)
00048     : timerFd(-1),
00049       timePeriodSeconds(timePeriodSeconds),
00050       onTimeout(onTimeout),
00051       singleThreaded_(singleThreaded)
00052 {
00053     init(timePeriodSeconds, onTimeout, singleThreaded);
00054 }
00055 
00056 void
00057 PeriodicEventSource::
00058 init(double timePeriodSeconds,
00059      std::function<void (uint64_t)> onTimeout,
00060      bool singleThreaded)
00061 {
00062     if (timerFd != -1)
00063         throw ML::Exception("double initialization of periodic event source");
00064 
00065     this->timePeriodSeconds = timePeriodSeconds;
00066     this->onTimeout = onTimeout;
00067     this->singleThreaded_ = singleThreaded;
00068 
00069     timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
00070     if (timerFd == -1)
00071         throw ML::Exception(errno, "timerfd_create");
00072 
00073     itimerspec spec;
00074     
00075     int res = clock_gettime(CLOCK_MONOTONIC, &spec.it_value);
00076     if (res == -1)
00077         throw ML::Exception(errno, "clock_gettime");
00078     uint64_t seconds, nanoseconds;
00079     seconds = timePeriodSeconds;
00080     nanoseconds = (timePeriodSeconds - seconds) * 1000000000;
00081 
00082     spec.it_interval.tv_sec = spec.it_value.tv_sec = seconds;
00083     spec.it_interval.tv_nsec = spec.it_value.tv_nsec = nanoseconds;
00084 
00085 #if 0    
00086     // Relative to current time, so zero
00087     spec.it_value.tv_sec = spec.it_value.tv_nsec = 0;
00088 
00089     
00090     spec.it_value.tv_nsec += nanoseconds;
00091     if (spec.it_value.tv_nsec >= 1000000000) {
00092         ++spec.it_value.tv_sec;
00093         spec.it_value.tv_nsec -= 1000000000;
00094     }
00095     spec.it_value.tv_sec += seconds;
00096 #endif
00097 
00098     res = timerfd_settime(timerFd, 0, &spec, 0);
00099     if (res == -1)
00100         throw ML::Exception(errno, "timerfd_settime");
00101 }
00102 
00103 PeriodicEventSource::
00104 ~PeriodicEventSource()
00105 {
00106     int res = close(timerFd);
00107     if (res == -1)
00108         cerr << "warning: close on timerfd: " << strerror(errno) << endl;
00109 }
00110 
00111 int
00112 PeriodicEventSource::
00113 selectFd() const
00114 {
00115     return timerFd;
00116 }
00117 
00118 bool
00119 PeriodicEventSource::
00120 processOne()
00121 {
00122     uint64_t numWakeups = 0;
00123     for (;;) {
00124         int res = read(timerFd, &numWakeups, 8);
00125         if (res == -1 && errno == EINTR) continue;
00126         if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
00127             break;
00128         if (res == -1)
00129             throw ML::Exception(errno, "timerfd read");
00130         else if (res != 8)
00131             throw ML::Exception("timerfd read: wrong number of bytes: %d",
00132                                 res);
00133         onTimeout(numWakeups);
00134         break;
00135     }
00136     return false;
00137 }
00138 
00139 
00140 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator