RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/testing/rest_service_endpoint_test.cc
00001 /* json_service_endpoint_test.cc
00002    Jeremy Barnes, 9 November 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005    Test for the JSON service endpoint.
00006 */
00007 
00008 #define BOOST_TEST_MAIN
00009 #define BOOST_TEST_DYN_LINK
00010 
00011 #include <boost/test/unit_test.hpp>
00012 #include <boost/make_shared.hpp>
00013 #include "soa/service/named_endpoint.h"
00014 #include "soa/service/message_loop.h"
00015 #include "soa/service/rest_service_endpoint.h"
00016 #include "soa/service/rest_proxy.h"
00017 #include <sys/socket.h>
00018 #include "jml/utils/guard.h"
00019 #include "jml/arch/exception_handler.h"
00020 #include "jml/utils/testing/watchdog.h"
00021 #include "jml/utils/testing/fd_exhauster.h"
00022 #include "jml/utils/vector_utils.h"
00023 #include "jml/arch/timers.h"
00024 #include <thread>
00025 #include "soa/service/zmq_utils.h"
00026 #include "soa/service/testing/zookeeper_temporary_server.h"
00027 
00028 
00029 using namespace std;
00030 using namespace ML;
00031 using namespace Datacratic;
00032 
00033 
00034 /*****************************************************************************/
00035 /* ECHO SERVICE                                                              */
00036 /*****************************************************************************/
00037 
00042 struct EchoService : public ServiceBase, public RestServiceEndpoint {
00043 
00044     EchoService(std::shared_ptr<ServiceProxies> proxies,
00045                 const std::string & serviceName)
00046         : ServiceBase(serviceName, proxies),
00047           RestServiceEndpoint(proxies->zmqContext)
00048     {
00049         proxies->config->removePath(serviceName);
00050         RestServiceEndpoint::init(proxies->config,
00051                                   serviceName, 0.0005 /* maxAddedLatency */);
00052     }
00053 
00054     ~EchoService()
00055     {
00056         shutdown();
00057     }
00058 
00059     virtual void handleRequest(const ConnectionId & connection,
00060                                const RestRequest & request) const
00061     {
00062         //cerr << "handling request " << request << endl;
00063         if (request.verb != "POST")
00064             throw ML::Exception("echo service needs POST");
00065         if (request.resource != "/echo")
00066             throw ML::Exception("echo service only responds to /echo");
00067         connection.sendResponse(200, request.payload, "text/plain");
00068     }
00069 };
00070 
00071 BOOST_AUTO_TEST_CASE( test_named_endpoint )
00072 {
00073     ZooKeeper::TemporaryServer zookeeper;
00074     zookeeper.start();
00075 
00076     auto proxies = std::make_shared<ServiceProxies>();
00077     proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort()));
00078 
00079     int totalPings = 1000;
00080 
00081     EchoService service(proxies, "echo");
00082     auto addr = service.bindTcp();
00083     cerr << "echo service is listening on " << addr.first << " and "
00084          << addr.second << endl;
00085 
00086     service.start();
00087 
00088     proxies->config->dump(cerr);
00089 
00090 
00091     volatile int numPings = 0;
00092 
00093     auto runZmqThread = [=, &numPings] ()
00094         {
00095             RestProxy proxy(proxies->zmqContext);
00096             proxy.init(proxies->config, "echo");
00097             proxy.start();
00098             cerr << "connected" << endl;
00099 
00100             volatile int numOutstanding = 0;
00101 
00102             while (numPings < totalPings) {
00103                 int i = __sync_add_and_fetch(&numPings, 1);
00104 
00105                 if (i && i % 1000 == 0)
00106                     cerr << i << " with " << numOutstanding << " outstanding"
00107                          << endl;
00108 
00109                 auto onResponse = [=, &numOutstanding]
00110                     (std::exception_ptr ptr,
00111                      int responseCode,
00112                      std::string body)
00113                     {
00114                         //cerr << "got response " << responseCode
00115                         //     << endl;
00116                         ML::atomic_dec(numOutstanding);
00117 
00118                         if (ptr)
00119                             throw ML::Exception("response returned exception");
00120                         ExcAssertEqual(responseCode, 200);
00121                         ExcAssertEqual(body, to_string(i));
00122 
00123                         futex_wake(numOutstanding);
00124                     };
00125                 
00126                 proxy.push(onResponse,
00127                            "POST", "/echo", {}, to_string(i));
00128                 ML::atomic_inc(numOutstanding);
00129             }
00130 
00131             proxy.sleepUntilIdle();
00132 
00133             //ML::sleep(1.0);
00134 
00135             cerr << "shutting down proxy " << this << endl;
00136             proxy.shutdown();
00137             cerr << "done proxy shutdown" << endl;
00138         };
00139 
00140     auto runHttpThread = [&] ()
00141         {
00142             HttpNamedRestProxy proxy;
00143             proxy.init(proxies->config);
00144             proxy.connect("echo/http");
00145 
00146             while (numPings < totalPings) {
00147                 int i = __sync_add_and_fetch(&numPings, 1);
00148 
00149                 if (i && i % 1000 == 0)
00150                     cerr << i << endl;
00151 
00152                 auto response = proxy.post("/echo", to_string(i));
00153                 
00154                 ExcAssertEqual(response.code_, 200);
00155                 ExcAssertEqual(response.body_, to_string(i));
00156             }
00157 
00158         };
00159 
00160     boost::thread_group threads;
00161 
00162     for (unsigned i = 0;  i < 8;  ++i) {
00163         threads.create_thread(runZmqThread);
00164     }
00165 
00166     //for (unsigned i = 0;  i < 5;  ++i) {
00167     //    threads.create_thread(runHttpThread);
00168     //}
00169 
00170     threads.join_all();
00171 
00172     cerr << "finished requests" << endl;
00173 
00174     service.shutdown();
00175 }
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator