RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* endpoint_closed_connection_test.cc 00002 Jeremy Barnes, 26 July 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 */ 00006 #define BOOST_TEST_MAIN 00007 #define BOOST_TEST_DYN_LINK 00008 00009 #include <boost/test/unit_test.hpp> 00010 #include "jml/arch/format.h" 00011 #include "jml/utils/vector_utils.h" 00012 #include "jml/utils/exc_assert.h" 00013 #include "jml/utils/hex_dump.h" 00014 #include "jml/utils/environment.h" 00015 #include "jml/arch/exception_handler.h" 00016 #include "jml/arch/futex.h" 00017 #include "jml/arch/timers.h" 00018 #include "soa/service/http_endpoint.h" 00019 #include "soa/service/json_endpoint.h" 00020 #include <boost/thread/thread.hpp> 00021 #include <boost/thread.hpp> 00022 #include <boost/thread/barrier.hpp> 00023 #include <boost/function.hpp> 00024 #include <boost/make_shared.hpp> 00025 00026 #include <poll.h> 00027 #include <sys/socket.h> 00028 00029 using namespace std; 00030 using namespace ML; 00031 using namespace Datacratic; 00032 00033 00034 BOOST_AUTO_TEST_CASE( test_protocol_dump ) 00035 { 00036 ML::set_default_trace_exceptions(false); 00037 00038 std::function<std::shared_ptr<JsonConnectionHandler> ()> handlerFactory; 00039 00040 auto onGotJson = [&] (const HttpHeader & header, 00041 const Json::Value & payload, 00042 const std::string & jsonStr, 00043 AdHocJsonConnectionHandler * conn) 00044 { 00045 //cerr << "hello I got some JSON " << payload << endl; 00046 00047 auto onSendFinished = [=] () 00048 { 00049 conn->transport().associateWhenHandlerFinished 00050 (handlerFactory(), "gotMeSomeJson"); 00051 }; 00052 00053 conn->sendHttpChunk("1", PassiveConnectionHandler::NEXT_CONTINUE, 00054 onSendFinished); 00055 }; 00056 00057 HttpEndpoint server("testJsonServer"); 00058 server.handlerFactory = handlerFactory = [&] () 00059 { 00060 return std::make_shared<AdHocJsonConnectionHandler>(onGotJson); 00061 }; 00062 00063 int nServerThreads = 2; 00064 00065 int port = server.init(-1, "localhost", nServerThreads); 00066 cerr << "listening on port " << port << endl; 00067 00068 ACE_INET_Addr addr(port, "localhost", AF_INET); 00069 00070 int nClientThreads = 10; 00071 00072 boost::thread_group tg; 00073 00074 int shutdown = false; 00075 00076 volatile int maxFd = 0; 00077 00078 uint64_t doneRequests = 0; 00079 00080 auto doReadyThread = [&] () 00081 { 00082 while (!shutdown) { 00083 // Get a connection 00084 int fd = socket(AF_INET, SOCK_STREAM, 0); 00085 if (fd == -1) 00086 throw ML::Exception("couldn't get socket"); 00087 00088 int res = connect(fd, (sockaddr *)addr.get_addr(), 00089 addr.get_addr_size()); 00090 if (res != 0) { 00091 cerr << "fd = " << fd << endl; 00092 cerr << "done " << doneRequests << " requests" << endl; 00093 throw ML::Exception(errno, "couldn't connect to server"); 00094 } 00095 00096 if (fd > maxFd) { 00097 maxFd = fd; 00098 cerr << "maxFd now " << fd << endl; 00099 cerr << "done " << doneRequests << " requests" << endl; 00100 } 00101 00102 //cerr << "connected on fd " << fd << endl; 00103 00104 //int nrequests = 0; 00105 //int errors = 0; 00106 00107 while (!shutdown) { 00108 string request = 00109 "POST /ready HTTP/1.1\r\n" 00110 "Transfer-Encoding: Chunked\r\n" 00111 "Content-Type: application/json\r\n" 00112 "Keepalive: true\r\n" 00113 "\r\n" 00114 "2\r\n" 00115 "{}"; 00116 00117 const char * current = request.c_str(); 00118 const char * end = current + request.size(); 00119 00120 Date before = Date::now(); 00121 00122 while (current != end) { 00123 res = send(fd, current, end - current, MSG_NOSIGNAL); 00124 if (res == -1) 00125 throw ML::Exception(errno, "send()"); 00126 current += res; 00127 } 00128 00129 // Close our writing half 00130 //res = ::shutdown(fd, SHUT_WR); 00131 //cerr << "shutdown reader " << res << " " << strerror(errno) 00132 //<< endl; 00133 if (res == -1) 00134 throw ML::Exception(errno, "shutdown"); 00135 00136 ExcAssertEqual((void *)current, (void *)end); 00137 00138 struct pollfd fds[1] = { 00139 { fd, POLLIN | POLLRDHUP, 0 } 00140 }; 00141 00142 int res = poll(fds, 1, 500 /* ms timeout */); 00143 if (res == -1) 00144 throw ML::Exception(errno, "poll"); 00145 00146 if (res == 0) { 00147 cerr << "fd " << fd << " timed out after 500ms" 00148 << endl; 00149 break; 00150 } 00151 00152 // Wait for a response 00153 char buf[16384]; 00154 res = recv(fd, buf, 16384, 0); 00155 if (res == -1) 00156 throw ML::Exception(errno, "recv"); 00157 00158 //double timeTaken = Date::now().secondsSince(before); 00159 //cerr << "took " << timeTaken * 1000 << "ms" << endl; 00160 00161 if (res == 0) { 00162 cerr << "connection " << fd << " was closed" << endl; 00163 break; // connection closed 00164 } 00165 00166 //cerr << "got " << res << " bytes back from server" << endl; 00167 //string response(buf, buf + res); 00168 //cerr << "response is " << response << endl; 00169 00170 futex_wait(shutdown, 0, 0.001 /* seconds */); 00171 00172 ML::atomic_inc(doneRequests); 00173 00174 //break; // close the connection 00175 } 00176 00177 errno = 0; 00178 00179 // Close our writing half 00180 //res = ::shutdown(fd, SHUT_WR); 00181 //cerr << "shutdown reader " << res << " " << strerror(errno) 00182 //<< endl; 00183 //if (res == -1) 00184 // throw ML::Exception(errno, "shutdown"); 00185 00186 // Wait for the other end to close down 00187 //char buf[16384]; 00188 //res = recv(fd, buf, 16384, 0); 00189 //cerr << "recv " << res << " " << strerror(errno) 00190 //<< endl; 00191 //if (res == -1) 00192 // throw ML::Exception(errno, "recv"); 00193 //if (res != 0) 00194 // throw ML::Exception("got garbage"); 00195 00196 // Close our writing half 00197 //res = ::shutdown(fd, SHUT_RD); 00198 //cerr << "shutdown writer " << res << " " << strerror(errno) 00199 //<< endl; 00200 if (res == -1) 00201 throw ML::Exception(errno, "shutdown"); 00202 00203 res = close(fd); 00204 //cerr << "close " << res << " " << strerror(errno) 00205 //<< endl; 00206 if (res == -1) 00207 throw ML::Exception(errno, "close"); 00208 } 00209 }; 00210 00211 00212 for (unsigned i = 0; i <= nClientThreads; ++i) 00213 tg.create_thread(doReadyThread); 00214 00215 for (unsigned i = 0; i < 10; ++i) { 00216 ML::sleep(0.1); 00217 cerr << "done " << doneRequests << " requests" << endl; 00218 } 00219 00220 //ML::sleep(10.0); 00221 00222 shutdown = true; 00223 futex_wake(shutdown); 00224 00225 tg.join_all(); 00226 00227 cerr << "done " << doneRequests << " requests" << endl; 00228 }