![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* Wolfgang Sourdeau - April 2013 */ 00002 00003 #include <fcntl.h> 00004 #include <unistd.h> 00005 #include <netinet/tcp.h> 00006 #include <sys/epoll.h> 00007 #include <sys/types.h> 00008 #include <sys/socket.h> 00009 00010 #include "jml/arch/cmp_xchg.h" 00011 #include "soa/service/rest_service_endpoint.h" 00012 00013 #include "tcpsockets.h" 00014 00015 00016 using namespace std; 00017 using namespace Datacratic; 00018 00019 00020 const int bufferSizePow = 16; 00021 const int tcpBufferSizePow = 13; 00022 00023 /* CHARRINGBUFFER */ 00024 size_t 00025 CharRingBuffer:: 00026 availableForWriting() 00027 const 00028 { 00029 size_t available, 00030 myReadPosition(readPosition), myWritePosition(writePosition); 00031 00032 if (myReadPosition <= myWritePosition) { 00033 available = bufferSize; 00034 } 00035 else { 00036 available = 0; 00037 } 00038 available += myReadPosition - myWritePosition - 1; 00039 00040 if (available >= bufferSize) { 00041 cerr << "writing: bufferSize: " << bufferSize 00042 << "; readPosition: " << myReadPosition 00043 << "; writePosition: " << myWritePosition 00044 << "; available: " << available 00045 << endl; 00046 ExcAssert(available < bufferSize); 00047 } 00048 00049 return available; 00050 } 00051 00052 size_t 00053 CharRingBuffer:: 00054 availableForReading() 00055 const 00056 { 00057 size_t available, 00058 myReadPosition(readPosition), myWritePosition(writePosition); 00059 00060 if (myWritePosition < myReadPosition) { 00061 available = bufferSize; 00062 } 00063 else { 00064 available = 0; 00065 } 00066 available += myWritePosition - myReadPosition; 00067 00068 if (available >= bufferSize) { 00069 cerr << "reading: bufferSize: " << bufferSize 00070 << "; readPosition: " << myReadPosition 00071 << "; writePosition: " << myWritePosition 00072 << "; available: " << available 00073 << endl; 00074 } 00075 ExcAssert(available < bufferSize); 00076 00077 return available; 00078 } 00079 00080 void 00081 CharRingBuffer:: 00082 write(const char *newBytes, size_t len) 00083 { 00084 size_t maxLen = availableForWriting(); 00085 if (len > maxLen) 00086 throw ML::Exception("no room left"); 00087 00088 size_t myWritePosition(writePosition); 00089 size_t bytesLeftRight = bufferSize - myWritePosition; 00090 if (len > bytesLeftRight) { 00091 memcpy(buffer + myWritePosition, newBytes, bytesLeftRight); 00092 memcpy(buffer, newBytes + bytesLeftRight, len - bytesLeftRight); 00093 } 00094 else { 00095 memcpy(buffer + myWritePosition, newBytes, len); 00096 } 00097 00098 size_t newWritePosition = (myWritePosition + len) & bufferMask; 00099 if (!ML::cmp_xchg(writePosition, myWritePosition, newWritePosition)) { 00100 throw ML::Exception("write position changed unexpectedly"); 00101 } 00102 } 00103 00104 void 00105 CharRingBuffer:: 00106 read(char *bytes, size_t len, bool peek) 00107 { 00108 size_t maxLen = availableForReading(); 00109 if (len > maxLen) 00110 throw ML::Exception("nothing left to read"); 00111 00112 size_t myReadPosition(readPosition); 00113 size_t bytesLeftRight = bufferSize - myReadPosition; 00114 00115 if (len > bytesLeftRight) { 00116 memcpy(bytes, buffer + myReadPosition, bytesLeftRight); 00117 memcpy(bytes + bytesLeftRight, buffer, len - bytesLeftRight); 00118 } 00119 else { 00120 memcpy(bytes, buffer + myReadPosition, len); 00121 } 00122 00123 if (!peek) { 00124 size_t newReadPosition = (myReadPosition + len) & bufferMask; 00125 if (!ML::cmp_xchg(readPosition, myReadPosition, newReadPosition)) { 00126 throw ML::Exception("read position changed unexpectedly"); 00127 } 00128 } 00129 } 00130 00131 /* CHARMESSAGERINGBUFFER */ 00132 bool 00133 CharMessageRingBuffer:: 00134 writeMessage(const std::string & newMessage) 00135 { 00136 bool rc(true); 00137 00138 char msgSize = newMessage.size(); 00139 ssize_t totalSize = msgSize + 1; 00140 if (totalSize <= availableForWriting()) { 00141 write(&msgSize, 1); 00142 write(newMessage.c_str(), msgSize); 00143 } 00144 else { 00145 // cerr << "writeMessage: no buffer room\n"; 00146 rc = false; 00147 } 00148 00149 return rc; 00150 } 00151 00152 bool 00153 CharMessageRingBuffer:: 00154 readMessage(std::string & message) 00155 { 00156 bool rc(true); 00157 size_t available = availableForReading(); 00158 00159 if (available > 0) { 00160 char msgLenChar; 00161 read(&msgLenChar, 1, true); 00162 size_t msgLen(msgLenChar); 00163 available--; 00164 if (msgLen > available) { 00165 rc = false; 00166 } 00167 else { 00168 /* first byte will contain size */ 00169 char buffer[msgLen+1]; 00170 read(buffer, msgLen + 1); 00171 // cerr << "msgLen: " << msgLen << endl; 00172 message = string(buffer + 1, msgLen); 00173 } 00174 } 00175 else { 00176 rc = false; 00177 } 00178 00179 return rc; 00180 } 00181 00182 /* FULLPOLLER */ 00183 FullPoller:: 00184 FullPoller() 00185 : epollSocket_(-1), shutdown_(false) 00186 { 00187 } 00188 00189 FullPoller:: 00190 ~FullPoller() 00191 { 00192 shutdown(); 00193 disconnect(); 00194 } 00195 00196 void 00197 FullPoller:: 00198 init() 00199 { 00200 epollSocket_ = epoll_create(1); 00201 } 00202 00203 void 00204 FullPoller:: 00205 shutdown() 00206 { 00207 shutdown_ = true; 00208 if (epollSocket_ != -1) { 00209 ::close(epollSocket_); 00210 epollSocket_ = -1; 00211 } 00212 } 00213 00214 void 00215 FullPoller:: 00216 addFd(int fd, void *data) 00217 { 00218 struct epoll_event ev; 00219 00220 ev.events = EPOLLIN | EPOLLOUT | EPOLLHUP; 00221 ev.data.fd = fd; 00222 int rc = epoll_ctl(epollSocket_, EPOLL_CTL_ADD, fd, &ev); 00223 if (rc == -1) { 00224 throw ML::Exception(errno, "addFd", "error"); 00225 } 00226 // cerr << "adding socket " << fd << " to epoll set " << this << "\n"; 00227 } 00228 00229 void 00230 FullPoller:: 00231 removeFd(int fd) 00232 { 00233 epoll_ctl(epollSocket_, EPOLL_CTL_DEL, fd, NULL); 00234 } 00235 00236 void 00237 FullPoller:: 00238 handleEvents() 00239 { 00240 epoll_event events[64]; 00241 memset(events, 0, sizeof(events)); 00242 00243 int res = epoll_wait(epollSocket_, events, 64, 0); 00244 if (res == -1) { 00245 if (errno == EBADF) { 00246 cerr << "got bad FD" << endl; 00247 return; 00248 } 00249 else if (errno != EINTR) 00250 throw ML::Exception(errno, "epoll_wait"); 00251 } 00252 else { 00253 for (unsigned i = 0; i < res; i++) { 00254 // cerr << "handling event on fd: " << events[i].data.fd << endl; 00255 handleEvent(events[i]); 00256 } 00257 } 00258 } 00259 00260 bool 00261 FullPoller:: 00262 poll() 00263 const 00264 { 00265 struct epoll_event ev; 00266 return !shutdown_ && (epoll_wait(epollSocket_, &ev, 1, 0) > 0); 00267 } 00268 00269 /* TCPNAMEDENDPOINT */ 00270 00271 TcpNamedEndpoint:: 00272 TcpNamedEndpoint() 00273 : NamedEndpoint(), FullPoller(), 00274 recvBuffer(bufferSizePow), sendBuffer(bufferSizePow) 00275 { 00276 needsPoll = true; 00277 } 00278 00279 TcpNamedEndpoint:: 00280 ~TcpNamedEndpoint() 00281 { 00282 shutdown(); 00283 } 00284 00285 void 00286 TcpNamedEndpoint:: 00287 init(shared_ptr<ConfigurationService> config, 00288 const string & endpointName) 00289 { 00290 NamedEndpoint::init(config, endpointName); 00291 FullPoller::init(); 00292 00293 socket_ = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); 00294 // cerr << "endpoint socket: " << socket_ << endl; 00295 uint32_t value = 1 << tcpBufferSizePow; 00296 setsockopt(socket_, SOL_TCP, SO_SNDBUF, &value, sizeof(value)); 00297 setsockopt(socket_, SOL_TCP, SO_RCVBUF, &value, sizeof(value)); 00298 00299 addFd(socket_); 00300 } 00301 00302 void 00303 TcpNamedEndpoint:: 00304 onDisconnect(int fd) 00305 { 00306 } 00307 00308 void 00309 TcpNamedEndpoint:: 00310 handleEvent(epoll_event & event) 00311 { 00312 if (event.data.fd == socket_) { 00313 if ((event.events & EPOLLIN) == EPOLLIN) { 00314 int newFd = accept4(socket_, NULL, NULL, SOCK_NONBLOCK); 00315 if (newFd == -1) { 00316 throw ML::Exception(errno, "accept", "failure in handleEvent"); 00317 } 00318 #if 0 00319 int flags = fcntl(newFd, F_GETFL); 00320 flags |= O_NONBLOCK; 00321 fcntl(newFd, F_SETFL, &flags); 00322 #endif 00323 // cerr << "epoll connected\n"; 00324 onConnect(newFd); 00325 } 00326 else { 00327 throw ML::Exception("unhandled"); 00328 } 00329 } 00330 else if ((event.events & EPOLLIN) == EPOLLIN) { 00331 // cerr << "epollin on socket" << event.data.fd << endl; 00332 /* fill the ring buffer */ 00333 ssize_t nBytes = recvBuffer.availableForWriting(); 00334 if (nBytes > 0) { 00335 // cerr << "reading " << nBytes << " from client socket\n"; 00336 while (nBytes > 0) { 00337 char buffer[nBytes]; 00338 int rc = ::read(event.data.fd, buffer, nBytes); 00339 // cerr << "read returned " << rc << "\n"; 00340 if (rc == -1) { 00341 if (errno == EAGAIN) 00342 break; 00343 cerr << "errno = " << errno << endl; 00344 throw ML::Exception(errno, "read", "handleEvent"); 00345 } 00346 if (rc == 0) 00347 break; 00348 recvBuffer.write(buffer, rc); 00349 nBytes -= rc; 00350 } 00351 flushMessages(); 00352 } 00353 } 00354 else if ((event.events & EPOLLHUP) == EPOLLHUP) { 00355 onDisconnect(event.data.fd); 00356 } 00357 } 00358 00359 void 00360 TcpNamedEndpoint:: 00361 flushMessages() 00362 { 00363 string message; 00364 while (recvBuffer.readMessage(message)) { 00365 onMessage_(message); 00366 } 00367 // cerr << "flushMessages done\n"; 00368 } 00369 00370 void 00371 TcpNamedEndpoint:: 00372 onConnect(int newFd) 00373 { 00374 addFd(newFd); 00375 } 00376 00377 void 00378 TcpNamedEndpoint:: 00379 bindTcp(int port) 00380 { 00381 struct sockaddr_in addr; 00382 memset(&addr, 0, sizeof(addr)); 00383 addr.sin_family = AF_INET; 00384 addr.sin_port = htons(port); 00385 addr.sin_addr.s_addr = inet_addr("127.0.0.1"); 00386 00387 if (::bind(socket_, (struct sockaddr *)&addr, sizeof(addr)) == -1) { 00388 throw ML::Exception(errno, "failure", "bind"); 00389 } 00390 if (::listen(socket_, 1024) == -1) { 00391 throw ML::Exception(errno, "failure", "listen"); 00392 } 00393 cerr << "listening\n"; 00394 } 00395 00396 void 00397 TcpNamedEndpoint:: 00398 shutdown() 00399 { 00400 FullPoller::shutdown(); 00401 if (socket_ != -1) { 00402 // shutdown(socket_, SHUT_RDRW); 00403 ::close(socket_); 00404 socket_ = -1; 00405 } 00406 } 00407 00408 /* TCPNAMEDPROXY */ 00409 TcpNamedProxy:: 00410 TcpNamedProxy() 00411 : FullPoller(), 00412 recvBuffer(bufferSizePow), sendBuffer(bufferSizePow) 00413 { 00414 needsPoll = true; 00415 } 00416 00417 TcpNamedProxy:: 00418 ~TcpNamedProxy() 00419 { 00420 shutdown(); 00421 } 00422 00423 void 00424 TcpNamedProxy:: 00425 init(shared_ptr<ConfigurationService> config) 00426 { 00427 FullPoller::init(); 00428 00429 socket_ = socket(AF_INET, SOCK_STREAM, 0); 00430 // uint32_t value(1); 00431 // setsockopt(socket_, SOL_TCP, TCP_NODELAY, &value, sizeof(value)); 00432 uint32_t value = 1 << tcpBufferSizePow; 00433 setsockopt(socket_, SOL_TCP, SO_SNDBUF, &value, sizeof(value)); 00434 setsockopt(socket_, SOL_TCP, SO_RCVBUF, &value, sizeof(value)); 00435 00436 addFd(socket_); 00437 } 00438 00439 void 00440 TcpNamedProxy:: 00441 connectTo(string host, int port) 00442 { 00443 struct sockaddr_in addr; 00444 00445 bzero((char *) &addr, sizeof(addr)); 00446 addr.sin_family = AF_INET; 00447 addr.sin_port = htons(port); 00448 addr.sin_addr.s_addr = inet_addr(host.c_str()); 00449 if (addr.sin_addr.s_addr == -1) { 00450 throw ML::Exception(errno, "lookup failed", "inet_addr"); 00451 } 00452 00453 int rc = connect(socket_, 00454 (const struct sockaddr *) &addr, sizeof(addr)); 00455 cerr << "connect rc: " << rc << endl; 00456 if (rc == 0) { 00457 state_ = CONNECTED; 00458 int flags = fcntl(socket_, F_GETFL); 00459 flags |= O_NONBLOCK; 00460 fcntl(socket_, F_SETFL, &flags); 00461 } 00462 else if (rc == -1) { 00463 throw ML::Exception(errno, "connection failed", "connectTo"); 00464 } 00465 else { 00466 throw ML::Exception(errno, "unexpected return code"); 00467 } 00468 } 00469 00470 void 00471 TcpNamedProxy:: 00472 shutdown() 00473 { 00474 FullPoller::shutdown(); 00475 if (socket_ != -1) { 00476 // shutdown(socket_, SHUT_RDRW); 00477 removeFd(socket_); 00478 ::close(socket_); 00479 socket_ = -1; 00480 } 00481 } 00482 00483 bool 00484 TcpNamedProxy:: 00485 isConnected() 00486 const 00487 { 00488 return state_ == CONNECTED; 00489 } 00490 00491 bool 00492 TcpNamedProxy:: 00493 sendMessage(const string & message) 00494 { 00495 return sendBuffer.writeMessage(message); 00496 } 00497 00498 void 00499 TcpNamedProxy:: 00500 handleEvent(epoll_event & event) 00501 { 00502 // cerr << "handleEvent: " << event.events << endl; 00503 if ((event.events & EPOLLIN) == EPOLLIN) { 00504 // cerr << "proxy pollin\n"; 00505 } 00506 else if ((event.events & EPOLLOUT) == EPOLLOUT) { 00507 int nBytes = sendBuffer.availableForReading(); 00508 if (nBytes > 0) { 00509 // cerr << "available for reading: " << nBytes << endl; 00510 char buffer[nBytes]; 00511 sendBuffer.read(buffer, nBytes); 00512 int rc = ::write(socket_, buffer, nBytes); 00513 if (rc == -1) { 00514 throw ML::Exception(errno, "handleEvent", "write"); 00515 } 00516 // ExcAssertEqual(rc, nBytes); 00517 // nBytes = sendBuffer.availableForReading(); 00518 // cerr << "available for reading after send: " << nBytes << endl; 00519 } 00520 } 00521 else if ((event.events & EPOLLHUP) == EPOLLHUP) { 00522 cerr << "proxy pollhup\n"; 00523 } 00524 } 00525 00526 void 00527 TcpNamedProxy:: 00528 onMessage(string && newMessage) 00529 { 00530 cerr << "received message: " << newMessage << endl; 00531 } 00532 00533 void 00534 TcpNamedProxy:: 00535 onDisconnect(int fd) 00536 { 00537 cerr << "socket disconnected\n"; 00538 }