RTBKit 0.9

Open-source framework to create real-time ad bidding systems.
00001 00007 #include "soa/service//message_loop.h" 00008 #include "jml/utils/smart_ptr_utils.h" 00009 #include <boost/make_shared.hpp> 00010 #include <iostream> 00011 #include "jml/arch/exception.h" 00012 #include "jml/arch/timers.h" 00013 #include "jml/arch/demangle.h" 00014 #include <time.h> 00015 #include <limits.h> 00016 #include "jml/arch/futex.h" 00017 #include "soa/types/date.h" 00018 #include <sys/epoll.h> 00019 #include "jml/arch/backtrace.h" 00020 #include <thread> 00021 00022 00023 using namespace std; 00024 using namespace ML; 00025 00026 00027 namespace Datacratic { 00028 00029 00030 /*****************************************************************************/ 00031 /* ASYNC EVENT SOURCE */ 00032 /*****************************************************************************/ 00033 00034 00035 00036 /*****************************************************************************/ 00037 /* MESSAGE LOOP */ 00038 /*****************************************************************************/ 00039 00040 MessageLoop:: 00041 MessageLoop(int numThreads, double maxAddedLatency) 00042 : numThreadsCreated(0), totalSleepTime(0.0) 00043 { 00044 init(numThreads, maxAddedLatency); 00045 } 00046 00047 MessageLoop:: 00048 ~MessageLoop() 00049 { 00050 shutdown(); 00051 } 00052 00053 void 00054 MessageLoop:: 00055 init(int numThreads, double maxAddedLatency) 00056 { 00057 if (maxAddedLatency == 0) 00058 cerr << "warning: MessageLoop with maxAddedLatency of zero will busy wait" << endl; 00059 00060 Epoller::init(16384); 00061 this->shutdown_ = false; 00062 this->maxAddedLatency_ = maxAddedLatency; 00063 this->handleEvent = std::bind(&MessageLoop::handleEpollEvent, 00064 this, 00065 std::placeholders::_1); 00066 debug_ = false; 00067 } 00068 00069 void 00070 MessageLoop:: 00071 start(std::function<void ()> onStop) 00072 { 00073 if (numThreadsCreated) 00074 throw ML::Exception("already have started message loop"); 00075 00076 //cerr << "starting thread from " << this << 
endl; 00077 //ML::backtrace(); 00078 00079 auto runfn = [=] () 00080 { 00081 this->runWorkerThread(); 00082 if (onStop) onStop(); 00083 }; 00084 00085 threads.create_thread(runfn); 00086 00087 ++numThreadsCreated; 00088 } 00089 00090 void 00091 MessageLoop:: 00092 startSync() 00093 { 00094 if (numThreadsCreated) 00095 throw ML::Exception("already have started message loop"); 00096 00097 ++numThreadsCreated; 00098 00099 runWorkerThread(); 00100 } 00101 00102 void 00103 MessageLoop:: 00104 shutdown() 00105 { 00106 if (shutdown_) 00107 return; 00108 00109 shutdown_ = true; 00110 futex_wake((int &)shutdown_); 00111 00112 threads.join_all(); 00113 00114 numThreadsCreated = 0; 00115 } 00116 00117 void 00118 MessageLoop:: 00119 addSource(const std::string & name, 00120 AsyncEventSource & source, int priority) 00121 { 00122 addSource(name, make_unowned_std_sp(source), priority); 00123 } 00124 00125 void 00126 MessageLoop:: 00127 addSource(const std::string & name, 00128 std::shared_ptr<AsyncEventSource> source, int priority) 00129 { 00130 Guard guard(lock); 00131 addSourceImpl(name, source, priority); 00132 } 00133 00134 void 00135 MessageLoop:: 00136 addPeriodic(const std::string & name, 00137 double timePeriodSeconds, 00138 std::function<void (uint64_t)> toRun, 00139 int priority) 00140 { 00141 addSource(name, 00142 std::make_shared<PeriodicEventSource>(timePeriodSeconds, 00143 toRun), 00144 priority); 00145 } 00146 00147 void 00148 MessageLoop:: 00149 addSourceDeferred(const std::string & name, 00150 AsyncEventSource & source, int priority) 00151 { 00152 //cerr << "addSourceDeferred" << endl; 00153 // TODO: assert that lock is held by this thread 00154 deferredSources.push_back(make_tuple(name, make_unowned_std_sp(source), priority)); 00155 } 00156 00157 void 00158 MessageLoop:: 00159 addSourceDeferred(const std::string & name, 00160 std::shared_ptr<AsyncEventSource> source, int priority) 00161 { 00162 //cerr << "addSourceDeferred" << endl; 00163 // TODO: assert that 
lock is held by this thread 00164 deferredSources.push_back(make_tuple(name, source, priority)); 00165 } 00166 00167 void 00168 MessageLoop:: 00169 addPeriodicDeferred(const std::string & name, 00170 double timePeriodSeconds, 00171 std::function<void (uint64_t)> toRun, 00172 int priority) 00173 { 00174 addSourceDeferred(name, 00175 std::make_shared<PeriodicEventSource>(timePeriodSeconds, 00176 toRun), 00177 priority); 00178 } 00179 00180 void 00181 MessageLoop:: 00182 removeSource(AsyncEventSource * source) 00183 { 00184 // When we free the elements in our destructor, they will try to remove themselves. 00185 // we just make it a nop. 00186 if (shutdown_) 00187 return; 00188 00189 Guard guard(lock); 00190 for (unsigned i = 0; i < sources.size(); ++i) { 00191 if (sources[i].second.get() != source) 00192 continue; 00193 int fd = source->selectFd(); 00194 if (fd != -1) { 00195 removeFd(fd); 00196 00197 // Make sure that our and our parent's value of needsPoll is up to date 00198 bool sourceNeedsPoll = source->needsPoll; 00199 if (needsPoll && sourceNeedsPoll) { 00200 bool oldNeedsPoll = needsPoll; 00201 checkNeedsPoll(); 00202 if (oldNeedsPoll != needsPoll && parent_) 00203 parent_->checkNeedsPoll(); 00204 } 00205 } 00206 sources.erase(sources.begin() + i); 00207 return; 00208 } 00209 00210 throw ML::Exception("couldn't remove message loop source"); 00211 } 00212 00213 void 00214 MessageLoop:: 00215 wakeupMainThread() 00216 { 00217 // TODO: do 00218 } 00219 00220 void 00221 MessageLoop:: 00222 startSubordinateThread(const SubordinateThreadFn & thread) 00223 { 00224 Guard guard(lock); 00225 int64_t id = 0; 00226 threads.create_thread(std::bind(thread, std::ref(shutdown_), 00227 id)); 00228 } 00229 00230 void 00231 MessageLoop:: 00232 runWorkerThread() 00233 { 00234 Date lastCheck = Date::now(); 00235 00236 ML::Duty_Cycle_Timer duty; 00237 00238 while (!shutdown_) { 00239 00240 00241 Date start = Date::now(); 00242 00243 bool more = true; 00244 00245 if (debug_) { 00246 
cerr << "handling events from " << sources.size() 00247 << " sources with needsPoll " << needsPoll << endl; 00248 for (unsigned i = 0; i < sources.size(); ++i) 00249 cerr << sources[i].first << " " << sources[i].second->needsPoll << endl; 00250 } 00251 00252 while (more) { 00253 more = processOne(); 00254 } 00255 00256 Date end = Date::now(); 00257 00258 double elapsed = end.secondsSince(start); 00259 double sleepTime = maxAddedLatency_ - elapsed; 00260 00261 duty.notifyBeforeSleep(); 00262 if (sleepTime > 0) { 00263 totalSleepTime += sleepTime; 00264 ML::sleep(sleepTime); 00265 } 00266 duty.notifyAfterSleep(); 00267 00268 if (lastCheck.secondsUntil(end) > 10.0) { 00269 auto stats = duty.stats(); 00270 //cerr << "message loop: wakeups " << stats.numWakeups 00271 // << " duty " << stats.duty_cycle() << endl; 00272 lastCheck = end; 00273 duty.clear(); 00274 } 00275 } 00276 } 00277 00278 bool 00279 MessageLoop:: 00280 handleEpollEvent(epoll_event & event) 00281 { 00282 bool debug = false; 00283 00284 if (debug) { 00285 cerr << "handleEvent" << endl; 00286 int mask = event.events; 00287 00288 cerr << "events " 00289 << (mask & EPOLLIN ? "I" : "") 00290 << (mask & EPOLLOUT ? "O" : "") 00291 << (mask & EPOLLPRI ? "P" : "") 00292 << (mask & EPOLLERR ? "E" : "") 00293 << (mask & EPOLLHUP ? "H" : "") 00294 << (mask & EPOLLRDHUP ? 
"R" : "") 00295 << endl; 00296 } 00297 00298 AsyncEventSource * source 00299 = reinterpret_cast<AsyncEventSource *>(event.data.ptr); 00300 00301 //cerr << "source = " << source << " of type " 00302 // << ML::type_name(*source) << endl; 00303 00304 if (source == 0) return true; // wakeup for shutdown 00305 00306 source->processOne(); 00307 00308 return false; 00309 } 00310 00311 bool 00312 MessageLoop:: 00313 poll() const 00314 { 00315 if (needsPoll) { 00316 Guard guard(lock); 00317 for (auto & s: sources) 00318 if (s.second->poll()) 00319 return true; 00320 return false; 00321 } 00322 else return Epoller::poll(); 00323 } 00324 00325 bool 00326 MessageLoop:: 00327 processOne() 00328 { 00329 bool more = false; 00330 00331 if (needsPoll) { 00332 Guard guard(lock); 00333 00334 for (unsigned i = 0; i < sources.size(); ++i) { 00335 try { 00336 bool hasMore = sources[i].second->processOne(); 00337 if (debug_) { 00338 cerr << "source " << sources[i].first << " has " << hasMore << endl; 00339 } 00340 //if (hasMore) 00341 // cerr << "source " << sources[i].first << " has more" << endl; 00342 more = more || hasMore; 00343 } catch (...) { 00344 cerr << "exception processing source " << sources[i].first 00345 << endl; 00346 throw; 00347 } 00348 } 00349 } 00350 else more = Epoller::processOne(); 00351 00352 // Add in any deferred sources that have been queued by the event handlers 00353 if (!deferredSources.empty()) { 00354 // Taking this lock after calling empty() instead of before is dodgy, 00355 // but looking at the g++ STL it should never crash and it saves us 00356 // from a spurious lock on every iteration through. 
00357 // TODO: a better scheme, maybe with a boolean for hasDeferredSources 00358 Guard guard(lock); 00359 00360 //cerr << "adding " << deferredSources.size() << " deferred sources" 00361 // << endl; 00362 00363 for (auto s: deferredSources) 00364 addSourceImpl(std::get<0>(s), std::get<1>(s), std::get<2>(s)); 00365 00366 deferredSources.clear(); 00367 } 00368 00369 return more; 00370 } 00371 00372 void 00373 MessageLoop:: 00374 addSourceImpl(const std::string & name, 00375 std::shared_ptr<AsyncEventSource> source, int priority) 00376 { 00377 if (debug_) { 00378 cerr << "adding source " << name << "; we now have " << sources.size() 00379 << " with needsPoll " << needsPoll; 00380 if (parent_) 00381 cerr << " parent needsPoll = " << parent_->needsPoll << endl; 00382 cerr << endl; 00383 } 00384 00385 if (source->parent_) 00386 throw ML::Exception("adding a source that already has a parent"); 00387 source->parent_ = this; 00388 00389 sources.push_back(make_pair(name, source)); 00390 int fd = source->selectFd(); 00391 if (fd != -1) 00392 addFd(fd, source.get()); 00393 bool oldNeedsPoll = needsPoll; 00394 if (source->needsPoll && !oldNeedsPoll) { 00395 needsPoll = true; 00396 00397 // Deadlock? 
00398 if (parent_) 00399 parent_->checkNeedsPoll(); 00400 } 00401 00402 if (debug_) { 00403 cerr << "finished adding source " << name << "; we now have " << sources.size() 00404 << " with needsPoll " << needsPoll; 00405 if (parent_) 00406 cerr << " parent needsPoll = " << parent_->needsPoll << endl; 00407 cerr << endl; 00408 } 00409 00410 wakeupMainThread(); 00411 } 00412 00413 void 00414 MessageLoop:: 00415 debug(bool debugOn) 00416 { 00417 debug_ = debugOn; 00418 Guard guard(lock); 00419 for (unsigned i = 0; i < sources.size(); ++i) { 00420 sources[i].second->debug(debugOn); 00421 } 00422 } 00423 00424 void 00425 MessageLoop:: 00426 checkNeedsPoll() 00427 { 00428 Guard guard(lock); 00429 bool newNeedsPoll = false; 00430 for (unsigned i = 0; i < sources.size() && !newNeedsPoll; ++i) 00431 newNeedsPoll = sources[i].second->needsPoll; 00432 needsPoll = newNeedsPoll; 00433 } 00434 00435 } // namespace Datacratic