RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/passive_endpoint.cc
00001 /* passive_endpoint.cc
00002    Jeremy Barnes, 29 April 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Passive endpoint implementation.
00006 */
00007 
00008 #include <unordered_map>
00009 
00010 #include "soa/service//passive_endpoint.h"
00011 #include <poll.h>
00012 #include <boost/date_time/gregorian/gregorian.hpp>
00013 
00014 using namespace std;
00015 using namespace ML;
00016 using namespace boost::posix_time;
00017 
00018 namespace Datacratic {
00019 
00020 
00021 /*****************************************************************************/
00022 /* PASSIVE ENDPOINT                                                          */
00023 /*****************************************************************************/
00024 
00025 PassiveEndpoint::
00026 PassiveEndpoint(const std::string & name)
00027     : EndpointBase(name)
00028 {
00029 }
00030 
00031 PassiveEndpoint::
00032 ~PassiveEndpoint()
00033 {
00034     closePeer();
00035     shutdown();
00036 }
00037 
00038 int
00039 PassiveEndpoint::
00040 init(PortRange const & portRange, const std::string & hostname, int num_threads, bool synchronous,
00041      bool nameLookup, int backlog)
00042 {
00043     //static const char *fName = "PassiveEndpoint::init:";
00044     //cerr << fName << this << ":was called for " << hostname << endl;
00045     spinup(num_threads, synchronous);
00046 
00047     int port = listen(portRange, hostname, nameLookup, backlog);
00048     cerr << "listening on hostname " << hostname << " port " << port << endl;
00049     return port;
00050 }
00051 
00052 
00053 /*****************************************************************************/
00054 /* ACCEPTOR FOR SOCKETTRANSPORT                                              */
00055 /*****************************************************************************/
00056 
00057 AcceptorT<SocketTransport>::
00058 AcceptorT()
00059     : fd(-1), endpoint(0)
00060 {
00061 }
00062 
00063 AcceptorT<SocketTransport>::
00064 ~AcceptorT()
00065 {
00066     closePeer();
00067 }
00068 
00069 int
00070 AcceptorT<SocketTransport>::
00071 listen(PortRange const & portRange,
00072        const std::string & hostname,
00073        PassiveEndpoint * endpoint,
00074        bool nameLookup,
00075        int backlog)
00076 {
00077     closePeer();
00078     
00079     this->endpoint = endpoint;
00080     this->nameLookup = nameLookup;
00081 
00082     fd = socket(AF_INET, SOCK_STREAM, 0);
00083 
00084     // Avoid already bound messages for the minute after a server has exited
00085     int tr = 1;
00086     int res = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tr, sizeof(int));
00087 
00088     if (res == -1) {
00089         close(fd);
00090         fd = -1;
00091         throw Exception("error setsockopt SO_REUSEADDR: %s", strerror(errno));
00092     }
00093 
00094     const char * hostNameToUse
00095         = (hostname == "*" ? "0.0.0.0" : hostname.c_str());
00096 
00097     int port = portRange.bindPort
00098         ([&](int port)
00099          {
00100              addr = ACE_INET_Addr(port, hostNameToUse, AF_INET);
00101 
00102              //cerr << "port = " << port
00103              //     << " hostname = " << hostname
00104              //     << " addr = " << addr.get_host_name() << " "
00105              //     << addr.get_host_addr() << " "
00106              //     << addr.get_ip_address() << endl;
00107 
00108              int res = ::bind(fd,
00109                               reinterpret_cast<sockaddr *>(addr.get_addr()),
00110                               addr.get_addr_size());
00111              if (res == -1 && errno != EADDRINUSE)
00112                  throw Exception("listen: bind returned %s", strerror(errno));
00113              return res == 0;
00114          });
00115     
00116     if (port == -1) {
00117         throw Exception("couldn't bind to any port");
00118     }
00119 
00120     // Avoid already bound messages for the minute after a server has exited
00121     res = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tr, sizeof(int));
00122 
00123     if (res == -1) {
00124         close(fd);
00125         fd = -1;
00126         throw Exception("error setsockopt SO_REUSEADDR: %s", strerror(errno));
00127     }
00128 
00129     res = ::listen(fd, backlog);
00130 
00131     if (res == -1) {
00132         close(fd);
00133         fd = -1;
00134         throw Exception("error on listen: %s", strerror(errno));
00135     }
00136 
00137     shutdown = false;
00138 
00139     acceptThread.reset(new boost::thread([=] () { this->runAcceptThread(); }));
00140     return port;
00141 }
00142 
00143 void
00144 AcceptorT<SocketTransport>::
00145 closePeer()
00146 {
00147     if (!acceptThread) return;
00148     shutdown = true;
00149 
00150     ML::memory_barrier();
00151 
00152     wakeup.signal();
00153 
00154     close(fd);
00155     fd = -1;
00156 
00157     acceptThread->join();
00158     acceptThread.reset();
00159 }
00160 
00161 std::string
00162 AcceptorT<SocketTransport>::
00163 hostname() const
00164 {
00165     char buf[1024];
00166     int res = addr.get_host_name(buf, 1024);
00167     if (res == -1)
00168         throw ML::Exception("invalid hostname");
00169     return buf;
00170 }
00171 
00172 int
00173 AcceptorT<SocketTransport>::
00174 port() const
00175 {
00176     return addr.get_port_number();
00177 }
00178 
00179 struct NameEntry {
00180     NameEntry(const string & name)
00181         : name_(name), date_(Date::now())
00182         {}
00183 
00184     string name_;
00185     Date date_;
00186 };
00187 
00188 void
00189 AcceptorT<SocketTransport>::
00190 runAcceptThread()
00191 {
00192     //static const char *fName = "AcceptorT<SocketTransport>::runAcceptThread:";
00193     unordered_map<string,NameEntry> addr2Name;
00194 
00195     int res = fcntl(fd, F_SETFL, O_NONBLOCK);
00196     if (res != 0) {
00197         if (shutdown)
00198             return;  // deal with race between start up and shut down
00199         throw ML::Exception(errno, "fcntl");
00200     }
00201 
00202     while (!shutdown) {
00203 
00204         sockaddr_in addr;
00205         socklen_t addr_len = sizeof(addr);
00206         //cerr << "accept on fd " << fd << endl;
00207 
00208         pollfd fds[2] = {
00209             { fd, POLLIN, 0 },
00210             { wakeup.fd(), POLLIN, 0 }
00211         };
00212 
00213         int res = ::poll(fds, 2, -1);
00214 
00215         //cerr << "accept poll returned " << res << endl;
00216 
00217         if (shutdown)
00218             return;
00219         
00220         if (res == -1 && errno == EINTR)
00221             continue;
00222         if (res == 0)
00223             throw ML::Exception("should not be no fds ready to accept");
00224 
00225         if (fds[1].revents) {
00226             wakeup.read();
00227         }
00228 
00229         if (!fds[0].revents)
00230             continue;
00231 
00232         res = accept(fd, (sockaddr *)&addr, &addr_len);
00233 
00234         //cerr << "accept returned " << res << endl;
00235 
00236         if (res == -1 && errno == EWOULDBLOCK)
00237             continue;
00238 
00239         if (res == -1 && errno == EINTR) continue;
00240 
00241         if (res == -1)
00242             endpoint->acceptError(format("accept: %s", strerror(errno)));
00243 
00244 #if 0
00245         union {
00246             char octets[4];
00247             uint32_t addr;
00248         } a;
00249         a.addr = addr.sin_addr;
00250 #endif
00251 
00252         ACE_INET_Addr addr2(&addr, addr_len);
00253 
00254 #if 0
00255         ptime now = second_clock::universal_time();
00256 
00257         cerr << boost::this_thread::get_id() << ":"<<to_iso_extended_string(now) << ":accept succeeded from "
00258              << addr2.get_host_addr() << ":" << addr2.get_port_number()
00259              << " (" << addr2.get_host_name() << ")"
00260              << " for endpoint " << endpoint->name() << " res = " << res
00261              << " pointer " << endpoint << endl;
00262 #endif
00263         std::shared_ptr<SocketTransport> newTransport
00264             (new SocketTransport(this->endpoint));
00265 
00266         newTransport->peer_ = ACE_SOCK_Stream(res);
00267         string peerName = addr2.get_host_addr();
00268         if (nameLookup) {
00269             auto it = addr2Name.find(peerName);
00270             if (it == addr2Name.end()) {
00271                 string addr = peerName;
00272                 peerName = addr2.get_host_name();
00273                 addr2Name.insert({addr, NameEntry(peerName)});
00274             }
00275             else {
00276                 peerName = it->second.name_;
00277             }
00278         }
00279 
00280         if (peerName == "<unknown>")
00281             peerName = addr2.get_host_addr();
00282         newTransport->peerName_ = peerName;
00283         endpoint->associateHandler(newTransport);
00284 
00285         /* cleanup name entries older than 5 seconds */
00286         Date now = Date::now();
00287         auto it = addr2Name.begin();
00288         while (it != addr2Name.end()) {
00289             const NameEntry & entry = it->second;
00290             if (entry.date_.plusSeconds(5) < now) {
00291                 it = addr2Name.erase(it);
00292             }
00293             else {
00294                 it++;
00295             }
00296         }
00297     }
00298 }
00299 
00300 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator