RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/transport.cc
00001 /* transport.cc
00002    Jeremy Barnes, 24 February 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Information on the transport.
00006 */
00007 
00008 #include "transport.h"
00009 
00010 #include "soa/service//http_endpoint.h"
00011 #include "jml/arch/cmp_xchg.h"
00012 #include "jml/arch/atomic_ops.h"
00013 #include "jml/arch/format.h"
00014 #include "jml/arch/exception.h"
00015 #include "jml/arch/demangle.h"
00016 #include "jml/arch/backtrace.h"
00017 #include "jml/utils/environment.h"
00018 #include <iostream>
00019 #include <sys/epoll.h>
00020 #include <sys/timerfd.h>
00021 #include <sys/eventfd.h>
00022 #include <poll.h>
00023 
00024 
00025 using namespace std;
00026 using namespace ML;
00027 
00028 
00029 namespace Datacratic {
00030 
/** Optional global latency hook.  When non-empty, handleAsync() calls it
    with the label "asyncCallback" and the number of seconds between the
    callback's dateSet and the moment it is handled. */
boost::function<void (const char *, float)> onLatencyEvent;

/** Initial value of the `debug` member of every TransportBase; defaults to
    false.  Presumably overridable through the DEBUG_TRANSPORTS environment
    variable -- confirm against ML::Env_Option. */
ML::Env_Option<bool> DEBUG_TRANSPORTS("DEBUG_TRANSPORTS", false);
00034 
00035 
00036 /*****************************************************************************/
00037 /* TRANSPORT BASE                                                            */
00038 /*****************************************************************************/
00039 
// Instance counters: `created` is incremented by the endpoint-taking
// constructor and `destroyed` by the destructor, both through atomic_add.
long TransportBase::created = 0;
long TransportBase::destroyed = 0;
00042 
00043 
00044 TransportBase::
00045 TransportBase()
00046 {
00047     throw Exception("TransportBase constructor requires an endpoint");
00048 }
00049 
00050 TransportBase::
00051 TransportBase(EndpointBase * endpoint)
00052     : lockThread(0), lockActivity(0), debug(DEBUG_TRANSPORTS),
00053       asyncHead_(0),
00054       endpoint_(endpoint),
00055       recycle_(0), close_(0), flags_(0),
00056       hasConnection_(false), zombie_(false)
00057 {
00058     atomic_add(created, 1);
00059 
00060     if (!endpoint)
00061         throw ML::Exception("transport requires an endpoint");
00062 
00063     magic_ = 0x12345678;
00064 
00065     addActivityS("created");
00066 
00067     epollFd_ = epoll_create(1024);
00068     if (epollFd_ == -1)
00069         throw ML::Exception(errno, "couldn't create epoll fd");
00070     timerFd_ = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK);
00071     if (timerFd_ == -1)
00072         throw ML::Exception(errno, "timer FD couldn't be created");
00073     eventFd_ = eventfd(0, EFD_NONBLOCK);
00074     if (eventFd_ == -1)
00075         throw ML::Exception(errno, "event FD couldn't be created");
00076 
00077     /* Add all of these other FDs to the event FD */
00078     struct epoll_event data;
00079     data.data.u64 = 0;
00080     data.data.fd = timerFd_;
00081     data.events = EPOLLIN;
00082     int res = epoll_ctl(epollFd_, EPOLL_CTL_ADD, timerFd_, &data);
00083     if (res == -1)
00084         throw ML::Exception(errno, "epoll_ctl ADD timerFd");
00085     data.data.fd = eventFd_;
00086     res = epoll_ctl(epollFd_, EPOLL_CTL_ADD, eventFd_, &data);
00087     if (res == -1)
00088         throw ML::Exception(errno, "epoll_ctl ADD eventFd");
00089 
00090     //dumpActivities();
00091 
00092 }
00093 
00094 TransportBase::
00095 ~TransportBase()
00096 {
00097     int res = close(epollFd_);
00098     if (res == -1)
00099         cerr << "closing epoll fd: " << strerror(errno) << endl;
00100     res = close(timerFd_);
00101     if (res == -1)
00102         cerr << "closing timer fd: " << strerror(errno) << endl;
00103     res = close(eventFd_);
00104     if (res == -1)
00105         cerr << "closing event fd: " << strerror(errno) << endl;
00106 
00107     popAsync();
00108     assertNotLockedByAnotherThread();
00109     checkMagic();
00110     atomic_add(destroyed, 1);
00111     magic_ = 0;
00112     lockThread = 1;
00113     lockActivity = "DESTROYED";
00114 }
00115 
00116 void
00117 TransportBase::
00118 checkMagic() const
00119 {
00120     if (magic_ != 0x12345678) {
00121         cerr << "dead transport: " << endl;
00122         //activities.dump();
00123         throw Exception("attempt to access dead transport %p: magic %d",
00124                         this, magic_);
00125     }
00126 }
00127 
00128 int
00129 TransportBase::
00130 handleInput()
00131 {
00132     if (close_) return -1;
00133 
00134     InHandlerGuard guard(this, "input");
00135 
00136     if (close_) return -1;
00137 
00138     addActivityS("input");
00139 
00140     try {
00141         slave().handleInput();
00142     } catch (const std::exception & exc) {
00143         handleException("input", exc);
00144     } catch (...) {
00145         handleUnknownException("input");
00146     }
00147 
00148     return endEventHandler("input", guard);
00149 }
00150 
00151 int
00152 TransportBase::
00153 handleOutput()
00154 {
00155     if (close_) return -1;
00156 
00157     InHandlerGuard guard(this, "output");
00158 
00159     if (close_) return -1;
00160 
00161     addActivityS("output");
00162 
00163     try {
00164         slave().handleOutput();
00165     } catch (const std::exception & exc) {
00166         handleException("output", exc);
00167     } catch (...) {
00168         handleUnknownException("output");
00169     }
00170 
00171     return endEventHandler("output", guard);
00172 }
00173 
00174 int
00175 TransportBase::
00176 handlePeerShutdown()
00177 {
00178     if (close_) return -1;
00179 
00180     InHandlerGuard guard(this, "peerShutdown");
00181 
00182     if (close_) return -1;
00183 
00184     addActivityS("peerShutdown");
00185 
00186     try {
00187         slave().handlePeerShutdown();
00188     } catch (const std::exception & exc) {
00189         handleException("peerShutdown", exc);
00190     } catch (...) {
00191         handleUnknownException("peerShutdown");
00192     }
00193 
00194     return endEventHandler("peerShutdown", guard);
00195 }
00196 
00197 int
00198 TransportBase::
00199 handleTimeout()
00200 {
00201     if (close_) return -1;
00202 
00203     Date before = Date::now();
00204 
00205     InHandlerGuard guard(this, "timeout");
00206 
00207     if (close_) return -1;
00208 
00209     Date afterLock = Date::now();
00210 
00211     addActivity("timeout %zd in %.1fms", timeout_.timeoutCookie,
00212                 afterLock.secondsSince(before) * 1000);
00213 
00214     try {
00215         slave().handleTimeout(timeout_.timeout, timeout_.timeoutCookie);
00216 
00217         //Date afterHandler = Date::now();
00218 
00219     } catch (const std::exception & exc) {
00220         handleException("timeout", exc);
00221     } catch (...) {
00222         handleUnknownException("timeout");
00223     }
00224 
00225     return endEventHandler("timeout", guard);
00226 }
00227 
00228 int
00229 TransportBase::
00230 handleError(const std::string & error)
00231 {
00232     if (close_) return -1;
00233 
00234     InHandlerGuard guard(this, "error");
00235 
00236     if (close_) return -1;
00237 
00238     addActivityS("error");
00239 
00240     try {
00241         slave().handleError(error);
00242     } catch (const std::exception & exc) {
00243         handleException("error", exc);
00244     } catch (...) {
00245         handleUnknownException("error");
00246     }
00247 
00248     return endEventHandler("error", guard);
00249 }
00250 
00251 int
00252 TransportBase::
00253 handleAsync(const boost::function<void ()> & callback, const char * name,
00254             Date dateSet)
00255 {
00256     Date now = Date::now();
00257 
00258     double delay = now.secondsSince(dateSet);
00259 
00260     if (onLatencyEvent)
00261         onLatencyEvent("asyncCallback", delay);
00262 
00263     static Date lastMessage;
00264     static int messagesSinceLastMessage = 0;
00265 
00266     if (delay > 0.01) {
00267         if (lastMessage.secondsSince(now) > 0.5) {
00268             lastMessage = now;
00269             cerr << "big delay on async callback " << name << ": "
00270                  << delay * 1000 << "ms"
00271                  << endl;
00272             if (messagesSinceLastMessage > 0)
00273                 cerr << "also " << messagesSinceLastMessage << endl;
00274 
00275             cerr << "dateSet: " << dateSet.print(5) << endl;
00276             cerr << "now: " << now.print(5) << endl;
00277             activities.dump();
00278             //* (char *)0 = 0;  // cause segfault for gdb
00279             messagesSinceLastMessage = 0;
00280         }
00281         else ++messagesSinceLastMessage;
00282     }
00283     
00284     if (close_) return -1;
00285 
00286     addActivity("handleAsync: %s", name);
00287 
00288     InHandlerGuard guard(this, name);
00289 
00290     if (close_) return -1;
00291 
00292     addActivityS(name);
00293 
00294     try {
00295         callback();
00296     } catch (const std::exception & exc) {
00297         handleException(name, exc);
00298     } catch (...) {
00299         handleUnknownException(name);
00300     }
00301 
00302     return endEventHandler(name, guard);
00303 }
00304 
00305 void
00306 TransportBase::
00307 associate(std::shared_ptr<ConnectionHandler> newSlave)
00308 {
00309     addActivity("associate with " + newSlave->status());
00310     
00311     //assertLockedByThisThread();
00312     assertNotLockedByAnotherThread();
00313 
00314     if (slave_) {
00315         assertLockedByThisThread();
00316 
00317         if (slave_.get() == newSlave.get())
00318             throw Exception("re-associating the same slave of type "
00319                             + ML::type_name(*slave_));
00320         slave().onDisassociate();
00321     }
00322 
00323     slave_ = newSlave;
00324     slave().setTransport(this);
00325 
00326     auto finishAssociate = [=] ()
00327         {
00328             this->slave().onGotTransport();
00329         };
00330 
00331     /* If we're already locked, then finish the association here.  Otherwise,
00332        do it asynchronously so that we can do it in the handler context.
00333     */
00334     if (lockedByThisThread()) {
00335         InHandlerGuard guard(this, "associate");
00336         finishAssociate();
00337         endEventHandler("associate", guard);
00338     }
00339     else {
00340         doAsync(finishAssociate, "associate");
00341     }
00342     
00343     addActivityS("associate done");
00344 }
00345 
00346 std::shared_ptr<ConnectionHandler>
00347 TransportBase::
00348 disassociate()
00349 {
00350     addActivityS("disassociate");
00351 
00352     if (!slave_)
00353         throw Exception("TransportBase::disassociate(): no slave");
00354 
00355     cancelTimer();
00356 
00357     slave().onDisassociate();
00358     std::shared_ptr<ConnectionHandler> result = slave_;
00359     slave_.reset();
00360     return result;
00361 }
00362 
00363 void
00364 TransportBase::
00365 recycleWhenHandlerFinished()
00366 {
00367     if (!lockedByThisThread())
00368         throw Exception("not in handler");
00369 
00370     addActivityS("recycleWhenHandlerFinished");
00371 
00372     this->recycle_ = true;
00373 }
00374 
00375 void
00376 TransportBase::
00377 closeWhenHandlerFinished()
00378 {
00379     //backtrace();
00380 
00381     if (!lockedByThisThread())
00382         throw Exception("not in handler");
00383 
00384     addActivityS("closeWhenHandlerFinished");
00385 
00386     this->close_ = true;
00387     this->recycle_ = false;
00388 }
00389 
00390 void
00391 TransportBase::
00392 closeAsync()
00393 {
00394     auto doClose = [=] () { this->closeWhenHandlerFinished(); };
00395     doAsync(doClose, "closeAsync");
00396 }
00397 
00398 void
00399 TransportBase::
00400 associateWhenHandlerFinished(std::shared_ptr<ConnectionHandler> slave,
00401                              const std::string & whereFrom)
00402 {
00403     if (!slave)
00404         throw Exception("associateWhenHandlerFinished(): null slave");
00405 
00406     if (newSlave_)
00407         throw Exception("associateWhenHandlerFinished(): attempt to "
00408                         "double replace slave %s from %s "
00409                         "with new slave %s from %s",
00410                         newSlaveFrom_.c_str(),
00411                         type_name(*slave_).c_str(),
00412                         whereFrom.c_str(),
00413                         type_name(*slave).c_str());
00414 
00415     if (!lockedByThisThread()) {
00416         throw Exception("associateWhenHandlerFinished(): needs to be in "
00417                         "handler");
00418         //associate(slave);
00419     }
00420     else {
00421         newSlave_ = slave;
00422         newSlaveFrom_ = whereFrom;
00423     }
00424 }
00425 
00426 void
00427 TransportBase::
00428 handleException(const std::string & handler, const std::exception & exc)
00429 {
00430     addActivityS("exception");
00431 
00432     if (slave_)
00433         slave().onHandlerException(handler, exc);
00434 }
00435 
00436 void
00437 TransportBase::
00438 handleUnknownException(const std::string & handler)
00439 {
00440     handleException(handler, ML::Exception("unknown exception"));
00441 }
00442 
00443 int
00444 TransportBase::
00445 endEventHandler(const char * handler, InHandlerGuard & guard)
00446 {
00447     addActivity("endHandler %s with close %d, recycle %d, "
00448                 "newHandler %p", handler, close_, recycle_,
00449                 newSlave_.get());
00450 
00451     if (close_) {
00452         try {
00453             if (hasSlave()) disassociate();
00454         } catch (const std::exception & exc) {
00455             cerr << "error: disassociate() threw exception: "
00456                  << exc.what();
00457         }
00458 
00459         return -1;  // handle_close will be called once all out of the way
00460     }
00461     else if (recycle_) {
00462         try {
00463             if (hasSlave()) disassociate();
00464             //cerr << "done disassociate for " << handler << " for "
00465             //     << this << " with recycle" << endl;
00466         } catch (const std::exception & exc) {
00467             cerr << "error: disassociate() threw exception: "
00468                  << exc.what();
00469             return -1;
00470         }
00471         endpoint_->notifyRecycleTransport(shared_from_this());
00472         recycle_ = false;
00473         return 0;
00474     }
00475     else if (newSlave_) {
00476         auto saved = newSlave_;
00477         newSlave_.reset();
00478         associate(saved);
00479     }
00480 
00481     return 0;
00482 }
00483 
00484 void
00485 TransportBase::
00486 doError(const std::string & error)
00487 {
00488     slave().doError(error);
00489 }
00490 
00491 struct TransportTimer {
00492     TransportTimer(TransportBase * transport,
00493                    const char * event,
00494                    double maxTime = 0.0)
00495         : start(maxTime == 0.0 ? Date() : Date::now()),
00496           transport(transport), event(event),
00497           maxTime(maxTime)
00498     {
00499         if (maxTime != 0.0)
00500             statusBefore = transport->status();
00501     }
00502 
00503     ~TransportTimer()
00504     {
00505         if (maxTime == 0.0) return;
00506         double elapsed = Date::now().secondsSince(start);
00507         if (elapsed > maxTime) {
00508             cerr << "transport operation " << event << " took "
00509                  << elapsed * 1000 << "ms with status "
00510                  << transport->status() << " before "
00511                  << statusBefore << endl;
00512         }
00513     }
00514 
00515     Date start;
00516     TransportBase * transport;
00517     const char * event;
00518     double maxTime;
00519     std::string statusBefore;
00520 };
00521 
/** Translate a mask of poll(2) event bits into the corresponding mask of
    epoll event bits.  Bits with no entry in the table are dropped. */
int pollFlagsToEpoll(int flags)
{
    static const struct {
        int pollBit;
        int epollBit;
    } mapping[] = {
        { POLLIN,    EPOLLIN    },
        { POLLOUT,   EPOLLOUT   },
        { POLLPRI,   EPOLLPRI   },
        { POLLERR,   EPOLLERR   },
        { POLLHUP,   EPOLLHUP   },
        { POLLRDHUP, EPOLLRDHUP }
    };

    int epollMask = 0;
    for (const auto & entry : mapping) {
        if (flags & entry.pollBit)
            epollMask |= entry.epollBit;
    }
    return epollMask;
}
00533 
00534 void
00535 TransportBase::
00536 hasConnection()
00537 {
00538     if (getHandle() < 0)
00539         throw ML::Exception("hasConnection without a connection");
00540 
00541     struct epoll_event data;
00542     data.data.u64 = 0;
00543     data.data.fd = getHandle();
00544     data.events = pollFlagsToEpoll(flags_);
00545     int res = epoll_ctl(epollFd_, EPOLL_CTL_ADD, getHandle(), &data);
00546     if (res == -1)
00547         throw ML::Exception(errno, "epoll_ctl ADD getHandle()");
00548 
00549     hasConnection_ = true;
00550 
00551     //cerr << "transport " << getHandle() << " "
00552     //     << status() << " has a connection" << endl;
00553 }
00554 
/** Render an epoll event mask as a compact string for debugging, one
    character per flag that is set, in this order:

      I = EPOLLIN, O = EPOLLOUT, P = EPOLLPRI, E = EPOLLERR, H = EPOLLHUP,
      R = EPOLLRDHUP, T = EPOLLET, 1 = EPOLLONESHOT

    EPOLLET used to print as "E" as well, which made it impossible to
    tell apart from EPOLLERR; it now prints as "T".
*/
std::string
epollFlags(int mask)
{
    std::string result;
    if (mask & EPOLLIN)      result += 'I';
    if (mask & EPOLLOUT)     result += 'O';
    if (mask & EPOLLPRI)     result += 'P';
    if (mask & EPOLLERR)     result += 'E';
    if (mask & EPOLLHUP)     result += 'H';
    if (mask & EPOLLRDHUP)   result += 'R';
    if (mask & EPOLLET)      result += 'T';
    if (mask & EPOLLONESHOT) result += '1';
    return result;
}
00568 
00569 std::string
00570 pollFlags(int mask)
00571 {
00572     return ML::format("%s%s%s%s%s%s%s",
00573                       (mask & POLLIN ? "I" : ""),
00574                       (mask & POLLOUT ? "O" : ""),
00575                       (mask & POLLPRI ? "P" : ""),
00576                       (mask & POLLERR ? "E" : ""),
00577                       (mask & POLLHUP ? "H" : ""),
00578                       (mask & POLLRDHUP ? "R" : ""),
00579                       (mask & POLLNVAL ? "N" : ""));
00580 }
00581 
00582 int
00583 TransportBase::
00584 handleEvents()
00585 {
00586     int rc = 0;
00587         
00588     while (!isZombie() && rc != -1) {
00589         struct pollfd items[3] = {
00590             { eventFd_, POLLIN, 0 },
00591             { timerFd_, POLLIN, 0 },
00592             { getHandle(), flags_, 0 }
00593         };
00594 
00595         int res = poll(items, 3, 0);
00596         
00597 #if 0
00598         cerr << "handleevents for " << getHandle() << " " << status()
00599              << " got " << res << " events" << " and has async "
00600              << hasAsync() << endl;
00601 
00602         for (unsigned i = 0;  i < 3;  ++i) {
00603             if (!items[i].revents) continue;
00604             string fdname;
00605             if (i == 0) fdname = "wakeup";
00606             else if (i == 1) fdname = "timer";
00607             else if (i == 2) fdname = "connection";
00608             
00609             int mask = items[i].revents;
00610 
00611             cerr << "    " << fdname << " has flags "
00612                  << pollFlags(mask) << endl;
00613         }
00614 #endif
00615 
00616         if (res == 0 && !hasAsync()) break;
00617         
00618         if (items[0].revents) {
00619             // Clear the wakeup if there was one
00620             uint64_t nevents;
00621             int res = eventfd_read(eventFd_, &nevents);
00622             if (res == -1)
00623                 throw ML::Exception(errno, "eventfd_read");
00624             //cerr << "    got wakeup" << endl;
00625         }
00626         if (rc != -1 && items[2].revents & POLLERR) {
00627             // Connection finished or has an error; check which one
00628             int error = 0;
00629             socklen_t error_len = sizeof(int);
00630             int res = getsockopt(getHandle(), SOL_SOCKET, SO_ERROR,
00631                                  &error, &error_len);
00632             if (res == -1 || error_len != sizeof(int))
00633                 throw ML::Exception(errno, "getsockopt(SO_ERROR)");
00634             
00635             {
00636                 TransportTimer timer(this, "error");
00637                 rc = handleError(strerror(error));
00638             }
00639         }
00640         if (rc != -1 && items[1].revents & POLLIN) {
00641             // Timeout...
00642             
00643             // First read the number of timeouts to reset the count
00644             uint64_t numTimeouts;
00645             int res = read(timerFd_, &numTimeouts, 8);
00646             if (res == -1)
00647                 throw ML::Exception(errno, "reading from timerfd");
00648             if (res != 8)
00649                 throw ML::Exception("read wrong num bytes from timerfd");
00650             
00651             if (timeout_.isSet()) {
00652                 // Now call the handler
00653                 TransportTimer timer(this, "timeout");
00654                 rc = handleTimeout();
00655             }
00656 
00657             //cerr << "    got timeout" << endl;
00658         }
00659         if (rc != -1
00660             && (items[2].revents & POLLIN)
00661             && (flags_ & POLLIN)) {
00662             //cerr << "    got input" << endl;
00663             TransportTimer timer(this, "input");
00664             rc = handleInput();
00665         }
00666         if (rc != -1
00667             && (items[2].revents & POLLOUT)
00668             && (flags_ & POLLOUT)) {
00669             //cerr << "    got output" << endl;
00670             TransportTimer timer(this, "output");
00671             rc = handleOutput();
00672         }
00673         if (rc != -1
00674             && (items[2].revents & POLLRDHUP)
00675             && (flags_ & POLLRDHUP)) {
00676             //cerr << "    got output" << endl;
00677             TransportTimer timer(this, "peerShutdown");
00678             rc = handlePeerShutdown();
00679         }
00680 
00681         if (hasAsync() && rc != -1) {
00682             std::vector<AsyncEntry> async
00683                 = popAsync();
00684             
00685             for (unsigned i = 0;  i < async.size();  ++i) {
00686                 //cerr << "    got async " << async[i].name << endl;
00687                 TransportTimer timer(this, async[i].name.c_str());
00688                 rc = handleAsync(async[i].callback,
00689                                  async[i].name.c_str(),
00690                                  async[i].date);
00691                 if (rc == -1) break;
00692             }
00693         }
00694     }
00695 
00696     //cerr << "finished handling events for " << this << " with rc " << rc
00697     //     << endl;
00698 
00699     if (rc == -1) {
00700 
00701         //cerr << "    closing connection" << endl;
00702         std::shared_ptr<TransportBase> tr
00703             = shared_from_this();
00704     
00705         {
00706             InHandlerGuard guard(this, "close");
00707 
00708             addActivityS("close");
00709 
00710             if (hasSlave()) {
00711                 cerr << "    slave" << endl;
00712                 slave().onCleanup();
00713                 slave_.reset();
00714             }
00715         }
00716 
00717         if (hasConnection_) {
00718             int res = epoll_ctl(epollFd_, EPOLL_CTL_DEL, getHandle(), 0);
00719             if (res == -1)
00720                 throw ML::Exception("TransportBase::close(): epoll_ctl DEL %d: %s",
00721                                     getHandle(), strerror(errno));
00722         }
00723         cancelTimer();
00724 
00725         //closePeer();
00726         
00727         endpoint_->notifyCloseTransport(tr);
00728     }
00729     else if (hasConnection_) {
00730         // Change the epoll event set
00731 
00732         struct epoll_event data;
00733         data.data.u64 = 0;
00734         data.data.fd = getHandle();
00735         data.events = pollFlagsToEpoll(flags_);
00736         
00737         //cerr << "setting flags to " << epollFlags(flags_) << endl;
00738 
00739         int res = epoll_ctl(epollFd_, EPOLL_CTL_MOD, getHandle(), &data);
00740         
00741         // TODO: no setting of mask if not necessary
00742         if (res == -1)
00743             throw ML::Exception(errno, "TransportBase::close(): epoll_ctl MOD");
00744     }
00745 
00746     return rc;
00747 }
00748 
00749 std::string
00750 TransportBase::
00751 status() const
00752 {
00753     string result = format("Transport %p", this)
00754         + " of type " + ML::type_name(*this);
00755     if (hasSlave())
00756         result += " with slave " + slave_->status();
00757     else result += " with no slave";
00758     return result;
00759 }
00760 
00761 void
00762 TransportBase::
00763 startReading()
00764 {
00765     //cerr << "starting reading " << this << endl;
00766     assertLockedByThisThread();
00767     flags_ |= POLLIN;
00768 }
00769 
00770 void
00771 TransportBase::
00772 stopReading()
00773 {
00774     //cerr << "stopping reading " << this << endl;
00775     assertLockedByThisThread();
00776     flags_ &= ~POLLIN;
00777 }
00778 
00779 void
00780 TransportBase::
00781 startWriting()
00782 {
00783     //cerr << "starting writing " << this << endl;
00784     assertLockedByThisThread();
00785     flags_ |= POLLOUT;
00786 }
00787 
00788 void
00789 TransportBase::
00790 stopWriting()
00791 {
00792     //cerr << "stopping writing " << this << endl;
00793     assertLockedByThisThread();
00794     flags_ &= ~POLLOUT;
00795 }
00796 
00797 void
00798 TransportBase::
00799 scheduleTimerAbsolute(Date timeout, size_t cookie,
00800                       void (*freecookie) (size_t))
00801 {
00802     timeout_.set(timeout, cookie, freecookie);
00803     long seconds = timeout.wholeSecondsSinceEpoch();
00804     long nanoseconds = timeout.fractionalSeconds() * 1000000000.0;
00805     itimerspec spec = { { 0, 0 }, { seconds, nanoseconds } };
00806     int res = timerfd_settime(timerFd_, TFD_TIMER_ABSTIME, &spec, 0);
00807     if (res == -1)
00808         throw ML::Exception(errno, "timerfd_settime absolute");
00809 }
00810 
00811 void 
00812 TransportBase::
00813 scheduleTimerRelative(double secondsFromNow,
00814                       size_t cookie,
00815                       void (*freecookie) (size_t))
00816 {
00817     assertLockedByThisThread();
00818     
00819     if (secondsFromNow < 0)
00820         throw ML::Exception("attempting to schedule timer in the past: %f",
00821                             secondsFromNow);
00822 
00823     timeout_.set(Date::now().plusSeconds(secondsFromNow),
00824                  cookie, freecookie);
00825     long seconds = secondsFromNow;
00826     long nanoseconds = 1000000000.0 * (secondsFromNow - seconds);
00827     itimerspec spec = { { 0, 0 }, { seconds, nanoseconds } };
00828     int res = timerfd_settime(timerFd_, 0, &spec, 0);
00829     if (res == -1)
00830         throw ML::Exception(errno, "timerfd_settime relative");
00831 }
00832     
00833 void
00834 TransportBase::
00835 cancelTimer()
00836 {
00837     timeout_.cancel();
00838 
00839     itimerspec spec = { { 0, 0 }, { 0, 0 } };
00840     int res = timerfd_settime(timerFd_, 0, &spec, 0);
00841     if (res == -1)
00842         throw ML::Exception(errno, "timerfd_settime");
00843 }
00844 
00845 void
00846 TransportBase::
00847 doAsync(const boost::function<void ()> & callback, const std::string & name)
00848 {
00849     addActivity("doAsync: %s", name.c_str());
00850     pushAsync(callback, name);
00851 }
00852 
00853 void
00854 TransportBase::
00855 pushAsync(const boost::function<void ()> & fn, const std::string & name)
00856 {
00857     std::auto_ptr<AsyncNode> node(new AsyncNode(fn, name));
00858     
00859     AsyncNode * current = asyncHead_;
00860     
00861     for (;;) {
00862         node->next = current;
00863         if (ML::cmp_xchg(asyncHead_, current, node.get())) break;
00864     }
00865 
00866     node.release();
00867     
00868     if (!lockedByThisThread())
00869         eventfd_write(eventFd_, 1);
00870 }
00871 
00874 std::vector<TransportBase::AsyncEntry>
00875 TransportBase::
00876 popAsync()
00877 {
00878     AsyncNode * current = asyncHead_;
00879     
00880     for (;;) {
00881         if (ML::cmp_xchg(asyncHead_, current, (AsyncNode *)0)) break;
00882     }
00883     
00884     std::vector<AsyncEntry> result;
00885     
00886     for (; current; ) {
00887         result.push_back(*current);
00888         auto next = current->next;
00889         delete current;
00890         current = next;
00891     }
00892     
00893     // TODO: should just iterate in reverse order...
00894     std::reverse(result.begin(), result.end());
00895     
00896     return result;
00897 }
00898 
00899 TransportBase::Activities::
00900 ~Activities()
00901 {
00902     Guard guard(lock);
00903     activities.clear();
00904 }
00905 
00906 void
00907 TransportBase::Activities::
00908 dump() const
00909 {
00910     Guard guard(lock);
00911     if (activities.empty()) return;
00912     Date firstTime = activities.front().time, lastTime = firstTime;
00913     for (unsigned i = 0;  i < activities.size();  ++i) {
00914         Date time = activities[i].time;
00915         cerr << format("%3d %s %7.3f %7.3f %s\n",
00916                        i,
00917                        time.print(4).c_str(),
00918                        time.secondsSince(firstTime),
00919                        time.secondsSince(lastTime),
00920                        activities[i].what.c_str());
00921         lastTime = time;
00922     }
00923 }
00924 
00925 Json::Value
00926 TransportBase::Activities::
00927 toJson(int first, int last) const
00928 {
00929     Guard guard(lock);
00930     if (last == -1) last = size();
00931 
00932     if (first < 0 || last < first || last > size())
00933         throw Exception("Activities::toJson(): "
00934                         "range %d-%d incompatible with 0-%d",
00935                         first, last, (int)size());
00936 
00937     Json::Value result;
00938 
00939     if (first == last) return result;
00940 
00941     Date firstTime = activities[first].time, lastTime = firstTime;
00942     for (unsigned i = first;  i < last;  ++i) {
00943         Date time = activities[i].time;
00944         result[i - first][0u] = time.print(4);
00945         result[i - first][1u] = time.secondsSince(firstTime);
00946         result[i - first][2u] = time.secondsSince(lastTime);
00947         result[i - first][3u] = activities[i].what;
00948         lastTime = time;
00949     }
00950 
00951     return result;
00952 }
00953 
00954 void
00955 TransportBase::Activity::
00956 fromJson(const Json::Value & val)
00957 {
00958     if (val.size() != 4)
00959         throw Exception("not an activity in JSON: %s", val.toString().c_str());
00960     
00961     time = Date(val[0u]);
00962     what = val[3].asString();
00963 }
00964 
00965 void
00966 TransportBase::Activities::
00967 fromJson(const Json::Value & val)
00968 {
00969     vector<Activity> activities;
00970 
00971     for (unsigned i = 0;  i < val.size();  ++i) {
00972         activities.push_back(val[i]);
00973     }
00974 
00975     activities.swap(activities);
00976 }
00977 
00978 
00979 /*****************************************************************************/
00980 /* SOCKET TRANSPORT                                                          */
00981 /*****************************************************************************/
00982 
/** Default constructor.  It names no base-class initialiser, so the
    TransportBase default constructor runs -- and that one always throws. */
SocketTransport::
SocketTransport()
{
}
00987 
/** Construct a socket transport attached to \p endpoint; all the work is
    done by the TransportBase constructor. */
SocketTransport::
SocketTransport(EndpointBase * endpoint)
    : TransportBase(endpoint)
{
}
00993 
00994 SocketTransport::
00995 ~SocketTransport()
00996 {
00997     if (getHandle() != -1) {
00998         cerr << "warning: closing TransportBase " << this
00999              << " of type " << status() << "with open socket "
01000              << getHandle() << endl;
01001         backtrace();
01002     }
01003 }
01004 
01005 ssize_t
01006 SocketTransport::
01007 send(const char * buf, size_t len, int flags)
01008 {
01009     return peer().send(buf, len, flags);
01010 }
01011    
01012 ssize_t
01013 SocketTransport::
01014 recv(char * buf, size_t buf_size, int flags)
01015 {
01016     return peer().recv(buf, buf_size, flags);
01017 }
01018 
01019 int
01020 SocketTransport::
01021 closePeer()
01022 {
01023     addActivityS("closePeer");
01024     return peer().close();
01025 }
01026 
01027 } // namespace Datacratic
01028 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator