![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* endpoint_test.cc 00002 Jeremy Barnes, 31 January 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Tests for the endpoints. 00006 */ 00007 00008 #define BOOST_TEST_MAIN 00009 #define BOOST_TEST_DYN_LINK 00010 00011 #include <boost/test/unit_test.hpp> 00012 #include "soa/service/http_endpoint.h" 00013 #include "soa/service/active_endpoint.h" 00014 #include "soa/service/passive_endpoint.h" 00015 #include <sys/socket.h> 00016 #include "jml/utils/guard.h" 00017 #include "jml/arch/exception_handler.h" 00018 #include "jml/utils/testing/watchdog.h" 00019 #include "jml/utils/testing/fd_exhauster.h" 00020 #include "test_connection_error.h" 00021 #include "ping_pong.h" 00022 00023 using namespace std; 00024 using namespace ML; 00025 using namespace Datacratic; 00026 00027 BOOST_AUTO_TEST_CASE( test_ping_pong ) 00028 { 00029 BOOST_REQUIRE_EQUAL(TransportBase::created, TransportBase::destroyed); 00030 BOOST_REQUIRE_EQUAL(ConnectionHandler::created, 00031 ConnectionHandler::destroyed); 00032 00033 Watchdog watchdog(5.0); 00034 00035 string connectionError; 00036 00037 PassiveEndpointT<SocketTransport> acceptor("acceptor"); 00038 00039 acceptor.onMakeNewHandler = [&] () 00040 { 00041 return ML::make_std_sp(new PongConnectionHandler(connectionError)); 00042 }; 00043 00044 int port = acceptor.init(); 00045 00046 cerr << "port = " << port << endl; 00047 00048 BOOST_CHECK_EQUAL(acceptor.numConnections(), 0); 00049 00050 ActiveEndpointT<SocketTransport> connector("connector"); 00051 int nconnections = 1; 00052 connector.init(port, "localhost", nconnections); 00053 00054 ACE_Semaphore gotConnectionSem(0), finishedTestSem(0); 00055 00056 auto onNewConnection = [&] (std::shared_ptr<TransportBase> transport) 00057 { 00058 transport->associate 00059 (ML::make_std_sp(new PingConnectionHandler(connectionError, 00060 finishedTestSem))); 00061 gotConnectionSem.release(); 00062 }; 00063 00064 auto onConnectionError = [&] (const std::string & error) 00065 { 00066 cerr << "onConnectionError " << error << endl; 00067 00068 connectionError = error; 00069 gotConnectionSem.release(); 00070 }; 00071 00072 connector.getConnection(onNewConnection, 00073 onConnectionError, 00074 1.0); 00075 00076 ACE_Time_Value waitUntil(Date::now().plusSeconds(1.0).toAce()); 00077 int semWaitRes = gotConnectionSem.acquire(waitUntil); 00078 00079 BOOST_CHECK_EQUAL(semWaitRes, 0); 00080 BOOST_CHECK_EQUAL(connectionError, ""); 00081 00082 00083 BOOST_CHECK_EQUAL(acceptor.numConnections(), 1); 00084 BOOST_CHECK_EQUAL(connector.numConnections(), 1); 00085 BOOST_CHECK_EQUAL(connector.numActiveConnections(), 1); 00086 BOOST_CHECK_EQUAL(connector.numInactiveConnections(), 0); 00087 00088 waitUntil = Date::now().plusSeconds(5.0).toAce(); 00089 semWaitRes = finishedTestSem.acquire(waitUntil); 00090 00091 BOOST_CHECK_EQUAL(semWaitRes, 0); 00092 BOOST_CHECK_EQUAL(connectionError, ""); 00093 00094 acceptor.closePeer(); 00095 00096 connector.sleepUntilIdle(); 00097 acceptor.sleepUntilIdle(); 00098 00099 connector.shutdown(); 00100 acceptor.shutdown(); 00101 00102 BOOST_CHECK_EQUAL(TransportBase::created, TransportBase::destroyed); 00103 BOOST_CHECK_EQUAL(ConnectionHandler::created, 00104 ConnectionHandler::destroyed); 00105 }