// RTBKit 0.9 - Open-source framework to create real-time ad bidding systems.
00001 /* async_event_source.cc 00002 Jeremy Barnes, 9 November 2012 00003 */ 00004 00005 #include "async_event_source.h" 00006 #include <sys/timerfd.h> 00007 #include "jml/arch/exception.h" 00008 #include <iostream> 00009 #include "message_loop.h" 00010 00011 00012 using namespace std; 00013 00014 namespace Datacratic { 00015 00016 /*****************************************************************************/ 00017 /* ASYNC EVENT SOURCE */ 00018 /*****************************************************************************/ 00019 00020 void 00021 AsyncEventSource:: 00022 disconnect() 00023 { 00024 if (!parent_) 00025 return; 00026 parent_->removeSource(this); 00027 parent_ = nullptr; 00028 } 00029 00030 00031 /*****************************************************************************/ 00032 /* PERIODIC EVENT SOURCE */ 00033 /*****************************************************************************/ 00034 00035 PeriodicEventSource:: 00036 PeriodicEventSource() 00037 : timerFd(-1), 00038 timePeriodSeconds(0), 00039 singleThreaded_(true) 00040 00041 { 00042 } 00043 00044 PeriodicEventSource:: 00045 PeriodicEventSource(double timePeriodSeconds, 00046 std::function<void (uint64_t)> onTimeout, 00047 bool singleThreaded) 00048 : timerFd(-1), 00049 timePeriodSeconds(timePeriodSeconds), 00050 onTimeout(onTimeout), 00051 singleThreaded_(singleThreaded) 00052 { 00053 init(timePeriodSeconds, onTimeout, singleThreaded); 00054 } 00055 00056 void 00057 PeriodicEventSource:: 00058 init(double timePeriodSeconds, 00059 std::function<void (uint64_t)> onTimeout, 00060 bool singleThreaded) 00061 { 00062 if (timerFd != -1) 00063 throw ML::Exception("double initialization of periodic event source"); 00064 00065 this->timePeriodSeconds = timePeriodSeconds; 00066 this->onTimeout = onTimeout; 00067 this->singleThreaded_ = singleThreaded; 00068 00069 timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); 00070 if (timerFd == -1) 00071 throw ML::Exception(errno, "timerfd_create"); 
00072 00073 itimerspec spec; 00074 00075 int res = clock_gettime(CLOCK_MONOTONIC, &spec.it_value); 00076 if (res == -1) 00077 throw ML::Exception(errno, "clock_gettime"); 00078 uint64_t seconds, nanoseconds; 00079 seconds = timePeriodSeconds; 00080 nanoseconds = (timePeriodSeconds - seconds) * 1000000000; 00081 00082 spec.it_interval.tv_sec = spec.it_value.tv_sec = seconds; 00083 spec.it_interval.tv_nsec = spec.it_value.tv_nsec = nanoseconds; 00084 00085 #if 0 00086 // Relative to current time, so zero 00087 spec.it_value.tv_sec = spec.it_value.tv_nsec = 0; 00088 00089 00090 spec.it_value.tv_nsec += nanoseconds; 00091 if (spec.it_value.tv_nsec >= 1000000000) { 00092 ++spec.it_value.tv_sec; 00093 spec.it_value.tv_nsec -= 1000000000; 00094 } 00095 spec.it_value.tv_sec += seconds; 00096 #endif 00097 00098 res = timerfd_settime(timerFd, 0, &spec, 0); 00099 if (res == -1) 00100 throw ML::Exception(errno, "timerfd_settime"); 00101 } 00102 00103 PeriodicEventSource:: 00104 ~PeriodicEventSource() 00105 { 00106 int res = close(timerFd); 00107 if (res == -1) 00108 cerr << "warning: close on timerfd: " << strerror(errno) << endl; 00109 } 00110 00111 int 00112 PeriodicEventSource:: 00113 selectFd() const 00114 { 00115 return timerFd; 00116 } 00117 00118 bool 00119 PeriodicEventSource:: 00120 processOne() 00121 { 00122 uint64_t numWakeups = 0; 00123 for (;;) { 00124 int res = read(timerFd, &numWakeups, 8); 00125 if (res == -1 && errno == EINTR) continue; 00126 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) 00127 break; 00128 if (res == -1) 00129 throw ML::Exception(errno, "timerfd read"); 00130 else if (res != 8) 00131 throw ML::Exception("timerfd read: wrong number of bytes: %d", 00132 res); 00133 onTimeout(numWakeups); 00134 break; 00135 } 00136 return false; 00137 } 00138 00139 00140 } // namespace Datacratic