RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/endpoint.cc
00001 /* endpoint.cc
00002    Jeremy Barnes, 21 February 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005 */
00006 
00007 #include "soa/service//endpoint.h"
00008 
00009 #include "soa/service//http_endpoint.h"
00010 #include "jml/arch/cmp_xchg.h"
00011 #include "jml/arch/atomic_ops.h"
00012 #include "jml/arch/format.h"
00013 #include "jml/arch/exception.h"
00014 #include "jml/arch/demangle.h"
00015 #include "jml/arch/backtrace.h"
00016 #include "jml/arch/timers.h"
00017 #include "jml/arch/futex.h"
00018 #include "jml/utils/set_utils.h"
00019 #include "jml/utils/guard.h"
00020 #include "jml/utils/vector_utils.h"
00021 #include "jml/utils/smart_ptr_utils.h"
00022 #include "jml/utils/exc_assert.h"
00023 #include "jml/arch/rt.h"
00024 #include <sys/prctl.h>
00025 #include <sys/epoll.h>
00026 #include <poll.h>
00027 
00028 
00029 using namespace std;
00030 using namespace ML;
00031 
00032 
00033 namespace Datacratic {
00034 
00035 /*****************************************************************************/
00036 /* ENDPOINT BASE                                                             */
00037 /*****************************************************************************/
00038 
00039 EndpointBase::
00040 EndpointBase(const std::string & name)
00041     : idle(1), modifyIdle(true),
00042       name_(name),
00043       threadsActive_(0),
00044       numTransports(0), shutdown_(false)
00045 {
00046     Epoller::init(16384);
00047     Epoller::addFd(wakeup.fd());
00048     Epoller::handleEvent = std::bind(&EndpointBase::handleEpollEvent,
00049                                      this,
00050                                      std::placeholders::_1);
00051 }
00052 
00053 EndpointBase::
00054 ~EndpointBase()
00055 {
00056 }
00057 
00058 void
00059 EndpointBase::
00060 spinup(int num_threads, bool synchronous)
00061 {
00062     shutdown_ = false;
00063 
00064     if (eventThreads)
00065         throw Exception("spinup with threads already up");
00066     eventThreads.reset(new boost::thread_group());
00067 
00068     threadsActive_ = 0;
00069 
00070     for (unsigned i = 0;  i < num_threads;  ++i) {
00071         boost::thread * thread
00072             = eventThreads->create_thread
00073             ([=] ()
00074              {
00075                  this->runEventThread(i, num_threads);
00076              });
00077         eventThreadList.push_back(thread);
00078     }
00079 
00080     if (synchronous) {
00081         for (;;) {
00082             int oldValue = threadsActive_;
00083             if (oldValue >= num_threads) break;
00084             //cerr << "threadsActive_ " << threadsActive_
00085             //     << " of " << num_threads << endl;
00086             futex_wait(threadsActive_, oldValue);
00087             //ML::sleep(0.001);
00088         }
00089     }
00090 }
00091 
00092 void
00093 EndpointBase::
00094 makeRealTime(int priority)
00095 {
00096     for (unsigned i = 0;  i < eventThreadList.size();  ++i)
00097         makeThreadRealTime(*eventThreadList[i], priority);
00098 }
00099 
00100 void
00101 EndpointBase::
00102 shutdown()
00103 {
00104     //cerr << "Endpoint shutdown" << endl;
00105     //cerr << "numTransports = " << numTransports << endl;
00106 
00107     closePeer();
00108 
00109     {
00110         Guard guard(lock);
00111         //cerr << "sending shutdown to " << alive.size() << " transports"
00112         //     << endl;
00113 
00114         for (auto it = alive.begin(), end = alive.end();  it != end;  ++it) {
00115             auto transport = it->get();
00116             //cerr << "shutting down transport " << transport->status() << endl;
00117             transport->doAsync([=] ()
00118                                {
00119                                    //cerr << "killing transport " << transport
00120                                    //     << endl;
00121                                    transport->closeWhenHandlerFinished();
00122                                },
00123                                "killtransport");
00124         }
00125     }
00126 
00127     //cerr << "eventThreads = " << eventThreads.get() << endl;
00128     //cerr << "eventThreadList.size() = " << eventThreadList.size() << endl;
00129 
00130     //cerr << "numTransports = " << numTransports << endl;
00131 
00132     sleepUntilIdle();
00133 
00134     //cerr << "idle" << endl;
00135 
00136     while (numTransports != 0) {
00137         //cerr << "shutdown " << this << ": numTransports = "
00138         //     << numTransports << endl;
00139         ML::sleep(0.1);
00140     }
00141 
00142     //cerr << "numTransports = " << numTransports << endl;
00143 
00144     shutdown_ = true;
00145     ML::memory_barrier();
00146     wakeup.signal();
00147 
00148     if (eventThreads) {
00149         eventThreads->join_all();
00150         eventThreads.reset();
00151     }
00152     eventThreadList.clear();
00153 
00154     // Now undo the signal
00155     wakeup.read();
00156 
00157 }
00158 
00159 void
00160 EndpointBase::
00161 useThisThread()
00162 {
00163     runEventThread(-1, -1);
00164 }
00165 
00166 void
00167 EndpointBase::
00168 notifyNewTransport(const std::shared_ptr<TransportBase> & transport)
00169 {
00170     Guard guard(lock);
00171 
00172     //cerr << "new transport " << transport << endl;
00173 
00174     if (alive.count(transport))
00175         throw ML::Exception("active set already contains connection");
00176     alive.insert(transport);
00177 
00178     int fd = transport->getHandle();
00179     if (fd < 0)
00180         throw Exception("notifyNewTransport: fd %d out of range");
00181 
00182     startPolling(transport.get());
00183 
00184     if (numTransports++ == 0 && modifyIdle)
00185         idle.acquire();
00186     futex_wake(numTransports);
00187 
00188     int & ntr = numTransportsByHost[transport->getPeerName()];
00189     ++ntr;
00190 
00191     //cerr << "host " << transport->getPeerName() << " has "
00192     //     << ntr << " connections" << endl;
00193 
00194 
00195     if (onTransportOpen)
00196         onTransportOpen(transport.get());
00197 }
00198 
00199 void
00200 EndpointBase::
00201 startPolling(TransportBase * transport)
00202 {
00203     addFdOneShot(transport->epollFd_, transport);
00204 }
00205 
00206 void
00207 EndpointBase::
00208 stopPolling(TransportBase * transport)
00209 {
00210     removeFd(transport->epollFd_);
00211 }
00212 
00213 void
00214 EndpointBase::
00215 restartPolling(TransportBase * transport)
00216 {
00217     restartFdOneShot(transport->epollFd_, transport);
00218 }
00219 
00220 void
00221 EndpointBase::
00222 notifyCloseTransport(const std::shared_ptr<TransportBase> & transport)
00223 {
00224 #if 0
00225     cerr << "closed transport " << transport << " with fd "
00226          << transport->getHandle() << " with " << transport.use_count()
00227          << " references" << " and " << transport->hasAsync() << " async"
00228          << endl;
00229 #endif
00230 
00231     if (onTransportClose)
00232         onTransportClose(transport.get());
00233 
00234     stopPolling(transport.get());
00235 
00236     transport->zombie_ = true;
00237     transport->closePeer();
00238 
00239     Guard guard(lock);
00240     if (!alive.count(transport)) {
00241         cerr << "closed transport " << transport << " with fd "
00242              << transport->getHandle() << " with " << transport.use_count()
00243              << " references" << " and " << transport->hasAsync() << " async"
00244              << endl;
00245         cerr << "activities: " << endl;
00246         transport->activities.dump();
00247         cerr << endl << endl;
00248 
00249         throw ML::Exception("active set didn't contain connection");
00250     }
00251     alive.erase(transport);
00252 
00253     int & ntr = numTransportsByHost[transport->getPeerName()];
00254     --numTransports;
00255     futex_wake(numTransports);
00256     --ntr;
00257     if (ntr <= 0)
00258         numTransportsByHost.erase(transport->getPeerName());
00259     if (numTransports == 0 && modifyIdle)
00260         idle.release();
00261 }
00262 
00263 void
00264 EndpointBase::
00265 notifyRecycleTransport(const std::shared_ptr<TransportBase> & transport)
00266 {
00267     notifyCloseTransport(transport);
00268 }
00269 
00270 void
00271 EndpointBase::
00272 sleepUntilIdle() const
00273 {
00274     for (;;) {
00275         //cerr << "sleepUntilIdle " << this << ": numTransports = "
00276         //     << numTransports << endl;
00277         ACE_Time_Value time(0, 100000);
00278         time += ACE_OS::gettimeofday();
00279         int res = idle.acquire(time);
00280         if (res != -1) {
00281             idle.release();
00282             return;
00283         }
00284 
00285         Guard guard(lock);
00286         cerr << alive.size() << " transports" << endl;
00287 
00288         for (auto it = alive.begin(), end = alive.end();  it != end;  ++it) {
00289             auto transport = it->get();
00290             cerr << "transport " << transport->status() << endl;
00291         }
00292 
00293         dumpState();
00294     }
00295 }
00296 
00297 void
00298 EndpointBase::
00299 dumpState() const
00300 {
00301     Guard guard(lock);
00302     cerr << "----------------------------------------------" << endl;
00303     cerr << "Endpoint of type " << type_name(*this)
00304          << " with " << numTransports << " transports"
00305          << endl;
00306 
00307 }
00308 
00309 int
00310 EndpointBase::
00311 numConnections() const
00312 {
00313     return numTransports;
00314 }
00315 
00316 std::map<std::string, int>
00317 EndpointBase::
00318 numConnectionsByHost() const
00319 {
00320     Guard guard(lock);
00321     return numTransportsByHost;
00322 }
00323 
00325 bool
00326 EndpointBase::
00327 handleEpollEvent(epoll_event & event)
00328 {
00329     bool debug = false;
00330 
00331     if (debug) {
00332         cerr << "handleEvent" << endl;
00333         int mask = event.events;
00334                 
00335         cerr << "events " 
00336              << (mask & EPOLLIN ? "I" : "")
00337              << (mask & EPOLLOUT ? "O" : "")
00338              << (mask & EPOLLPRI ? "P" : "")
00339              << (mask & EPOLLERR ? "E" : "")
00340              << (mask & EPOLLHUP ? "H" : "")
00341              << (mask & EPOLLRDHUP ? "R" : "")
00342              << endl;
00343     }            
00344             
00345     TransportBase * transport_
00346         = reinterpret_cast<TransportBase *>(event.data.ptr);
00347     
00348     //cerr << "transport_ = " << transport_ << endl; 
00349 
00350     if (transport_ == 0) return true;  // wakeup for shutdown
00351 
00352     if (debug)
00353         cerr << "transport status = " << transport_->status() << endl;
00354 
00355     // Pin it so that it can't be destroyed whilst handling messages
00356     std::shared_ptr<TransportBase> transport
00357         = transport_->shared_from_this();
00358 
00359     transport->handleEvents();
00360 
00361     if (!transport->isZombie())
00362         this->restartPolling(transport.get());
00363 
00364     return false;
00365 }
00366 
00367 void
00368 EndpointBase::
00369 runEventThread(int threadNum, int numThreads)
00370 {
00371     //cerr << "runEventThread" << endl;
00372 
00373     prctl(PR_SET_NAME,"EptCtrl",0,0,0);
00374 
00375     bool debug = false;
00376     //debug = name() == "Backchannel";
00377     //debug = threadNum == 7;
00378 
00379     ML::Duty_Cycle_Timer duty;
00380 
00381     Date lastCheck = Date::now();
00382 
00383     ML::atomic_inc(threadsActive_);
00384     futex_wake(threadsActive_);
00385     //cerr << "threadsActive_ " << threadsActive_ << endl;
00386 
00387     Epoller::OnEvent beforeSleep = [&] ()
00388         {
00389             duty.notifyBeforeSleep();
00390         };
00391 
00392     Epoller::OnEvent afterSleep = [&] ()
00393         {
00394             duty.notifyAfterSleep();
00395         };
00396 
00397 
00398     // Where does my timeslice start?
00399     double timesliceUs = 1000.0 / numThreads;
00400     int myStartUs = timesliceUs * threadNum;
00401     int myEndUs   = timesliceUs * (threadNum + 1);
00402 
00403     if (debug) {
00404         static ML::Spinlock lock;
00405         lock.acquire();
00406         cerr << "threadNum = " << threadNum << " of " << name()
00407              << " numThreads = " << numThreads
00408              << " myStartUs = " << myStartUs
00409              << " myEndUs = " << myEndUs
00410              << endl;
00411         lock.release();
00412     }
00413 
00414     bool forceInSlice = false;
00415 
00416     while (!shutdown_) {
00417 
00418         Date now = Date::now();
00419         
00420         if (now.secondsSince(lastCheck) > 1.0 && debug) {
00421             ML::Duty_Cycle_Timer::Stats stats = duty.stats();
00422             string msg = format("control thread for %s: "
00423                                 "events %lld sleeping %lld "
00424                                 "processing %lld duty %.2f%%",
00425                                 name().c_str(),
00426                                 (long long)stats.numWakeups,
00427                                 (long long)stats.usAsleep,
00428                                 (long long)stats.usAwake,
00429                                 stats.duty_cycle() * 100.0);
00430             cerr << msg << flush;
00431             duty.clear();
00432             lastCheck = now;
00433         }
00434 
00435         int us = now.fractionalSeconds() * 1000000;
00436         int fracms = us % 1000;  // fractional part of the millisecond
00437         
00438         if (debug && false) {
00439             cerr << "now = " << now.print(6) << " us = " << us
00440                  << " fracms = " << fracms << " myStartUs = "
00441                  << myStartUs << " myEndUs = " << myEndUs
00442                  << endl;
00443         }
00444 
00445         // Are we in our timeslice?
00446         if (/* forceInSlice
00447                || */(fracms >= myStartUs && fracms < myEndUs)) {
00448             // Yes... then sleep in epoll_wait...
00449             int usToWait = myEndUs - fracms;
00450             if (usToWait < 0 || usToWait > timesliceUs)
00451                 usToWait = timesliceUs;
00452             
00453             int numHandled = handleEvents(usToWait, 4, handleEvent,
00454                                           beforeSleep, afterSleep);
00455             if (debug && false)
00456                 cerr << "  in slice: handled " << numHandled << " events "
00457                      << "for " << usToWait << " microseconds "
00458                      << " taken " << Date::now().secondsSince(now) * 1000000
00459                      << "us" << endl;
00460             if (numHandled == -1) break;
00461             forceInSlice = false;
00462         }
00463         else {
00464             // No... try to handle something and then sleep if we don't
00465             // find anything to do
00466             int numHandled = handleEvents(0, 1, handleEvent,
00467                                           beforeSleep, afterSleep);
00468             if (debug && false)
00469                 cerr << "  out of slice: handled " << numHandled << " events"
00470                      << endl;
00471             if (numHandled == -1) break;
00472             if (numHandled == 0) {
00473                 // Sleep until our timeslice
00474                 duty.notifyBeforeSleep();
00475                 int usToSleep = myStartUs - fracms;
00476                 if (usToSleep < 0)
00477                     usToSleep += 1000;
00478                 ExcAssertGreaterEqual(usToSleep, 0);
00479                 if (debug && false)
00480                     cerr << "sleeping for " << usToSleep << " micros" << endl;
00481                 ML::sleep(usToSleep / 1000000.0);
00482                 duty.notifyAfterSleep();
00483                 forceInSlice = true;
00484 
00485                 if (debug && false)
00486                     cerr << " slept for "
00487                          << Date::now().secondsSince(now) * 1000000
00488                          << "us when " << usToSleep << " requested"
00489                          << endl;
00490             }
00491         }
00492     }
00493 
00494     cerr << "thread shutting down" << endl;
00495 
00496     ML::atomic_dec(threadsActive_);
00497     futex_wake(threadsActive_);
00498 }
00499 
00500 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator