RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/testing/test_endpoint_connection_speed.cc
00001 /* test_endpoint_connection_speed.cc
00002    Jeremy Barnes, 31 January 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Tests for the endpoints.
00006 */
00007 
00008 #define BOOST_TEST_MAIN
00009 #define BOOST_TEST_DYN_LINK
00010 
#include <boost/test/unit_test.hpp>
#include "soa/service/http_endpoint.h"
#include "soa/service/active_endpoint.h"
#include "soa/service/passive_endpoint.h"
#include <sys/socket.h>
#include "jml/utils/guard.h"
#include "jml/arch/exception_handler.h"
#include "jml/utils/testing/watchdog.h"
#include "jml/utils/testing/fd_exhauster.h"
#include "test_connection_error.h"
#include "ping_pong.h"
#include <poll.h>
#include <netinet/in.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
00023 
00024 using namespace std;
00025 using namespace ML;
00026 using namespace Datacratic;
00027 
00028 void runAcceptThread(int & port, ACE_Semaphore & started, bool & finished)
00029 {
00030     int backlog = 1;
00031     
00032     int s = socket(AF_INET, SOCK_STREAM, 0);
00033     BOOST_REQUIRE(s > 0);
00034 
00035     for (port = 9666;  port < 9777;  ++port) {
00036         struct sockaddr_in addr = { AF_INET, htons(port), { INADDR_ANY } }; 
00037         int b = ::bind(s, reinterpret_cast<sockaddr *>(&addr), sizeof(addr));
00038         if (b == -1 && errno == EADDRINUSE) continue;
00039         if (b == 0) break;
00040         throw Exception("runAcceptThread: bind returned %s",
00041                         strerror(errno));
00042     }
00043     
00044     if (port == 9777)
00045         throw Exception("couldn't bind to any port");
00046 
00047     int r = listen(s, backlog);
00048 
00049     if (r == -1)
00050         throw Exception("error on listen: %s", strerror(errno));
00051 
00052     started.release();
00053 
00054     pollfd fds[1] = { { s, POLLIN, 0 } };
00055 
00056     while (!finished) {
00057         int res = poll(fds, 1, 10);
00058         //cerr << "poll: res = " << res << endl;
00059         if (res == 0) continue;
00060         if (res == -1)
00061             throw Exception("error on poll: %s", strerror(errno));
00062 
00063         res = accept(s, 0, 0);
00064         
00065         //cerr << "accept returned " << res << endl;
00066         
00067         if (res == -1)
00068             throw Exception("error on accept: %s", strerror(errno));
00069     }
00070 }
00071 
00072 BOOST_AUTO_TEST_CASE( test_connect_speed )
00073 {
00074     BOOST_REQUIRE_EQUAL(TransportBase::created, TransportBase::destroyed);
00075     BOOST_REQUIRE_EQUAL(ConnectionHandler::created,
00076                         ConnectionHandler::destroyed);
00077 
00078     //Watchdog watchdog(5.0);
00079 
00080     int port = 0;
00081     ACE_Semaphore started(0);
00082     bool finished = false;
00083 
00084     boost::thread thread([&] () { runAcceptThread(port, started, finished); });
00085 
00086     started.acquire();
00087 
00088     cerr << "accept started, port = " << port << endl;
00089 
00090     ActiveEndpointT<SocketTransport> connector("connector");
00091     int nconnections = 100;
00092 
00093     Date before = Date::now();
00094 
00095     connector.init(port, "localhost", nconnections);
00096 
00097     Date after = Date::now();
00098 
00099     BOOST_CHECK_LT(after.secondsSince(before), 1);
00100 
00101     BOOST_CHECK_EQUAL(connector.numConnections(), nconnections);
00102     BOOST_CHECK_EQUAL(connector.numActiveConnections(), 0);
00103     BOOST_CHECK_EQUAL(connector.numInactiveConnections(), nconnections);
00104 
00105     finished = true;
00106 
00107     connector.shutdown();
00108     
00109     BOOST_CHECK_EQUAL(TransportBase::created, TransportBase::destroyed);
00110     BOOST_CHECK_EQUAL(ConnectionHandler::created,
00111                       ConnectionHandler::destroyed);
00112 }
00113 
00114 #if 0
00115 BOOST_AUTO_TEST_CASE( test_ping_pong )
00116 {
00117     BOOST_REQUIRE_EQUAL(TransportBase::created, TransportBase::destroyed);
00118     BOOST_REQUIRE_EQUAL(ConnectionHandler::created,
00119                         ConnectionHandler::destroyed);
00120 
00121     Watchdog watchdog(5.0);
00122 
00123     string connectionError;
00124 
00125     PassiveEndpointT<SocketTransport> acceptor;
00126     
00127     acceptor.onMakeNewHandler = [&] ()
00128         {
00129             return ML::make_std_sp(new PongConnectionHandler(connectionError));
00130         };
00131     
00132     int port = acceptor.init();
00133 
00134     cerr << "port = " << port << endl;
00135 
00136     BOOST_CHECK_EQUAL(acceptor.numConnections(), 1);
00137 
00138     ActiveEndpointT<SocketTransport> connector;
00139     int nconnections = 100;
00140 
00141     Date before = Date::now();
00142 
00143     connector.init(port, "localhost", nconnections);
00144 
00145     Date after = Date::now();
00146 
00147     BOOST_CHECK_LT(after.secondsSince(before), 1);
00148 
00149     BOOST_CHECK_EQUAL(acceptor.numConnections(), nconnections + 1);
00150     BOOST_CHECK_EQUAL(connector.numConnections(), nconnections);
00151     BOOST_CHECK_EQUAL(connector.numActiveConnections(), 0);
00152     BOOST_CHECK_EQUAL(connector.numInactiveConnections(), nconnections);
00153 
00154     acceptor.closePeer();
00155 
00156     connector.shutdown();
00157     acceptor.shutdown();
00158     
00159     BOOST_CHECK_EQUAL(TransportBase::created, TransportBase::destroyed);
00160     BOOST_CHECK_EQUAL(ConnectionHandler::created,
00161                       ConnectionHandler::destroyed);
00162 }
00163 #endif
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator