RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* endpoint_test.cc 00002 Jeremy Barnes, 31 January 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Tests for the endpoints. 00006 */ 00007 00008 #define BOOST_TEST_MAIN 00009 #define BOOST_TEST_DYN_LINK 00010 00011 #include <boost/test/unit_test.hpp> 00012 #include "soa/service/http_endpoint.h" 00013 #include "soa/service/active_endpoint.h" 00014 #include "soa/service/passive_endpoint.h" 00015 #include <sys/socket.h> 00016 #include "jml/utils/guard.h" 00017 #include "jml/arch/exception_handler.h" 00018 #include "jml/utils/testing/watchdog.h" 00019 #include "jml/utils/testing/fd_exhauster.h" 00020 #include "test_connection_error.h" 00021 #include "ping_pong.h" 00022 #include <poll.h> 00023 00024 using namespace std; 00025 using namespace ML; 00026 using namespace Datacratic; 00027 00028 void runAcceptThread(int & port, ACE_Semaphore & started, bool & finished) 00029 { 00030 int backlog = 1; 00031 00032 int s = socket(AF_INET, SOCK_STREAM, 0); 00033 BOOST_REQUIRE(s > 0); 00034 00035 for (port = 9666; port < 9777; ++port) { 00036 struct sockaddr_in addr = { AF_INET, htons(port), { INADDR_ANY } }; 00037 int b = ::bind(s, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)); 00038 if (b == -1 && errno == EADDRINUSE) continue; 00039 if (b == 0) break; 00040 throw Exception("runAcceptThread: bind returned %s", 00041 strerror(errno)); 00042 } 00043 00044 if (port == 9777) 00045 throw Exception("couldn't bind to any port"); 00046 00047 int r = listen(s, backlog); 00048 00049 if (r == -1) 00050 throw Exception("error on listen: %s", strerror(errno)); 00051 00052 started.release(); 00053 00054 pollfd fds[1] = { { s, POLLIN, 0 } }; 00055 00056 while (!finished) { 00057 int res = poll(fds, 1, 10); 00058 //cerr << "poll: res = " << res << endl; 00059 if (res == 0) continue; 00060 if (res == -1) 00061 throw Exception("error on poll: %s", strerror(errno)); 00062 00063 res = accept(s, 0, 0); 00064 00065 //cerr << "accept returned " << res << endl; 00066 00067 if (res == -1) 00068 throw Exception("error on accept: %s", strerror(errno)); 00069 } 00070 } 00071 00072 BOOST_AUTO_TEST_CASE( test_connect_speed ) 00073 { 00074 BOOST_REQUIRE_EQUAL(TransportBase::created, TransportBase::destroyed); 00075 BOOST_REQUIRE_EQUAL(ConnectionHandler::created, 00076 ConnectionHandler::destroyed); 00077 00078 //Watchdog watchdog(5.0); 00079 00080 int port = 0; 00081 ACE_Semaphore started(0); 00082 bool finished = false; 00083 00084 boost::thread thread([&] () { runAcceptThread(port, started, finished); }); 00085 00086 started.acquire(); 00087 00088 cerr << "accept started, port = " << port << endl; 00089 00090 ActiveEndpointT<SocketTransport> connector("connector"); 00091 int nconnections = 100; 00092 00093 Date before = Date::now(); 00094 00095 connector.init(port, "localhost", nconnections); 00096 00097 Date after = Date::now(); 00098 00099 BOOST_CHECK_LT(after.secondsSince(before), 1); 00100 00101 BOOST_CHECK_EQUAL(connector.numConnections(), nconnections); 00102 BOOST_CHECK_EQUAL(connector.numActiveConnections(), 0); 00103 BOOST_CHECK_EQUAL(connector.numInactiveConnections(), nconnections); 00104 00105 finished = true; 00106 00107 connector.shutdown(); 00108 00109 BOOST_CHECK_EQUAL(TransportBase::created, TransportBase::destroyed); 00110 BOOST_CHECK_EQUAL(ConnectionHandler::created, 00111 ConnectionHandler::destroyed); 00112 } 00113 00114 #if 0 00115 BOOST_AUTO_TEST_CASE( test_ping_pong ) 00116 { 00117 BOOST_REQUIRE_EQUAL(TransportBase::created, TransportBase::destroyed); 00118 BOOST_REQUIRE_EQUAL(ConnectionHandler::created, 00119 ConnectionHandler::destroyed); 00120 00121 Watchdog watchdog(5.0); 00122 00123 string connectionError; 00124 00125 PassiveEndpointT<SocketTransport> acceptor; 00126 00127 acceptor.onMakeNewHandler = [&] () 00128 { 00129 return ML::make_std_sp(new PongConnectionHandler(connectionError)); 00130 }; 00131 00132 int port = acceptor.init(); 00133 00134 cerr << "port = " << port << endl; 00135 00136 BOOST_CHECK_EQUAL(acceptor.numConnections(), 1); 00137 00138 ActiveEndpointT<SocketTransport> connector; 00139 int nconnections = 100; 00140 00141 Date before = Date::now(); 00142 00143 connector.init(port, "localhost", nconnections); 00144 00145 Date after = Date::now(); 00146 00147 BOOST_CHECK_LT(after.secondsSince(before), 1); 00148 00149 BOOST_CHECK_EQUAL(acceptor.numConnections(), nconnections + 1); 00150 BOOST_CHECK_EQUAL(connector.numConnections(), nconnections); 00151 BOOST_CHECK_EQUAL(connector.numActiveConnections(), 0); 00152 BOOST_CHECK_EQUAL(connector.numInactiveConnections(), nconnections); 00153 00154 acceptor.closePeer(); 00155 00156 connector.shutdown(); 00157 acceptor.shutdown(); 00158 00159 BOOST_CHECK_EQUAL(TransportBase::created, TransportBase::destroyed); 00160 BOOST_CHECK_EQUAL(ConnectionHandler::created, 00161 ConnectionHandler::destroyed); 00162 } 00163 #endif