RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/testing/endpoint_closed_connection_test.cc
00001 /* endpoint_closed_connection_test.cc
00002    Jeremy Barnes, 26 July 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005 */
00006 #define BOOST_TEST_MAIN
00007 #define BOOST_TEST_DYN_LINK
00008 
00009 #include <boost/test/unit_test.hpp>
00010 #include "jml/arch/format.h"
00011 #include "jml/utils/vector_utils.h"
00012 #include "jml/utils/exc_assert.h"
00013 #include "jml/utils/hex_dump.h"
00014 #include "jml/utils/environment.h"
00015 #include "jml/arch/exception_handler.h"
00016 #include "jml/arch/futex.h"
00017 #include "jml/arch/timers.h"
00018 #include "soa/service/http_endpoint.h"
00019 #include "soa/service/json_endpoint.h"
00020 #include <boost/thread/thread.hpp>
00021 #include <boost/thread.hpp>
00022 #include <boost/thread/barrier.hpp>
00023 #include <boost/function.hpp>
00024 #include <boost/make_shared.hpp>
00025 
00026 #include <poll.h>
00027 #include <sys/socket.h>
00028 
00029 using namespace std;
00030 using namespace ML;
00031 using namespace Datacratic;
00032 
00033 
00034 BOOST_AUTO_TEST_CASE( test_protocol_dump )
00035 {
00036     ML::set_default_trace_exceptions(false);
00037 
00038     std::function<std::shared_ptr<JsonConnectionHandler> ()> handlerFactory;
00039 
00040     auto onGotJson = [&] (const HttpHeader & header,
00041                           const Json::Value & payload,
00042                           const std::string & jsonStr,
00043                           AdHocJsonConnectionHandler * conn)
00044         {
00045             //cerr << "hello I got some JSON " << payload << endl;
00046 
00047             auto onSendFinished = [=] ()
00048             {
00049                 conn->transport().associateWhenHandlerFinished
00050                     (handlerFactory(), "gotMeSomeJson");
00051             };
00052 
00053             conn->sendHttpChunk("1", PassiveConnectionHandler::NEXT_CONTINUE,
00054                                 onSendFinished);
00055         };
00056 
00057     HttpEndpoint server("testJsonServer");
00058     server.handlerFactory = handlerFactory = [&] ()
00059         {
00060             return std::make_shared<AdHocJsonConnectionHandler>(onGotJson);
00061         };
00062     
00063     int nServerThreads = 2;
00064 
00065     int port = server.init(-1, "localhost", nServerThreads);
00066     cerr << "listening on port " << port << endl;
00067 
00068     ACE_INET_Addr addr(port, "localhost", AF_INET);
00069             
00070     int nClientThreads = 10;
00071 
00072     boost::thread_group tg;
00073 
00074     int shutdown = false;
00075 
00076     volatile int maxFd = 0;
00077 
00078     uint64_t doneRequests = 0;
00079 
00080     auto doReadyThread = [&] ()
00081         {
00082             while (!shutdown) {
00083                 // Get a connection
00084                 int fd = socket(AF_INET, SOCK_STREAM, 0);
00085                 if (fd == -1)
00086                     throw ML::Exception("couldn't get socket");
00087             
00088                 int res = connect(fd, (sockaddr *)addr.get_addr(),
00089                                   addr.get_addr_size());
00090                 if (res != 0) {
00091                     cerr << "fd = " << fd << endl;
00092                     cerr << "done " << doneRequests << " requests" << endl;
00093                     throw ML::Exception(errno, "couldn't connect to server");
00094                 }
00095 
00096                 if (fd > maxFd) {
00097                     maxFd = fd;
00098                     cerr << "maxFd now " << fd << endl;
00099                     cerr << "done " << doneRequests << " requests" << endl;
00100                 }
00101 
00102                 //cerr << "connected on fd " << fd << endl;
00103 
00104                 //int nrequests = 0;
00105                 //int errors = 0;
00106 
00107                 while (!shutdown) {
00108                     string request = 
00109                         "POST /ready HTTP/1.1\r\n"
00110                         "Transfer-Encoding: Chunked\r\n"
00111                         "Content-Type: application/json\r\n"
00112                         "Keepalive: true\r\n"
00113                         "\r\n"
00114                         "2\r\n"
00115                         "{}";
00116 
00117                     const char * current = request.c_str();
00118                     const char * end = current + request.size();
00119 
00120                     Date before = Date::now();
00121 
00122                     while (current != end) {
00123                         res = send(fd, current, end - current, MSG_NOSIGNAL);
00124                         if (res == -1)
00125                             throw ML::Exception(errno, "send()");
00126                         current += res;
00127                     }
00128                     
00129                     // Close our writing half
00130                     //res = ::shutdown(fd, SHUT_WR);
00131                     //cerr << "shutdown reader " << res << " " << strerror(errno)
00132                     //<< endl;
00133                     if (res == -1)
00134                         throw ML::Exception(errno, "shutdown");
00135                     
00136                     ExcAssertEqual((void *)current, (void *)end);
00137                     
00138                     struct pollfd fds[1] = {
00139                         { fd, POLLIN | POLLRDHUP, 0 }
00140                     };
00141 
00142                     int res = poll(fds, 1, 500 /* ms timeout */);
00143                     if (res == -1)
00144                         throw ML::Exception(errno, "poll");
00145 
00146                     if (res == 0) {
00147                         cerr << "fd " << fd << " timed out after 500ms"
00148                              << endl;
00149                         break;
00150                     }
00151 
00152                     // Wait for a response
00153                     char buf[16384];
00154                     res = recv(fd, buf, 16384, 0);
00155                     if (res == -1)
00156                         throw ML::Exception(errno, "recv");
00157                     
00158                     //double timeTaken = Date::now().secondsSince(before);
00159                     //cerr << "took " << timeTaken * 1000 << "ms" << endl;
00160 
00161                     if (res == 0) {
00162                         cerr << "connection " << fd << " was closed" << endl;
00163                         break;  // connection closed
00164                     }
00165 
00166                     //cerr << "got " << res << " bytes back from server" << endl;
00167                     //string response(buf, buf + res);
00168                     //cerr << "response is " << response << endl;
00169                 
00170                     futex_wait(shutdown, 0, 0.001 /* seconds */);
00171 
00172                     ML::atomic_inc(doneRequests);
00173 
00174                     //break;  // close the connection
00175                 }
00176 
00177                 errno = 0;
00178 
00179                 // Close our writing half
00180                 //res = ::shutdown(fd, SHUT_WR);
00181                 //cerr << "shutdown reader " << res << " " << strerror(errno)
00182                 //<< endl;
00183                 //if (res == -1)
00184                 //    throw ML::Exception(errno, "shutdown");
00185 
00186                 // Wait for the other end to close down
00187                 //char buf[16384];
00188                 //res = recv(fd, buf, 16384, 0);
00189                 //cerr << "recv " << res << " " << strerror(errno)
00190                 //<< endl;
00191                 //if (res == -1)
00192                 //    throw ML::Exception(errno, "recv");
00193                 //if (res != 0)
00194                 //    throw ML::Exception("got garbage");
00195                 
00196                 // Close our writing half
00197                 //res = ::shutdown(fd, SHUT_RD);
00198                 //cerr << "shutdown writer " << res << " " << strerror(errno)
00199                 //<< endl;
00200                 if (res == -1)
00201                     throw ML::Exception(errno, "shutdown");
00202             
00203                 res = close(fd);
00204                 //cerr << "close " << res << " " << strerror(errno)
00205                 //<< endl;
00206                 if (res == -1)
00207                     throw ML::Exception(errno, "close");
00208             }            
00209         };
00210     
00211     
00212     for (unsigned i = 0;  i <= nClientThreads;  ++i)
00213         tg.create_thread(doReadyThread);
00214 
00215     for (unsigned i = 0;  i < 10;  ++i) {
00216         ML::sleep(0.1);
00217         cerr << "done " << doneRequests << " requests" << endl;
00218     }
00219 
00220     //ML::sleep(10.0);
00221 
00222     shutdown = true;
00223     futex_wake(shutdown);
00224 
00225     tg.join_all();
00226 
00227     cerr << "done " << doneRequests << " requests" << endl;
00228 }
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator