RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* passive_endpoint.cc 00002 Jeremy Barnes, 29 April 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Passive endpoint implementation. 00006 */ 00007 00008 #include <unordered_map> 00009 00010 #include "soa/service//passive_endpoint.h" 00011 #include <poll.h> 00012 #include <boost/date_time/gregorian/gregorian.hpp> 00013 00014 using namespace std; 00015 using namespace ML; 00016 using namespace boost::posix_time; 00017 00018 namespace Datacratic { 00019 00020 00021 /*****************************************************************************/ 00022 /* PASSIVE ENDPOINT */ 00023 /*****************************************************************************/ 00024 00025 PassiveEndpoint:: 00026 PassiveEndpoint(const std::string & name) 00027 : EndpointBase(name) 00028 { 00029 } 00030 00031 PassiveEndpoint:: 00032 ~PassiveEndpoint() 00033 { 00034 closePeer(); 00035 shutdown(); 00036 } 00037 00038 int 00039 PassiveEndpoint:: 00040 init(PortRange const & portRange, const std::string & hostname, int num_threads, bool synchronous, 00041 bool nameLookup, int backlog) 00042 { 00043 //static const char *fName = "PassiveEndpoint::init:"; 00044 //cerr << fName << this << ":was called for " << hostname << endl; 00045 spinup(num_threads, synchronous); 00046 00047 int port = listen(portRange, hostname, nameLookup, backlog); 00048 cerr << "listening on hostname " << hostname << " port " << port << endl; 00049 return port; 00050 } 00051 00052 00053 /*****************************************************************************/ 00054 /* ACCEPTOR FOR SOCKETTRANSPORT */ 00055 /*****************************************************************************/ 00056 00057 AcceptorT<SocketTransport>:: 00058 AcceptorT() 00059 : fd(-1), endpoint(0) 00060 { 00061 } 00062 00063 AcceptorT<SocketTransport>:: 00064 ~AcceptorT() 00065 { 00066 closePeer(); 00067 } 00068 00069 int 00070 AcceptorT<SocketTransport>:: 00071 listen(PortRange const & portRange, 00072 const std::string & hostname, 00073 PassiveEndpoint * endpoint, 00074 bool nameLookup, 00075 int backlog) 00076 { 00077 closePeer(); 00078 00079 this->endpoint = endpoint; 00080 this->nameLookup = nameLookup; 00081 00082 fd = socket(AF_INET, SOCK_STREAM, 0); 00083 00084 // Avoid already bound messages for the minute after a server has exited 00085 int tr = 1; 00086 int res = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tr, sizeof(int)); 00087 00088 if (res == -1) { 00089 close(fd); 00090 fd = -1; 00091 throw Exception("error setsockopt SO_REUSEADDR: %s", strerror(errno)); 00092 } 00093 00094 const char * hostNameToUse 00095 = (hostname == "*" ? "0.0.0.0" : hostname.c_str()); 00096 00097 int port = portRange.bindPort 00098 ([&](int port) 00099 { 00100 addr = ACE_INET_Addr(port, hostNameToUse, AF_INET); 00101 00102 //cerr << "port = " << port 00103 // << " hostname = " << hostname 00104 // << " addr = " << addr.get_host_name() << " " 00105 // << addr.get_host_addr() << " " 00106 // << addr.get_ip_address() << endl; 00107 00108 int res = ::bind(fd, 00109 reinterpret_cast<sockaddr *>(addr.get_addr()), 00110 addr.get_addr_size()); 00111 if (res == -1 && errno != EADDRINUSE) 00112 throw Exception("listen: bind returned %s", strerror(errno)); 00113 return res == 0; 00114 }); 00115 00116 if (port == -1) { 00117 throw Exception("couldn't bind to any port"); 00118 } 00119 00120 // Avoid already bound messages for the minute after a server has exited 00121 res = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tr, sizeof(int)); 00122 00123 if (res == -1) { 00124 close(fd); 00125 fd = -1; 00126 throw Exception("error setsockopt SO_REUSEADDR: %s", strerror(errno)); 00127 } 00128 00129 res = ::listen(fd, backlog); 00130 00131 if (res == -1) { 00132 close(fd); 00133 fd = -1; 00134 throw Exception("error on listen: %s", strerror(errno)); 00135 } 00136 00137 shutdown = false; 00138 00139 acceptThread.reset(new boost::thread([=] () { this->runAcceptThread(); })); 00140 return port; 00141 } 00142 00143 void 00144 AcceptorT<SocketTransport>:: 00145 closePeer() 00146 { 00147 if (!acceptThread) return; 00148 shutdown = true; 00149 00150 ML::memory_barrier(); 00151 00152 wakeup.signal(); 00153 00154 close(fd); 00155 fd = -1; 00156 00157 acceptThread->join(); 00158 acceptThread.reset(); 00159 } 00160 00161 std::string 00162 AcceptorT<SocketTransport>:: 00163 hostname() const 00164 { 00165 char buf[1024]; 00166 int res = addr.get_host_name(buf, 1024); 00167 if (res == -1) 00168 throw ML::Exception("invalid hostname"); 00169 return buf; 00170 } 00171 00172 int 00173 AcceptorT<SocketTransport>:: 00174 port() const 00175 { 00176 return addr.get_port_number(); 00177 } 00178 00179 struct NameEntry { 00180 NameEntry(const string & name) 00181 : name_(name), date_(Date::now()) 00182 {} 00183 00184 string name_; 00185 Date date_; 00186 }; 00187 00188 void 00189 AcceptorT<SocketTransport>:: 00190 runAcceptThread() 00191 { 00192 //static const char *fName = "AcceptorT<SocketTransport>::runAcceptThread:"; 00193 unordered_map<string,NameEntry> addr2Name; 00194 00195 int res = fcntl(fd, F_SETFL, O_NONBLOCK); 00196 if (res != 0) { 00197 if (shutdown) 00198 return; // deal with race between start up and shut down 00199 throw ML::Exception(errno, "fcntl"); 00200 } 00201 00202 while (!shutdown) { 00203 00204 sockaddr_in addr; 00205 socklen_t addr_len = sizeof(addr); 00206 //cerr << "accept on fd " << fd << endl; 00207 00208 pollfd fds[2] = { 00209 { fd, POLLIN, 0 }, 00210 { wakeup.fd(), POLLIN, 0 } 00211 }; 00212 00213 int res = ::poll(fds, 2, -1); 00214 00215 //cerr << "accept poll returned " << res << endl; 00216 00217 if (shutdown) 00218 return; 00219 00220 if (res == -1 && errno == EINTR) 00221 continue; 00222 if (res == 0) 00223 throw ML::Exception("should not be no fds ready to accept"); 00224 00225 if (fds[1].revents) { 00226 wakeup.read(); 00227 } 00228 00229 if (!fds[0].revents) 00230 continue; 00231 00232 res = accept(fd, (sockaddr *)&addr, &addr_len); 00233 00234 //cerr << "accept returned " << res << endl; 00235 00236 if (res == -1 && errno == EWOULDBLOCK) 00237 continue; 00238 00239 if (res == -1 && errno == EINTR) continue; 00240 00241 if (res == -1) 00242 endpoint->acceptError(format("accept: %s", strerror(errno))); 00243 00244 #if 0 00245 union { 00246 char octets[4]; 00247 uint32_t addr; 00248 } a; 00249 a.addr = addr.sin_addr; 00250 #endif 00251 00252 ACE_INET_Addr addr2(&addr, addr_len); 00253 00254 #if 0 00255 ptime now = second_clock::universal_time(); 00256 00257 cerr << boost::this_thread::get_id() << ":"<<to_iso_extended_string(now) << ":accept succeeded from " 00258 << addr2.get_host_addr() << ":" << addr2.get_port_number() 00259 << " (" << addr2.get_host_name() << ")" 00260 << " for endpoint " << endpoint->name() << " res = " << res 00261 << " pointer " << endpoint << endl; 00262 #endif 00263 std::shared_ptr<SocketTransport> newTransport 00264 (new SocketTransport(this->endpoint)); 00265 00266 newTransport->peer_ = ACE_SOCK_Stream(res); 00267 string peerName = addr2.get_host_addr(); 00268 if (nameLookup) { 00269 auto it = addr2Name.find(peerName); 00270 if (it == addr2Name.end()) { 00271 string addr = peerName; 00272 peerName = addr2.get_host_name(); 00273 addr2Name.insert({addr, NameEntry(peerName)}); 00274 } 00275 else { 00276 peerName = it->second.name_; 00277 } 00278 } 00279 00280 if (peerName == "<unknown>") 00281 peerName = addr2.get_host_addr(); 00282 newTransport->peerName_ = peerName; 00283 endpoint->associateHandler(newTransport); 00284 00285 /* cleanup name entries older than 5 seconds */ 00286 Date now = Date::now(); 00287 auto it = addr2Name.begin(); 00288 while (it != addr2Name.end()) { 00289 const NameEntry & entry = it->second; 00290 if (entry.date_.plusSeconds(5) < now) { 00291 it = addr2Name.erase(it); 00292 } 00293 else { 00294 it++; 00295 } 00296 } 00297 } 00298 } 00299 00300 } // namespace Datacratic