RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* json_service_endpoint_test.cc 00002 Jeremy Barnes, 9 November 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 Test for the JSON service endpoint. 00006 */ 00007 00008 #define BOOST_TEST_MAIN 00009 #define BOOST_TEST_DYN_LINK 00010 00011 #include <boost/test/unit_test.hpp> 00012 #include <boost/make_shared.hpp> 00013 #include "soa/service/named_endpoint.h" 00014 #include "soa/service/message_loop.h" 00015 #include "soa/service/rest_service_endpoint.h" 00016 #include "soa/service/rest_proxy.h" 00017 #include <sys/socket.h> 00018 #include "jml/utils/guard.h" 00019 #include "jml/arch/exception_handler.h" 00020 #include "jml/utils/testing/watchdog.h" 00021 #include "jml/utils/testing/fd_exhauster.h" 00022 #include "jml/utils/vector_utils.h" 00023 #include "jml/arch/timers.h" 00024 #include <thread> 00025 #include "soa/service/zmq_utils.h" 00026 #include "soa/service/testing/zookeeper_temporary_server.h" 00027 00028 00029 using namespace std; 00030 using namespace ML; 00031 using namespace Datacratic; 00032 00033 00034 /*****************************************************************************/ 00035 /* ECHO SERVICE */ 00036 /*****************************************************************************/ 00037 00042 struct EchoService : public ServiceBase, public RestServiceEndpoint { 00043 00044 EchoService(std::shared_ptr<ServiceProxies> proxies, 00045 const std::string & serviceName) 00046 : ServiceBase(serviceName, proxies), 00047 RestServiceEndpoint(proxies->zmqContext) 00048 { 00049 proxies->config->removePath(serviceName); 00050 RestServiceEndpoint::init(proxies->config, 00051 serviceName, 0.0005 /* maxAddedLatency */); 00052 } 00053 00054 ~EchoService() 00055 { 00056 shutdown(); 00057 } 00058 00059 virtual void handleRequest(const ConnectionId & connection, 00060 const RestRequest & request) const 00061 { 00062 //cerr << "handling request " << request << endl; 00063 if (request.verb != "POST") 00064 throw ML::Exception("echo service needs POST"); 00065 if (request.resource != "/echo") 00066 throw ML::Exception("echo service only responds to /echo"); 00067 connection.sendResponse(200, request.payload, "text/plain"); 00068 } 00069 }; 00070 00071 BOOST_AUTO_TEST_CASE( test_named_endpoint ) 00072 { 00073 ZooKeeper::TemporaryServer zookeeper; 00074 zookeeper.start(); 00075 00076 auto proxies = std::make_shared<ServiceProxies>(); 00077 proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort())); 00078 00079 int totalPings = 1000; 00080 00081 EchoService service(proxies, "echo"); 00082 auto addr = service.bindTcp(); 00083 cerr << "echo service is listening on " << addr.first << " and " 00084 << addr.second << endl; 00085 00086 service.start(); 00087 00088 proxies->config->dump(cerr); 00089 00090 00091 volatile int numPings = 0; 00092 00093 auto runZmqThread = [=, &numPings] () 00094 { 00095 RestProxy proxy(proxies->zmqContext); 00096 proxy.init(proxies->config, "echo"); 00097 proxy.start(); 00098 cerr << "connected" << endl; 00099 00100 volatile int numOutstanding = 0; 00101 00102 while (numPings < totalPings) { 00103 int i = __sync_add_and_fetch(&numPings, 1); 00104 00105 if (i && i % 1000 == 0) 00106 cerr << i << " with " << numOutstanding << " outstanding" 00107 << endl; 00108 00109 auto onResponse = [=, &numOutstanding] 00110 (std::exception_ptr ptr, 00111 int responseCode, 00112 std::string body) 00113 { 00114 //cerr << "got response " << responseCode 00115 // << endl; 00116 ML::atomic_dec(numOutstanding); 00117 00118 if (ptr) 00119 throw ML::Exception("response returned exception"); 00120 ExcAssertEqual(responseCode, 200); 00121 ExcAssertEqual(body, to_string(i)); 00122 00123 futex_wake(numOutstanding); 00124 }; 00125 00126 proxy.push(onResponse, 00127 "POST", "/echo", {}, to_string(i)); 00128 ML::atomic_inc(numOutstanding); 00129 } 00130 00131 proxy.sleepUntilIdle(); 00132 00133 //ML::sleep(1.0); 00134 00135 cerr << "shutting down proxy " << this << endl; 00136 proxy.shutdown(); 00137 cerr << "done proxy shutdown" << endl; 00138 }; 00139 00140 auto runHttpThread = [&] () 00141 { 00142 HttpNamedRestProxy proxy; 00143 proxy.init(proxies->config); 00144 proxy.connect("echo/http"); 00145 00146 while (numPings < totalPings) { 00147 int i = __sync_add_and_fetch(&numPings, 1); 00148 00149 if (i && i % 1000 == 0) 00150 cerr << i << endl; 00151 00152 auto response = proxy.post("/echo", to_string(i)); 00153 00154 ExcAssertEqual(response.code_, 200); 00155 ExcAssertEqual(response.body_, to_string(i)); 00156 } 00157 00158 }; 00159 00160 boost::thread_group threads; 00161 00162 for (unsigned i = 0; i < 8; ++i) { 00163 threads.create_thread(runZmqThread); 00164 } 00165 00166 //for (unsigned i = 0; i < 5; ++i) { 00167 // threads.create_thread(runHttpThread); 00168 //} 00169 00170 threads.join_all(); 00171 00172 cerr << "finished requests" << endl; 00173 00174 service.shutdown(); 00175 }