RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* endpoint.cc 00002 Jeremy Barnes, 21 February 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 */ 00006 00007 #include "soa/service//endpoint.h" 00008 00009 #include "soa/service//http_endpoint.h" 00010 #include "jml/arch/cmp_xchg.h" 00011 #include "jml/arch/atomic_ops.h" 00012 #include "jml/arch/format.h" 00013 #include "jml/arch/exception.h" 00014 #include "jml/arch/demangle.h" 00015 #include "jml/arch/backtrace.h" 00016 #include "jml/arch/timers.h" 00017 #include "jml/arch/futex.h" 00018 #include "jml/utils/set_utils.h" 00019 #include "jml/utils/guard.h" 00020 #include "jml/utils/vector_utils.h" 00021 #include "jml/utils/smart_ptr_utils.h" 00022 #include "jml/utils/exc_assert.h" 00023 #include "jml/arch/rt.h" 00024 #include <sys/prctl.h> 00025 #include <sys/epoll.h> 00026 #include <poll.h> 00027 00028 00029 using namespace std; 00030 using namespace ML; 00031 00032 00033 namespace Datacratic { 00034 00035 /*****************************************************************************/ 00036 /* ENDPOINT BASE */ 00037 /*****************************************************************************/ 00038 00039 EndpointBase:: 00040 EndpointBase(const std::string & name) 00041 : idle(1), modifyIdle(true), 00042 name_(name), 00043 threadsActive_(0), 00044 numTransports(0), shutdown_(false) 00045 { 00046 Epoller::init(16384); 00047 Epoller::addFd(wakeup.fd()); 00048 Epoller::handleEvent = std::bind(&EndpointBase::handleEpollEvent, 00049 this, 00050 std::placeholders::_1); 00051 } 00052 00053 EndpointBase:: 00054 ~EndpointBase() 00055 { 00056 } 00057 00058 void 00059 EndpointBase:: 00060 spinup(int num_threads, bool synchronous) 00061 { 00062 shutdown_ = false; 00063 00064 if (eventThreads) 00065 throw Exception("spinup with threads already up"); 00066 eventThreads.reset(new boost::thread_group()); 00067 00068 threadsActive_ = 0; 00069 00070 for (unsigned i = 0; i < num_threads; ++i) { 00071 boost::thread * thread 00072 = eventThreads->create_thread 00073 ([=] () 00074 { 00075 this->runEventThread(i, num_threads); 00076 }); 00077 eventThreadList.push_back(thread); 00078 } 00079 00080 if (synchronous) { 00081 for (;;) { 00082 int oldValue = threadsActive_; 00083 if (oldValue >= num_threads) break; 00084 //cerr << "threadsActive_ " << threadsActive_ 00085 // << " of " << num_threads << endl; 00086 futex_wait(threadsActive_, oldValue); 00087 //ML::sleep(0.001); 00088 } 00089 } 00090 } 00091 00092 void 00093 EndpointBase:: 00094 makeRealTime(int priority) 00095 { 00096 for (unsigned i = 0; i < eventThreadList.size(); ++i) 00097 makeThreadRealTime(*eventThreadList[i], priority); 00098 } 00099 00100 void 00101 EndpointBase:: 00102 shutdown() 00103 { 00104 //cerr << "Endpoint shutdown" << endl; 00105 //cerr << "numTransports = " << numTransports << endl; 00106 00107 closePeer(); 00108 00109 { 00110 Guard guard(lock); 00111 //cerr << "sending shutdown to " << alive.size() << " transports" 00112 // << endl; 00113 00114 for (auto it = alive.begin(), end = alive.end(); it != end; ++it) { 00115 auto transport = it->get(); 00116 //cerr << "shutting down transport " << transport->status() << endl; 00117 transport->doAsync([=] () 00118 { 00119 //cerr << "killing transport " << transport 00120 // << endl; 00121 transport->closeWhenHandlerFinished(); 00122 }, 00123 "killtransport"); 00124 } 00125 } 00126 00127 //cerr << "eventThreads = " << eventThreads.get() << endl; 00128 //cerr << "eventThreadList.size() = " << eventThreadList.size() << endl; 00129 00130 //cerr << "numTransports = " << numTransports << endl; 00131 00132 sleepUntilIdle(); 00133 00134 //cerr << "idle" << endl; 00135 00136 while (numTransports != 0) { 00137 //cerr << "shutdown " << this << ": numTransports = " 00138 // << numTransports << endl; 00139 ML::sleep(0.1); 00140 } 00141 00142 //cerr << "numTransports = " << numTransports << endl; 00143 00144 shutdown_ = true; 00145 ML::memory_barrier(); 00146 wakeup.signal(); 00147 00148 if (eventThreads) { 00149 eventThreads->join_all(); 00150 eventThreads.reset(); 00151 } 00152 eventThreadList.clear(); 00153 00154 // Now undo the signal 00155 wakeup.read(); 00156 00157 } 00158 00159 void 00160 EndpointBase:: 00161 useThisThread() 00162 { 00163 runEventThread(-1, -1); 00164 } 00165 00166 void 00167 EndpointBase:: 00168 notifyNewTransport(const std::shared_ptr<TransportBase> & transport) 00169 { 00170 Guard guard(lock); 00171 00172 //cerr << "new transport " << transport << endl; 00173 00174 if (alive.count(transport)) 00175 throw ML::Exception("active set already contains connection"); 00176 alive.insert(transport); 00177 00178 int fd = transport->getHandle(); 00179 if (fd < 0) 00180 throw Exception("notifyNewTransport: fd %d out of range"); 00181 00182 startPolling(transport.get()); 00183 00184 if (numTransports++ == 0 && modifyIdle) 00185 idle.acquire(); 00186 futex_wake(numTransports); 00187 00188 int & ntr = numTransportsByHost[transport->getPeerName()]; 00189 ++ntr; 00190 00191 //cerr << "host " << transport->getPeerName() << " has " 00192 // << ntr << " connections" << endl; 00193 00194 00195 if (onTransportOpen) 00196 onTransportOpen(transport.get()); 00197 } 00198 00199 void 00200 EndpointBase:: 00201 startPolling(TransportBase * transport) 00202 { 00203 addFdOneShot(transport->epollFd_, transport); 00204 } 00205 00206 void 00207 EndpointBase:: 00208 stopPolling(TransportBase * transport) 00209 { 00210 removeFd(transport->epollFd_); 00211 } 00212 00213 void 00214 EndpointBase:: 00215 restartPolling(TransportBase * transport) 00216 { 00217 restartFdOneShot(transport->epollFd_, transport); 00218 } 00219 00220 void 00221 EndpointBase:: 00222 notifyCloseTransport(const std::shared_ptr<TransportBase> & transport) 00223 { 00224 #if 0 00225 cerr << "closed transport " << transport << " with fd " 00226 << transport->getHandle() << " with " << transport.use_count() 00227 << " references" << " and " << transport->hasAsync() << " async" 00228 << endl; 00229 #endif 00230 00231 if (onTransportClose) 00232 onTransportClose(transport.get()); 00233 00234 stopPolling(transport.get()); 00235 00236 transport->zombie_ = true; 00237 transport->closePeer(); 00238 00239 Guard guard(lock); 00240 if (!alive.count(transport)) { 00241 cerr << "closed transport " << transport << " with fd " 00242 << transport->getHandle() << " with " << transport.use_count() 00243 << " references" << " and " << transport->hasAsync() << " async" 00244 << endl; 00245 cerr << "activities: " << endl; 00246 transport->activities.dump(); 00247 cerr << endl << endl; 00248 00249 throw ML::Exception("active set didn't contain connection"); 00250 } 00251 alive.erase(transport); 00252 00253 int & ntr = numTransportsByHost[transport->getPeerName()]; 00254 --numTransports; 00255 futex_wake(numTransports); 00256 --ntr; 00257 if (ntr <= 0) 00258 numTransportsByHost.erase(transport->getPeerName()); 00259 if (numTransports == 0 && modifyIdle) 00260 idle.release(); 00261 } 00262 00263 void 00264 EndpointBase:: 00265 notifyRecycleTransport(const std::shared_ptr<TransportBase> & transport) 00266 { 00267 notifyCloseTransport(transport); 00268 } 00269 00270 void 00271 EndpointBase:: 00272 sleepUntilIdle() const 00273 { 00274 for (;;) { 00275 //cerr << "sleepUntilIdle " << this << ": numTransports = " 00276 // << numTransports << endl; 00277 ACE_Time_Value time(0, 100000); 00278 time += ACE_OS::gettimeofday(); 00279 int res = idle.acquire(time); 00280 if (res != -1) { 00281 idle.release(); 00282 return; 00283 } 00284 00285 Guard guard(lock); 00286 cerr << alive.size() << " transports" << endl; 00287 00288 for (auto it = alive.begin(), end = alive.end(); it != end; ++it) { 00289 auto transport = it->get(); 00290 cerr << "transport " << transport->status() << endl; 00291 } 00292 00293 dumpState(); 00294 } 00295 } 00296 00297 void 00298 EndpointBase:: 00299 dumpState() const 00300 { 00301 Guard guard(lock); 00302 cerr << "----------------------------------------------" << endl; 00303 cerr << "Endpoint of type " << type_name(*this) 00304 << " with " << numTransports << " transports" 00305 << endl; 00306 00307 } 00308 00309 int 00310 EndpointBase:: 00311 numConnections() const 00312 { 00313 return numTransports; 00314 } 00315 00316 std::map<std::string, int> 00317 EndpointBase:: 00318 numConnectionsByHost() const 00319 { 00320 Guard guard(lock); 00321 return numTransportsByHost; 00322 } 00323 00325 bool 00326 EndpointBase:: 00327 handleEpollEvent(epoll_event & event) 00328 { 00329 bool debug = false; 00330 00331 if (debug) { 00332 cerr << "handleEvent" << endl; 00333 int mask = event.events; 00334 00335 cerr << "events " 00336 << (mask & EPOLLIN ? "I" : "") 00337 << (mask & EPOLLOUT ? "O" : "") 00338 << (mask & EPOLLPRI ? "P" : "") 00339 << (mask & EPOLLERR ? "E" : "") 00340 << (mask & EPOLLHUP ? "H" : "") 00341 << (mask & EPOLLRDHUP ? "R" : "") 00342 << endl; 00343 } 00344 00345 TransportBase * transport_ 00346 = reinterpret_cast<TransportBase *>(event.data.ptr); 00347 00348 //cerr << "transport_ = " << transport_ << endl; 00349 00350 if (transport_ == 0) return true; // wakeup for shutdown 00351 00352 if (debug) 00353 cerr << "transport status = " << transport_->status() << endl; 00354 00355 // Pin it so that it can't be destroyed whilst handling messages 00356 std::shared_ptr<TransportBase> transport 00357 = transport_->shared_from_this(); 00358 00359 transport->handleEvents(); 00360 00361 if (!transport->isZombie()) 00362 this->restartPolling(transport.get()); 00363 00364 return false; 00365 } 00366 00367 void 00368 EndpointBase:: 00369 runEventThread(int threadNum, int numThreads) 00370 { 00371 //cerr << "runEventThread" << endl; 00372 00373 prctl(PR_SET_NAME,"EptCtrl",0,0,0); 00374 00375 bool debug = false; 00376 //debug = name() == "Backchannel"; 00377 //debug = threadNum == 7; 00378 00379 ML::Duty_Cycle_Timer duty; 00380 00381 Date lastCheck = Date::now(); 00382 00383 ML::atomic_inc(threadsActive_); 00384 futex_wake(threadsActive_); 00385 //cerr << "threadsActive_ " << threadsActive_ << endl; 00386 00387 Epoller::OnEvent beforeSleep = [&] () 00388 { 00389 duty.notifyBeforeSleep(); 00390 }; 00391 00392 Epoller::OnEvent afterSleep = [&] () 00393 { 00394 duty.notifyAfterSleep(); 00395 }; 00396 00397 00398 // Where does my timeslice start? 00399 double timesliceUs = 1000.0 / numThreads; 00400 int myStartUs = timesliceUs * threadNum; 00401 int myEndUs = timesliceUs * (threadNum + 1); 00402 00403 if (debug) { 00404 static ML::Spinlock lock; 00405 lock.acquire(); 00406 cerr << "threadNum = " << threadNum << " of " << name() 00407 << " numThreads = " << numThreads 00408 << " myStartUs = " << myStartUs 00409 << " myEndUs = " << myEndUs 00410 << endl; 00411 lock.release(); 00412 } 00413 00414 bool forceInSlice = false; 00415 00416 while (!shutdown_) { 00417 00418 Date now = Date::now(); 00419 00420 if (now.secondsSince(lastCheck) > 1.0 && debug) { 00421 ML::Duty_Cycle_Timer::Stats stats = duty.stats(); 00422 string msg = format("control thread for %s: " 00423 "events %lld sleeping %lld " 00424 "processing %lld duty %.2f%%", 00425 name().c_str(), 00426 (long long)stats.numWakeups, 00427 (long long)stats.usAsleep, 00428 (long long)stats.usAwake, 00429 stats.duty_cycle() * 100.0); 00430 cerr << msg << flush; 00431 duty.clear(); 00432 lastCheck = now; 00433 } 00434 00435 int us = now.fractionalSeconds() * 1000000; 00436 int fracms = us % 1000; // fractional part of the millisecond 00437 00438 if (debug && false) { 00439 cerr << "now = " << now.print(6) << " us = " << us 00440 << " fracms = " << fracms << " myStartUs = " 00441 << myStartUs << " myEndUs = " << myEndUs 00442 << endl; 00443 } 00444 00445 // Are we in our timeslice? 00446 if (/* forceInSlice 00447 || */(fracms >= myStartUs && fracms < myEndUs)) { 00448 // Yes... then sleep in epoll_wait... 00449 int usToWait = myEndUs - fracms; 00450 if (usToWait < 0 || usToWait > timesliceUs) 00451 usToWait = timesliceUs; 00452 00453 int numHandled = handleEvents(usToWait, 4, handleEvent, 00454 beforeSleep, afterSleep); 00455 if (debug && false) 00456 cerr << " in slice: handled " << numHandled << " events " 00457 << "for " << usToWait << " microseconds " 00458 << " taken " << Date::now().secondsSince(now) * 1000000 00459 << "us" << endl; 00460 if (numHandled == -1) break; 00461 forceInSlice = false; 00462 } 00463 else { 00464 // No... try to handle something and then sleep if we don't 00465 // find anything to do 00466 int numHandled = handleEvents(0, 1, handleEvent, 00467 beforeSleep, afterSleep); 00468 if (debug && false) 00469 cerr << " out of slice: handled " << numHandled << " events" 00470 << endl; 00471 if (numHandled == -1) break; 00472 if (numHandled == 0) { 00473 // Sleep until our timeslice 00474 duty.notifyBeforeSleep(); 00475 int usToSleep = myStartUs - fracms; 00476 if (usToSleep < 0) 00477 usToSleep += 1000; 00478 ExcAssertGreaterEqual(usToSleep, 0); 00479 if (debug && false) 00480 cerr << "sleeping for " << usToSleep << " micros" << endl; 00481 ML::sleep(usToSleep / 1000000.0); 00482 duty.notifyAfterSleep(); 00483 forceInSlice = true; 00484 00485 if (debug && false) 00486 cerr << " slept for " 00487 << Date::now().secondsSince(now) * 1000000 00488 << "us when " << usToSleep << " requested" 00489 << endl; 00490 } 00491 } 00492 } 00493 00494 cerr << "thread shutting down" << endl; 00495 00496 ML::atomic_dec(threadsActive_); 00497 futex_wake(threadsActive_); 00498 } 00499 00500 } // namespace Datacratic