RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* transport.cc 00002 Jeremy Barnes, 24 February 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Information on the transport. 00006 */ 00007 00008 #include "transport.h" 00009 00010 #include "soa/service//http_endpoint.h" 00011 #include "jml/arch/cmp_xchg.h" 00012 #include "jml/arch/atomic_ops.h" 00013 #include "jml/arch/format.h" 00014 #include "jml/arch/exception.h" 00015 #include "jml/arch/demangle.h" 00016 #include "jml/arch/backtrace.h" 00017 #include "jml/utils/environment.h" 00018 #include <iostream> 00019 #include <sys/epoll.h> 00020 #include <sys/timerfd.h> 00021 #include <sys/eventfd.h> 00022 #include <poll.h> 00023 00024 00025 using namespace std; 00026 using namespace ML; 00027 00028 00029 namespace Datacratic { 00030 00031 boost::function<void (const char *, float)> onLatencyEvent; 00032 00033 ML::Env_Option<bool> DEBUG_TRANSPORTS("DEBUG_TRANSPORTS", false); 00034 00035 00036 /*****************************************************************************/ 00037 /* TRANSPORT BASE */ 00038 /*****************************************************************************/ 00039 00040 long TransportBase::created = 0; 00041 long TransportBase::destroyed = 0; 00042 00043 00044 TransportBase:: 00045 TransportBase() 00046 { 00047 throw Exception("TransportBase constructor requires an endpoint"); 00048 } 00049 00050 TransportBase:: 00051 TransportBase(EndpointBase * endpoint) 00052 : lockThread(0), lockActivity(0), debug(DEBUG_TRANSPORTS), 00053 asyncHead_(0), 00054 endpoint_(endpoint), 00055 recycle_(0), close_(0), flags_(0), 00056 hasConnection_(false), zombie_(false) 00057 { 00058 atomic_add(created, 1); 00059 00060 if (!endpoint) 00061 throw ML::Exception("transport requires an endpoint"); 00062 00063 magic_ = 0x12345678; 00064 00065 addActivityS("created"); 00066 00067 epollFd_ = epoll_create(1024); 00068 if (epollFd_ == -1) 00069 throw ML::Exception(errno, "couldn't create epoll fd"); 00070 timerFd_ = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK); 00071 if (timerFd_ == -1) 00072 throw ML::Exception(errno, "timer FD couldn't be created"); 00073 eventFd_ = eventfd(0, EFD_NONBLOCK); 00074 if (eventFd_ == -1) 00075 throw ML::Exception(errno, "event FD couldn't be created"); 00076 00077 /* Add all of these other FDs to the event FD */ 00078 struct epoll_event data; 00079 data.data.u64 = 0; 00080 data.data.fd = timerFd_; 00081 data.events = EPOLLIN; 00082 int res = epoll_ctl(epollFd_, EPOLL_CTL_ADD, timerFd_, &data); 00083 if (res == -1) 00084 throw ML::Exception(errno, "epoll_ctl ADD timerFd"); 00085 data.data.fd = eventFd_; 00086 res = epoll_ctl(epollFd_, EPOLL_CTL_ADD, eventFd_, &data); 00087 if (res == -1) 00088 throw ML::Exception(errno, "epoll_ctl ADD eventFd"); 00089 00090 //dumpActivities(); 00091 00092 } 00093 00094 TransportBase:: 00095 ~TransportBase() 00096 { 00097 int res = close(epollFd_); 00098 if (res == -1) 00099 cerr << "closing epoll fd: " << strerror(errno) << endl; 00100 res = close(timerFd_); 00101 if (res == -1) 00102 cerr << "closing timer fd: " << strerror(errno) << endl; 00103 res = close(eventFd_); 00104 if (res == -1) 00105 cerr << "closing event fd: " << strerror(errno) << endl; 00106 00107 popAsync(); 00108 assertNotLockedByAnotherThread(); 00109 checkMagic(); 00110 atomic_add(destroyed, 1); 00111 magic_ = 0; 00112 lockThread = 1; 00113 lockActivity = "DESTROYED"; 00114 } 00115 00116 void 00117 TransportBase:: 00118 checkMagic() const 00119 { 00120 if (magic_ != 0x12345678) { 00121 cerr << "dead transport: " << endl; 00122 //activities.dump(); 00123 throw Exception("attempt to access dead transport %p: magic %d", 00124 this, magic_); 00125 } 00126 } 00127 00128 int 00129 TransportBase:: 00130 handleInput() 00131 { 00132 if (close_) return -1; 00133 00134 InHandlerGuard guard(this, "input"); 00135 00136 if (close_) return -1; 00137 00138 addActivityS("input"); 00139 00140 try { 00141 slave().handleInput(); 00142 } catch (const std::exception & exc) { 00143 handleException("input", exc); 00144 } catch (...) { 00145 handleUnknownException("input"); 00146 } 00147 00148 return endEventHandler("input", guard); 00149 } 00150 00151 int 00152 TransportBase:: 00153 handleOutput() 00154 { 00155 if (close_) return -1; 00156 00157 InHandlerGuard guard(this, "output"); 00158 00159 if (close_) return -1; 00160 00161 addActivityS("output"); 00162 00163 try { 00164 slave().handleOutput(); 00165 } catch (const std::exception & exc) { 00166 handleException("output", exc); 00167 } catch (...) { 00168 handleUnknownException("output"); 00169 } 00170 00171 return endEventHandler("output", guard); 00172 } 00173 00174 int 00175 TransportBase:: 00176 handlePeerShutdown() 00177 { 00178 if (close_) return -1; 00179 00180 InHandlerGuard guard(this, "peerShutdown"); 00181 00182 if (close_) return -1; 00183 00184 addActivityS("peerShutdown"); 00185 00186 try { 00187 slave().handlePeerShutdown(); 00188 } catch (const std::exception & exc) { 00189 handleException("peerShutdown", exc); 00190 } catch (...) { 00191 handleUnknownException("peerShutdown"); 00192 } 00193 00194 return endEventHandler("peerShutdown", guard); 00195 } 00196 00197 int 00198 TransportBase:: 00199 handleTimeout() 00200 { 00201 if (close_) return -1; 00202 00203 Date before = Date::now(); 00204 00205 InHandlerGuard guard(this, "timeout"); 00206 00207 if (close_) return -1; 00208 00209 Date afterLock = Date::now(); 00210 00211 addActivity("timeout %zd in %.1fms", timeout_.timeoutCookie, 00212 afterLock.secondsSince(before) * 1000); 00213 00214 try { 00215 slave().handleTimeout(timeout_.timeout, timeout_.timeoutCookie); 00216 00217 //Date afterHandler = Date::now(); 00218 00219 } catch (const std::exception & exc) { 00220 handleException("timeout", exc); 00221 } catch (...) { 00222 handleUnknownException("timeout"); 00223 } 00224 00225 return endEventHandler("timeout", guard); 00226 } 00227 00228 int 00229 TransportBase:: 00230 handleError(const std::string & error) 00231 { 00232 if (close_) return -1; 00233 00234 InHandlerGuard guard(this, "error"); 00235 00236 if (close_) return -1; 00237 00238 addActivityS("error"); 00239 00240 try { 00241 slave().handleError(error); 00242 } catch (const std::exception & exc) { 00243 handleException("error", exc); 00244 } catch (...) { 00245 handleUnknownException("error"); 00246 } 00247 00248 return endEventHandler("error", guard); 00249 } 00250 00251 int 00252 TransportBase:: 00253 handleAsync(const boost::function<void ()> & callback, const char * name, 00254 Date dateSet) 00255 { 00256 Date now = Date::now(); 00257 00258 double delay = now.secondsSince(dateSet); 00259 00260 if (onLatencyEvent) 00261 onLatencyEvent("asyncCallback", delay); 00262 00263 static Date lastMessage; 00264 static int messagesSinceLastMessage = 0; 00265 00266 if (delay > 0.01) { 00267 if (lastMessage.secondsSince(now) > 0.5) { 00268 lastMessage = now; 00269 cerr << "big delay on async callback " << name << ": " 00270 << delay * 1000 << "ms" 00271 << endl; 00272 if (messagesSinceLastMessage > 0) 00273 cerr << "also " << messagesSinceLastMessage << endl; 00274 00275 cerr << "dateSet: " << dateSet.print(5) << endl; 00276 cerr << "now: " << now.print(5) << endl; 00277 activities.dump(); 00278 //* (char *)0 = 0; // cause segfault for gdb 00279 messagesSinceLastMessage = 0; 00280 } 00281 else ++messagesSinceLastMessage; 00282 } 00283 00284 if (close_) return -1; 00285 00286 addActivity("handleAsync: %s", name); 00287 00288 InHandlerGuard guard(this, name); 00289 00290 if (close_) return -1; 00291 00292 addActivityS(name); 00293 00294 try { 00295 callback(); 00296 } catch (const std::exception & exc) { 00297 handleException(name, exc); 00298 } catch (...) { 00299 handleUnknownException(name); 00300 } 00301 00302 return endEventHandler(name, guard); 00303 } 00304 00305 void 00306 TransportBase:: 00307 associate(std::shared_ptr<ConnectionHandler> newSlave) 00308 { 00309 addActivity("associate with " + newSlave->status()); 00310 00311 //assertLockedByThisThread(); 00312 assertNotLockedByAnotherThread(); 00313 00314 if (slave_) { 00315 assertLockedByThisThread(); 00316 00317 if (slave_.get() == newSlave.get()) 00318 throw Exception("re-associating the same slave of type " 00319 + ML::type_name(*slave_)); 00320 slave().onDisassociate(); 00321 } 00322 00323 slave_ = newSlave; 00324 slave().setTransport(this); 00325 00326 auto finishAssociate = [=] () 00327 { 00328 this->slave().onGotTransport(); 00329 }; 00330 00331 /* If we're already locked, then finish the association here. Otherwise, 00332 do it asynchronously so that we can do it in the handler context. 00333 */ 00334 if (lockedByThisThread()) { 00335 InHandlerGuard guard(this, "associate"); 00336 finishAssociate(); 00337 endEventHandler("associate", guard); 00338 } 00339 else { 00340 doAsync(finishAssociate, "associate"); 00341 } 00342 00343 addActivityS("associate done"); 00344 } 00345 00346 std::shared_ptr<ConnectionHandler> 00347 TransportBase:: 00348 disassociate() 00349 { 00350 addActivityS("disassociate"); 00351 00352 if (!slave_) 00353 throw Exception("TransportBase::disassociate(): no slave"); 00354 00355 cancelTimer(); 00356 00357 slave().onDisassociate(); 00358 std::shared_ptr<ConnectionHandler> result = slave_; 00359 slave_.reset(); 00360 return result; 00361 } 00362 00363 void 00364 TransportBase:: 00365 recycleWhenHandlerFinished() 00366 { 00367 if (!lockedByThisThread()) 00368 throw Exception("not in handler"); 00369 00370 addActivityS("recycleWhenHandlerFinished"); 00371 00372 this->recycle_ = true; 00373 } 00374 00375 void 00376 TransportBase:: 00377 closeWhenHandlerFinished() 00378 { 00379 //backtrace(); 00380 00381 if (!lockedByThisThread()) 00382 throw Exception("not in handler"); 00383 00384 addActivityS("closeWhenHandlerFinished"); 00385 00386 this->close_ = true; 00387 this->recycle_ = false; 00388 } 00389 00390 void 00391 TransportBase:: 00392 closeAsync() 00393 { 00394 auto doClose = [=] () { this->closeWhenHandlerFinished(); }; 00395 doAsync(doClose, "closeAsync"); 00396 } 00397 00398 void 00399 TransportBase:: 00400 associateWhenHandlerFinished(std::shared_ptr<ConnectionHandler> slave, 00401 const std::string & whereFrom) 00402 { 00403 if (!slave) 00404 throw Exception("associateWhenHandlerFinished(): null slave"); 00405 00406 if (newSlave_) 00407 throw Exception("associateWhenHandlerFinished(): attempt to " 00408 "double replace slave %s from %s " 00409 "with new slave %s from %s", 00410 newSlaveFrom_.c_str(), 00411 type_name(*slave_).c_str(), 00412 whereFrom.c_str(), 00413 type_name(*slave).c_str()); 00414 00415 if (!lockedByThisThread()) { 00416 throw Exception("associateWhenHandlerFinished(): needs to be in " 00417 "handler"); 00418 //associate(slave); 00419 } 00420 else { 00421 newSlave_ = slave; 00422 newSlaveFrom_ = whereFrom; 00423 } 00424 } 00425 00426 void 00427 TransportBase:: 00428 handleException(const std::string & handler, const std::exception & exc) 00429 { 00430 addActivityS("exception"); 00431 00432 if (slave_) 00433 slave().onHandlerException(handler, exc); 00434 } 00435 00436 void 00437 TransportBase:: 00438 handleUnknownException(const std::string & handler) 00439 { 00440 handleException(handler, ML::Exception("unknown exception")); 00441 } 00442 00443 int 00444 TransportBase:: 00445 endEventHandler(const char * handler, InHandlerGuard & guard) 00446 { 00447 addActivity("endHandler %s with close %d, recycle %d, " 00448 "newHandler %p", handler, close_, recycle_, 00449 newSlave_.get()); 00450 00451 if (close_) { 00452 try { 00453 if (hasSlave()) disassociate(); 00454 } catch (const std::exception & exc) { 00455 cerr << "error: disassociate() threw exception: " 00456 << exc.what(); 00457 } 00458 00459 return -1; // handle_close will be called once all out of the way 00460 } 00461 else if (recycle_) { 00462 try { 00463 if (hasSlave()) disassociate(); 00464 //cerr << "done disassociate for " << handler << " for " 00465 // << this << " with recycle" << endl; 00466 } catch (const std::exception & exc) { 00467 cerr << "error: disassociate() threw exception: " 00468 << exc.what(); 00469 return -1; 00470 } 00471 endpoint_->notifyRecycleTransport(shared_from_this()); 00472 recycle_ = false; 00473 return 0; 00474 } 00475 else if (newSlave_) { 00476 auto saved = newSlave_; 00477 newSlave_.reset(); 00478 associate(saved); 00479 } 00480 00481 return 0; 00482 } 00483 00484 void 00485 TransportBase:: 00486 doError(const std::string & error) 00487 { 00488 slave().doError(error); 00489 } 00490 00491 struct TransportTimer { 00492 TransportTimer(TransportBase * transport, 00493 const char * event, 00494 double maxTime = 0.0) 00495 : start(maxTime == 0.0 ? Date() : Date::now()), 00496 transport(transport), event(event), 00497 maxTime(maxTime) 00498 { 00499 if (maxTime != 0.0) 00500 statusBefore = transport->status(); 00501 } 00502 00503 ~TransportTimer() 00504 { 00505 if (maxTime == 0.0) return; 00506 double elapsed = Date::now().secondsSince(start); 00507 if (elapsed > maxTime) { 00508 cerr << "transport operation " << event << " took " 00509 << elapsed * 1000 << "ms with status " 00510 << transport->status() << " before " 00511 << statusBefore << endl; 00512 } 00513 } 00514 00515 Date start; 00516 TransportBase * transport; 00517 const char * event; 00518 double maxTime; 00519 std::string statusBefore; 00520 }; 00521 00522 int pollFlagsToEpoll(int flags) 00523 { 00524 int result = 0; 00525 if (flags & POLLIN) result |= EPOLLIN; 00526 if (flags & POLLOUT) result |= EPOLLOUT; 00527 if (flags & POLLPRI) result |= EPOLLPRI; 00528 if (flags & POLLERR) result |= EPOLLERR; 00529 if (flags & POLLHUP) result |= EPOLLHUP; 00530 if (flags & POLLRDHUP) result |= EPOLLRDHUP; 00531 return result; 00532 } 00533 00534 void 00535 TransportBase:: 00536 hasConnection() 00537 { 00538 if (getHandle() < 0) 00539 throw ML::Exception("hasConnection without a connection"); 00540 00541 struct epoll_event data; 00542 data.data.u64 = 0; 00543 data.data.fd = getHandle(); 00544 data.events = pollFlagsToEpoll(flags_); 00545 int res = epoll_ctl(epollFd_, EPOLL_CTL_ADD, getHandle(), &data); 00546 if (res == -1) 00547 throw ML::Exception(errno, "epoll_ctl ADD getHandle()"); 00548 00549 hasConnection_ = true; 00550 00551 //cerr << "transport " << getHandle() << " " 00552 // << status() << " has a connection" << endl; 00553 } 00554 00555 std::string 00556 epollFlags(int mask) 00557 { 00558 return ML::format("%s%s%s%s%s%s%s%s", 00559 (mask & EPOLLIN ? "I" : ""), 00560 (mask & EPOLLOUT ? "O" : ""), 00561 (mask & EPOLLPRI ? "P" : ""), 00562 (mask & EPOLLERR ? "E" : ""), 00563 (mask & EPOLLHUP ? "H" : ""), 00564 (mask & EPOLLRDHUP ? "R" : ""), 00565 (mask & EPOLLET ? "E" : ""), 00566 (mask & EPOLLONESHOT ? "1" : "")); 00567 } 00568 00569 std::string 00570 pollFlags(int mask) 00571 { 00572 return ML::format("%s%s%s%s%s%s%s", 00573 (mask & POLLIN ? "I" : ""), 00574 (mask & POLLOUT ? "O" : ""), 00575 (mask & POLLPRI ? "P" : ""), 00576 (mask & POLLERR ? "E" : ""), 00577 (mask & POLLHUP ? "H" : ""), 00578 (mask & POLLRDHUP ? "R" : ""), 00579 (mask & POLLNVAL ? "N" : "")); 00580 } 00581 00582 int 00583 TransportBase:: 00584 handleEvents() 00585 { 00586 int rc = 0; 00587 00588 while (!isZombie() && rc != -1) { 00589 struct pollfd items[3] = { 00590 { eventFd_, POLLIN, 0 }, 00591 { timerFd_, POLLIN, 0 }, 00592 { getHandle(), flags_, 0 } 00593 }; 00594 00595 int res = poll(items, 3, 0); 00596 00597 #if 0 00598 cerr << "handleevents for " << getHandle() << " " << status() 00599 << " got " << res << " events" << " and has async " 00600 << hasAsync() << endl; 00601 00602 for (unsigned i = 0; i < 3; ++i) { 00603 if (!items[i].revents) continue; 00604 string fdname; 00605 if (i == 0) fdname = "wakeup"; 00606 else if (i == 1) fdname = "timer"; 00607 else if (i == 2) fdname = "connection"; 00608 00609 int mask = items[i].revents; 00610 00611 cerr << " " << fdname << " has flags " 00612 << pollFlags(mask) << endl; 00613 } 00614 #endif 00615 00616 if (res == 0 && !hasAsync()) break; 00617 00618 if (items[0].revents) { 00619 // Clear the wakeup if there was one 00620 uint64_t nevents; 00621 int res = eventfd_read(eventFd_, &nevents); 00622 if (res == -1) 00623 throw ML::Exception(errno, "eventfd_read"); 00624 //cerr << " got wakeup" << endl; 00625 } 00626 if (rc != -1 && items[2].revents & POLLERR) { 00627 // Connection finished or has an error; check which one 00628 int error = 0; 00629 socklen_t error_len = sizeof(int); 00630 int res = getsockopt(getHandle(), SOL_SOCKET, SO_ERROR, 00631 &error, &error_len); 00632 if (res == -1 || error_len != sizeof(int)) 00633 throw ML::Exception(errno, "getsockopt(SO_ERROR)"); 00634 00635 { 00636 TransportTimer timer(this, "error"); 00637 rc = handleError(strerror(error)); 00638 } 00639 } 00640 if (rc != -1 && items[1].revents & POLLIN) { 00641 // Timeout... 00642 00643 // First read the number of timeouts to reset the count 00644 uint64_t numTimeouts; 00645 int res = read(timerFd_, &numTimeouts, 8); 00646 if (res == -1) 00647 throw ML::Exception(errno, "reading from timerfd"); 00648 if (res != 8) 00649 throw ML::Exception("read wrong num bytes from timerfd"); 00650 00651 if (timeout_.isSet()) { 00652 // Now call the handler 00653 TransportTimer timer(this, "timeout"); 00654 rc = handleTimeout(); 00655 } 00656 00657 //cerr << " got timeout" << endl; 00658 } 00659 if (rc != -1 00660 && (items[2].revents & POLLIN) 00661 && (flags_ & POLLIN)) { 00662 //cerr << " got input" << endl; 00663 TransportTimer timer(this, "input"); 00664 rc = handleInput(); 00665 } 00666 if (rc != -1 00667 && (items[2].revents & POLLOUT) 00668 && (flags_ & POLLOUT)) { 00669 //cerr << " got output" << endl; 00670 TransportTimer timer(this, "output"); 00671 rc = handleOutput(); 00672 } 00673 if (rc != -1 00674 && (items[2].revents & POLLRDHUP) 00675 && (flags_ & POLLRDHUP)) { 00676 //cerr << " got output" << endl; 00677 TransportTimer timer(this, "peerShutdown"); 00678 rc = handlePeerShutdown(); 00679 } 00680 00681 if (hasAsync() && rc != -1) { 00682 std::vector<AsyncEntry> async 00683 = popAsync(); 00684 00685 for (unsigned i = 0; i < async.size(); ++i) { 00686 //cerr << " got async " << async[i].name << endl; 00687 TransportTimer timer(this, async[i].name.c_str()); 00688 rc = handleAsync(async[i].callback, 00689 async[i].name.c_str(), 00690 async[i].date); 00691 if (rc == -1) break; 00692 } 00693 } 00694 } 00695 00696 //cerr << "finished handling events for " << this << " with rc " << rc 00697 // << endl; 00698 00699 if (rc == -1) { 00700 00701 //cerr << " closing connection" << endl; 00702 std::shared_ptr<TransportBase> tr 00703 = shared_from_this(); 00704 00705 { 00706 InHandlerGuard guard(this, "close"); 00707 00708 addActivityS("close"); 00709 00710 if (hasSlave()) { 00711 cerr << " slave" << endl; 00712 slave().onCleanup(); 00713 slave_.reset(); 00714 } 00715 } 00716 00717 if (hasConnection_) { 00718 int res = epoll_ctl(epollFd_, EPOLL_CTL_DEL, getHandle(), 0); 00719 if (res == -1) 00720 throw ML::Exception("TransportBase::close(): epoll_ctl DEL %d: %s", 00721 getHandle(), strerror(errno)); 00722 } 00723 cancelTimer(); 00724 00725 //closePeer(); 00726 00727 endpoint_->notifyCloseTransport(tr); 00728 } 00729 else if (hasConnection_) { 00730 // Change the epoll event set 00731 00732 struct epoll_event data; 00733 data.data.u64 = 0; 00734 data.data.fd = getHandle(); 00735 data.events = pollFlagsToEpoll(flags_); 00736 00737 //cerr << "setting flags to " << epollFlags(flags_) << endl; 00738 00739 int res = epoll_ctl(epollFd_, EPOLL_CTL_MOD, getHandle(), &data); 00740 00741 // TODO: no setting of mask if not necessary 00742 if (res == -1) 00743 throw ML::Exception(errno, "TransportBase::close(): epoll_ctl MOD"); 00744 } 00745 00746 return rc; 00747 } 00748 00749 std::string 00750 TransportBase:: 00751 status() const 00752 { 00753 string result = format("Transport %p", this) 00754 + " of type " + ML::type_name(*this); 00755 if (hasSlave()) 00756 result += " with slave " + slave_->status(); 00757 else result += " with no slave"; 00758 return result; 00759 } 00760 00761 void 00762 TransportBase:: 00763 startReading() 00764 { 00765 //cerr << "starting reading " << this << endl; 00766 assertLockedByThisThread(); 00767 flags_ |= POLLIN; 00768 } 00769 00770 void 00771 TransportBase:: 00772 stopReading() 00773 { 00774 //cerr << "stopping reading " << this << endl; 00775 assertLockedByThisThread(); 00776 flags_ &= ~POLLIN; 00777 } 00778 00779 void 00780 TransportBase:: 00781 startWriting() 00782 { 00783 //cerr << "starting writing " << this << endl; 00784 assertLockedByThisThread(); 00785 flags_ |= POLLOUT; 00786 } 00787 00788 void 00789 TransportBase:: 00790 stopWriting() 00791 { 00792 //cerr << "stopping writing " << this << endl; 00793 assertLockedByThisThread(); 00794 flags_ &= ~POLLOUT; 00795 } 00796 00797 void 00798 TransportBase:: 00799 scheduleTimerAbsolute(Date timeout, size_t cookie, 00800 void (*freecookie) (size_t)) 00801 { 00802 timeout_.set(timeout, cookie, freecookie); 00803 long seconds = timeout.wholeSecondsSinceEpoch(); 00804 long nanoseconds = timeout.fractionalSeconds() * 1000000000.0; 00805 itimerspec spec = { { 0, 0 }, { seconds, nanoseconds } }; 00806 int res = timerfd_settime(timerFd_, TFD_TIMER_ABSTIME, &spec, 0); 00807 if (res == -1) 00808 throw ML::Exception(errno, "timerfd_settime absolute"); 00809 } 00810 00811 void 00812 TransportBase:: 00813 scheduleTimerRelative(double secondsFromNow, 00814 size_t cookie, 00815 void (*freecookie) (size_t)) 00816 { 00817 assertLockedByThisThread(); 00818 00819 if (secondsFromNow < 0) 00820 throw ML::Exception("attempting to schedule timer in the past: %f", 00821 secondsFromNow); 00822 00823 timeout_.set(Date::now().plusSeconds(secondsFromNow), 00824 cookie, freecookie); 00825 long seconds = secondsFromNow; 00826 long nanoseconds = 1000000000.0 * (secondsFromNow - seconds); 00827 itimerspec spec = { { 0, 0 }, { seconds, nanoseconds } }; 00828 int res = timerfd_settime(timerFd_, 0, &spec, 0); 00829 if (res == -1) 00830 throw ML::Exception(errno, "timerfd_settime relative"); 00831 } 00832 00833 void 00834 TransportBase:: 00835 cancelTimer() 00836 { 00837 timeout_.cancel(); 00838 00839 itimerspec spec = { { 0, 0 }, { 0, 0 } }; 00840 int res = timerfd_settime(timerFd_, 0, &spec, 0); 00841 if (res == -1) 00842 throw ML::Exception(errno, "timerfd_settime"); 00843 } 00844 00845 void 00846 TransportBase:: 00847 doAsync(const boost::function<void ()> & callback, const std::string & name) 00848 { 00849 addActivity("doAsync: %s", name.c_str()); 00850 pushAsync(callback, name); 00851 } 00852 00853 void 00854 TransportBase:: 00855 pushAsync(const boost::function<void ()> & fn, const std::string & name) 00856 { 00857 std::auto_ptr<AsyncNode> node(new AsyncNode(fn, name)); 00858 00859 AsyncNode * current = asyncHead_; 00860 00861 for (;;) { 00862 node->next = current; 00863 if (ML::cmp_xchg(asyncHead_, current, node.get())) break; 00864 } 00865 00866 node.release(); 00867 00868 if (!lockedByThisThread()) 00869 eventfd_write(eventFd_, 1); 00870 } 00871 00874 std::vector<TransportBase::AsyncEntry> 00875 TransportBase:: 00876 popAsync() 00877 { 00878 AsyncNode * current = asyncHead_; 00879 00880 for (;;) { 00881 if (ML::cmp_xchg(asyncHead_, current, (AsyncNode *)0)) break; 00882 } 00883 00884 std::vector<AsyncEntry> result; 00885 00886 for (; current; ) { 00887 result.push_back(*current); 00888 auto next = current->next; 00889 delete current; 00890 current = next; 00891 } 00892 00893 // TODO: should just iterate in reverse order... 00894 std::reverse(result.begin(), result.end()); 00895 00896 return result; 00897 } 00898 00899 TransportBase::Activities:: 00900 ~Activities() 00901 { 00902 Guard guard(lock); 00903 activities.clear(); 00904 } 00905 00906 void 00907 TransportBase::Activities:: 00908 dump() const 00909 { 00910 Guard guard(lock); 00911 if (activities.empty()) return; 00912 Date firstTime = activities.front().time, lastTime = firstTime; 00913 for (unsigned i = 0; i < activities.size(); ++i) { 00914 Date time = activities[i].time; 00915 cerr << format("%3d %s %7.3f %7.3f %s\n", 00916 i, 00917 time.print(4).c_str(), 00918 time.secondsSince(firstTime), 00919 time.secondsSince(lastTime), 00920 activities[i].what.c_str()); 00921 lastTime = time; 00922 } 00923 } 00924 00925 Json::Value 00926 TransportBase::Activities:: 00927 toJson(int first, int last) const 00928 { 00929 Guard guard(lock); 00930 if (last == -1) last = size(); 00931 00932 if (first < 0 || last < first || last > size()) 00933 throw Exception("Activities::toJson(): " 00934 "range %d-%d incompatible with 0-%d", 00935 first, last, (int)size()); 00936 00937 Json::Value result; 00938 00939 if (first == last) return result; 00940 00941 Date firstTime = activities[first].time, lastTime = firstTime; 00942 for (unsigned i = first; i < last; ++i) { 00943 Date time = activities[i].time; 00944 result[i - first][0u] = time.print(4); 00945 result[i - first][1u] = time.secondsSince(firstTime); 00946 result[i - first][2u] = time.secondsSince(lastTime); 00947 result[i - first][3u] = activities[i].what; 00948 lastTime = time; 00949 } 00950 00951 return result; 00952 } 00953 00954 void 00955 TransportBase::Activity:: 00956 fromJson(const Json::Value & val) 00957 { 00958 if (val.size() != 4) 00959 throw Exception("not an activity in JSON: %s", val.toString().c_str()); 00960 00961 time = Date(val[0u]); 00962 what = val[3].asString(); 00963 } 00964 00965 void 00966 TransportBase::Activities:: 00967 fromJson(const Json::Value & val) 00968 { 00969 vector<Activity> activities; 00970 00971 for (unsigned i = 0; i < val.size(); ++i) { 00972 activities.push_back(val[i]); 00973 } 00974 00975 activities.swap(activities); 00976 } 00977 00978 00979 /*****************************************************************************/ 00980 /* SOCKET TRANSPORT */ 00981 /*****************************************************************************/ 00982 00983 SocketTransport:: 00984 SocketTransport() 00985 { 00986 } 00987 00988 SocketTransport:: 00989 SocketTransport(EndpointBase * endpoint) 00990 : TransportBase(endpoint) 00991 { 00992 } 00993 00994 SocketTransport:: 00995 ~SocketTransport() 00996 { 00997 if (getHandle() != -1) { 00998 cerr << "warning: closing TransportBase " << this 00999 << " of type " << status() << "with open socket " 01000 << getHandle() << endl; 01001 backtrace(); 01002 } 01003 } 01004 01005 ssize_t 01006 SocketTransport:: 01007 send(const char * buf, size_t len, int flags) 01008 { 01009 return peer().send(buf, len, flags); 01010 } 01011 01012 ssize_t 01013 SocketTransport:: 01014 recv(char * buf, size_t buf_size, int flags) 01015 { 01016 return peer().recv(buf, buf_size, flags); 01017 } 01018 01019 int 01020 SocketTransport:: 01021 closePeer() 01022 { 01023 addActivityS("closePeer"); 01024 return peer().close(); 01025 } 01026 01027 } // namespace Datacratic 01028