RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/testing/tcpsockets.cc
00001 /* Wolfgang Sourdeau - April 2013 */
00002 
00003 #include <fcntl.h>
00004 #include <unistd.h>
00005 #include <netinet/tcp.h>
00006 #include <sys/epoll.h>
00007 #include <sys/types.h>
00008 #include <sys/socket.h>
00009 
00010 #include "jml/arch/cmp_xchg.h"
00011 #include "soa/service/rest_service_endpoint.h"
00012 
00013 #include "tcpsockets.h"
00014 
00015 
00016 using namespace std;
00017 using namespace Datacratic;
00018 
00019 
/* Value handed to the recvBuffer/sendBuffer constructors of the endpoint and
   proxy classes below (presumably log2 of the ring capacity -- confirm
   against the CharRingBuffer constructor). */
constexpr int bufferSizePow = 16;

/* The sockets created in this file request kernel buffers of
   (1 << tcpBufferSizePow) bytes. */
constexpr int tcpBufferSizePow = 13;
00022 
00023 /* CHARRINGBUFFER */
00024 size_t
00025 CharRingBuffer::
00026 availableForWriting()
00027     const
00028 {
00029     size_t available,
00030         myReadPosition(readPosition), myWritePosition(writePosition);
00031 
00032     if (myReadPosition <= myWritePosition) {
00033         available = bufferSize;
00034     }
00035     else {
00036         available = 0;
00037     }
00038     available += myReadPosition - myWritePosition - 1;
00039     
00040     if (available >= bufferSize) {
00041         cerr << "writing: bufferSize: " << bufferSize
00042              << "; readPosition: " << myReadPosition
00043              << "; writePosition: " << myWritePosition
00044              << "; available: " << available
00045              << endl;
00046         ExcAssert(available < bufferSize);
00047     }
00048 
00049     return available;
00050 }
00051 
00052 size_t
00053 CharRingBuffer::
00054 availableForReading()
00055     const
00056 {
00057     size_t available,
00058         myReadPosition(readPosition), myWritePosition(writePosition);
00059 
00060     if (myWritePosition < myReadPosition) {
00061         available = bufferSize;
00062     }
00063     else {
00064         available = 0;
00065     }
00066     available += myWritePosition - myReadPosition;
00067 
00068     if (available >= bufferSize) {
00069         cerr << "reading: bufferSize: " << bufferSize
00070              << "; readPosition: " << myReadPosition
00071              << "; writePosition: " << myWritePosition
00072              << "; available: " << available
00073              << endl;
00074     }
00075     ExcAssert(available < bufferSize);
00076     
00077     return available;
00078 }
00079 
00080 void
00081 CharRingBuffer::
00082 write(const char *newBytes, size_t len)
00083 {
00084     size_t maxLen = availableForWriting();
00085     if (len > maxLen)
00086         throw ML::Exception("no room left");
00087 
00088     size_t myWritePosition(writePosition);
00089     size_t bytesLeftRight = bufferSize - myWritePosition;
00090     if (len > bytesLeftRight) {
00091         memcpy(buffer + myWritePosition, newBytes, bytesLeftRight);
00092         memcpy(buffer, newBytes + bytesLeftRight, len - bytesLeftRight);
00093     }
00094     else {
00095         memcpy(buffer + myWritePosition, newBytes, len);
00096     }
00097 
00098     size_t newWritePosition = (myWritePosition + len) & bufferMask;
00099     if (!ML::cmp_xchg(writePosition, myWritePosition, newWritePosition)) {
00100         throw ML::Exception("write position changed unexpectedly");
00101     }
00102 }
00103 
00104 void
00105 CharRingBuffer::
00106 read(char *bytes, size_t len, bool peek)
00107 {
00108     size_t maxLen = availableForReading();
00109     if (len > maxLen)
00110         throw ML::Exception("nothing left to read");
00111 
00112     size_t myReadPosition(readPosition);
00113     size_t bytesLeftRight = bufferSize - myReadPosition;
00114 
00115     if (len > bytesLeftRight) {
00116         memcpy(bytes, buffer + myReadPosition, bytesLeftRight);
00117         memcpy(bytes + bytesLeftRight, buffer, len - bytesLeftRight);
00118     }
00119     else {
00120         memcpy(bytes, buffer + myReadPosition, len);
00121     }
00122 
00123     if (!peek) {
00124         size_t newReadPosition = (myReadPosition + len) & bufferMask;
00125         if (!ML::cmp_xchg(readPosition, myReadPosition, newReadPosition)) {
00126             throw ML::Exception("read position changed unexpectedly");
00127         }
00128     }
00129 }
00130 
00131 /* CHARMESSAGERINGBUFFER */
00132 bool
00133 CharMessageRingBuffer::
00134 writeMessage(const std::string & newMessage)
00135 {
00136     bool rc(true);
00137 
00138     char msgSize = newMessage.size();
00139     ssize_t totalSize = msgSize + 1;
00140     if (totalSize <= availableForWriting()) {
00141         write(&msgSize, 1);
00142         write(newMessage.c_str(), msgSize);
00143     }
00144     else {
00145         // cerr << "writeMessage: no buffer room\n";
00146         rc = false;
00147     }
00148 
00149     return rc;
00150 }
00151 
00152 bool
00153 CharMessageRingBuffer::
00154 readMessage(std::string & message)
00155 {
00156     bool rc(true);
00157     size_t available = availableForReading();
00158 
00159     if (available > 0) {
00160         char msgLenChar;
00161         read(&msgLenChar, 1, true);
00162         size_t msgLen(msgLenChar);
00163         available--;
00164         if (msgLen > available) {
00165             rc = false;
00166         }
00167         else {
00168             /* first byte will contain size */
00169             char buffer[msgLen+1];
00170             read(buffer, msgLen + 1);
00171             // cerr << "msgLen: " << msgLen << endl;
00172             message = string(buffer + 1, msgLen);
00173         }
00174     }
00175     else {
00176         rc = false;
00177     }
00178 
00179     return rc;
00180 }
00181 
00182 /* FULLPOLLER */
00183 FullPoller::
00184 FullPoller()
00185     : epollSocket_(-1), shutdown_(false)
00186 {
00187 }
00188 
00189 FullPoller::
00190 ~FullPoller()
00191 {
00192     shutdown();
00193     disconnect();
00194 }
00195 
00196 void
00197 FullPoller::
00198 init()
00199 {
00200     epollSocket_ = epoll_create(1);
00201 }
00202 
00203 void
00204 FullPoller::
00205 shutdown()
00206 {
00207     shutdown_ = true;
00208     if (epollSocket_ != -1) {
00209         ::close(epollSocket_);
00210         epollSocket_ = -1;
00211     }
00212 }
00213 
00214 void
00215 FullPoller::
00216 addFd(int fd, void *data)
00217 {
00218     struct epoll_event ev;
00219     
00220     ev.events = EPOLLIN | EPOLLOUT | EPOLLHUP;
00221     ev.data.fd = fd;
00222     int rc = epoll_ctl(epollSocket_, EPOLL_CTL_ADD, fd, &ev);
00223     if (rc == -1) {
00224         throw ML::Exception(errno, "addFd", "error");
00225     }
00226     // cerr << "adding socket " << fd << " to epoll set " << this << "\n";
00227 }
00228 
00229 void
00230 FullPoller::
00231 removeFd(int fd)
00232 {
00233     epoll_ctl(epollSocket_, EPOLL_CTL_DEL, fd, NULL);
00234 }
00235 
00236 void
00237 FullPoller::
00238 handleEvents()
00239 {
00240     epoll_event events[64];
00241     memset(events, 0, sizeof(events));
00242 
00243     int res = epoll_wait(epollSocket_, events, 64, 0);
00244     if (res == -1) {
00245         if (errno == EBADF) {
00246             cerr << "got bad FD" << endl;
00247             return;
00248         }
00249         else if (errno != EINTR)
00250             throw ML::Exception(errno, "epoll_wait");
00251     }
00252     else {
00253         for (unsigned i = 0; i < res; i++) {
00254             // cerr << "handling event on fd: " << events[i].data.fd << endl;
00255             handleEvent(events[i]);
00256         }
00257     }
00258 }
00259 
00260 bool
00261 FullPoller::
00262 poll()
00263     const
00264 {
00265     struct epoll_event ev;
00266     return !shutdown_ && (epoll_wait(epollSocket_, &ev, 1, 0) > 0);
00267 }
00268 
00269 /* TCPNAMEDENDPOINT */
00270 
00271 TcpNamedEndpoint::
00272 TcpNamedEndpoint()
00273     : NamedEndpoint(), FullPoller(),
00274       recvBuffer(bufferSizePow), sendBuffer(bufferSizePow)
00275 {
00276     needsPoll = true;
00277 }
00278 
00279 TcpNamedEndpoint::
00280 ~TcpNamedEndpoint()
00281 {
00282     shutdown();
00283 }
00284 
00285 void
00286 TcpNamedEndpoint::
00287 init(shared_ptr<ConfigurationService> config,
00288      const string & endpointName)
00289 {
00290     NamedEndpoint::init(config, endpointName);
00291     FullPoller::init();
00292 
00293     socket_ = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
00294     // cerr << "endpoint socket: " << socket_ << endl;
00295     uint32_t value = 1 << tcpBufferSizePow;
00296     setsockopt(socket_, SOL_TCP, SO_SNDBUF, &value, sizeof(value));
00297     setsockopt(socket_, SOL_TCP, SO_RCVBUF, &value, sizeof(value));
00298 
00299     addFd(socket_);
00300 }
00301 
00302 void
00303 TcpNamedEndpoint::
00304 onDisconnect(int fd)
00305 {
00306 }
00307 
00308 void
00309 TcpNamedEndpoint::
00310 handleEvent(epoll_event & event)
00311 {
00312     if (event.data.fd == socket_) {
00313         if ((event.events & EPOLLIN) == EPOLLIN) {
00314             int newFd = accept4(socket_, NULL, NULL, SOCK_NONBLOCK);
00315             if (newFd == -1) {
00316                 throw ML::Exception(errno, "accept", "failure in handleEvent");
00317             }
00318 #if 0
00319             int flags = fcntl(newFd, F_GETFL);
00320             flags |= O_NONBLOCK;
00321             fcntl(newFd, F_SETFL, &flags);
00322 #endif
00323             // cerr << "epoll connected\n";
00324             onConnect(newFd);
00325         }
00326         else {
00327             throw ML::Exception("unhandled");
00328         }
00329     }
00330     else if ((event.events & EPOLLIN) == EPOLLIN) {
00331         // cerr << "epollin on socket" << event.data.fd << endl;
00332         /* fill the ring buffer */
00333         ssize_t nBytes = recvBuffer.availableForWriting();
00334         if (nBytes > 0) {
00335             // cerr << "reading " << nBytes << " from client socket\n";
00336             while (nBytes > 0) {
00337                 char buffer[nBytes];
00338                 int rc = ::read(event.data.fd, buffer, nBytes);
00339                 // cerr << "read returned  " << rc << "\n";
00340                 if (rc == -1) {
00341                     if (errno == EAGAIN)
00342                         break;
00343                     cerr << "errno = " << errno << endl;
00344                     throw ML::Exception(errno, "read", "handleEvent");
00345                 }
00346                 if (rc == 0)
00347                     break;
00348                 recvBuffer.write(buffer, rc);
00349                 nBytes -= rc;
00350             }
00351             flushMessages();
00352         }
00353     }
00354     else if ((event.events & EPOLLHUP) == EPOLLHUP) {
00355         onDisconnect(event.data.fd);
00356     }
00357 }
00358 
00359 void
00360 TcpNamedEndpoint::
00361 flushMessages()
00362 {
00363     string message;
00364     while (recvBuffer.readMessage(message)) {
00365         onMessage_(message);
00366     }
00367     // cerr << "flushMessages done\n";
00368 }
00369 
00370 void
00371 TcpNamedEndpoint::
00372 onConnect(int newFd)
00373 {
00374     addFd(newFd);
00375 }
00376 
00377 void
00378 TcpNamedEndpoint::
00379 bindTcp(int port)
00380 {
00381     struct sockaddr_in addr;
00382     memset(&addr, 0, sizeof(addr));
00383     addr.sin_family = AF_INET;
00384     addr.sin_port = htons(port);
00385     addr.sin_addr.s_addr = inet_addr("127.0.0.1");
00386 
00387     if (::bind(socket_, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
00388         throw ML::Exception(errno, "failure", "bind");
00389     }
00390     if (::listen(socket_, 1024) == -1) {
00391         throw ML::Exception(errno, "failure", "listen");
00392     }
00393     cerr << "listening\n";
00394 }
00395 
00396 void
00397 TcpNamedEndpoint::
00398 shutdown()
00399 {
00400     FullPoller::shutdown();
00401     if (socket_ != -1) {
00402         // shutdown(socket_, SHUT_RDRW);
00403         ::close(socket_);
00404         socket_ = -1;
00405     }
00406 }
00407 
00408 /* TCPNAMEDPROXY */
00409 TcpNamedProxy::
00410 TcpNamedProxy()
00411     : FullPoller(),
00412       recvBuffer(bufferSizePow), sendBuffer(bufferSizePow)
00413 {
00414     needsPoll = true;
00415 }
00416 
00417 TcpNamedProxy::
00418 ~TcpNamedProxy()
00419 {
00420     shutdown();
00421 }
00422 
00423 void
00424 TcpNamedProxy::
00425 init(shared_ptr<ConfigurationService> config)
00426 {
00427     FullPoller::init();
00428 
00429     socket_ = socket(AF_INET, SOCK_STREAM, 0);
00430     // uint32_t value(1);
00431     // setsockopt(socket_, SOL_TCP, TCP_NODELAY, &value, sizeof(value));
00432     uint32_t value = 1 << tcpBufferSizePow;
00433     setsockopt(socket_, SOL_TCP, SO_SNDBUF, &value, sizeof(value));
00434     setsockopt(socket_, SOL_TCP, SO_RCVBUF, &value, sizeof(value));
00435 
00436     addFd(socket_);
00437 }
00438 
00439 void
00440 TcpNamedProxy::
00441 connectTo(string host, int port)
00442 {
00443     struct sockaddr_in addr;
00444 
00445     bzero((char *) &addr, sizeof(addr));
00446     addr.sin_family = AF_INET;
00447     addr.sin_port = htons(port);
00448     addr.sin_addr.s_addr = inet_addr(host.c_str());
00449     if (addr.sin_addr.s_addr == -1) {
00450         throw ML::Exception(errno, "lookup failed", "inet_addr");
00451     }
00452 
00453     int rc = connect(socket_,
00454                      (const struct sockaddr *) &addr, sizeof(addr));
00455     cerr << "connect rc: " << rc << endl;
00456     if (rc == 0) {
00457         state_ = CONNECTED;
00458         int flags = fcntl(socket_, F_GETFL);
00459         flags |= O_NONBLOCK;
00460         fcntl(socket_, F_SETFL, &flags);
00461     }
00462     else if (rc == -1) {
00463         throw ML::Exception(errno, "connection failed", "connectTo");
00464     }
00465     else {
00466         throw ML::Exception(errno, "unexpected return code");
00467     }
00468 }
00469 
00470 void
00471 TcpNamedProxy::
00472 shutdown()
00473 {
00474     FullPoller::shutdown();
00475     if (socket_ != -1) {
00476         // shutdown(socket_, SHUT_RDRW);
00477         removeFd(socket_);
00478         ::close(socket_);
00479         socket_ = -1;
00480     }
00481 }
00482 
00483 bool
00484 TcpNamedProxy::
00485 isConnected()
00486     const
00487 {
00488     return state_ == CONNECTED;
00489 }
00490 
00491 bool
00492 TcpNamedProxy::
00493 sendMessage(const string & message)
00494 {
00495     return sendBuffer.writeMessage(message);
00496 }
00497 
00498 void
00499 TcpNamedProxy::
00500 handleEvent(epoll_event & event)
00501 {
00502     // cerr << "handleEvent: " << event.events << endl;
00503     if ((event.events & EPOLLIN) == EPOLLIN) {
00504         // cerr << "proxy pollin\n";
00505     }
00506     else if ((event.events & EPOLLOUT) == EPOLLOUT) {
00507         int nBytes = sendBuffer.availableForReading();
00508         if (nBytes > 0) {
00509             // cerr << "available for reading: " << nBytes << endl;
00510             char buffer[nBytes];
00511             sendBuffer.read(buffer, nBytes);
00512             int rc = ::write(socket_, buffer, nBytes);
00513             if (rc == -1) {
00514                 throw ML::Exception(errno, "handleEvent", "write");
00515             }
00516             // ExcAssertEqual(rc, nBytes);
00517             // nBytes = sendBuffer.availableForReading();
00518             // cerr << "available for reading after send: " << nBytes << endl;
00519         }
00520     }
00521     else if ((event.events & EPOLLHUP) == EPOLLHUP) {
00522         cerr << "proxy pollhup\n";
00523     }
00524 }
00525 
00526 void
00527 TcpNamedProxy::
00528 onMessage(string && newMessage)
00529 {
00530     cerr << "received message: " << newMessage << endl;
00531 }
00532 
00533 void
00534 TcpNamedProxy::
00535 onDisconnect(int fd)
00536 {
00537     cerr << "socket disconnected\n";
00538 }
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator