RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/testing/named_endpoint_test.cc
00001 /* named_endpoint_test.cc                                          -*- C++ -*-
00002    Jeremy Barnes, 24 September 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005    Test for named endpoint.
00006 */
00007 
00008 #define BOOST_TEST_MAIN
00009 #define BOOST_TEST_DYN_LINK
00010 
#include <boost/test/unit_test.hpp>
#include <boost/make_shared.hpp>
#include "soa/service/named_endpoint.h"
#include "soa/service/message_loop.h"
#include "soa/service/zmq_endpoint.h"
#include <sys/socket.h>
#include "jml/utils/guard.h"
#include "jml/arch/exception_handler.h"
#include "jml/utils/testing/watchdog.h"
#include "jml/utils/testing/fd_exhauster.h"
#include "jml/utils/vector_utils.h"
#include "jml/arch/timers.h"
#include <atomic>
#include <thread>
#include "soa/service/zmq_utils.h"
#include "soa/service/testing/zookeeper_temporary_server.h"
00026 
00027 
00028 using namespace std;
00029 using namespace ML;
00030 using namespace Datacratic;
00031 
00032 
00033 /*****************************************************************************/
00034 /* ECHO SERVICE                                                              */
00035 /*****************************************************************************/
00036 
00041 struct EchoService : public ServiceBase {
00042 
00043     EchoService(std::shared_ptr<ServiceProxies> proxies,
00044                 const std::string & serviceName)
00045         : ServiceBase(serviceName, proxies),
00046           context(new zmq::context_t(1)),
00047           endpoint(context),
00048           loop(1 /* num threads */, 0.0001 /* maxAddedLatency */)
00049     {
00050         proxies->config->removePath(serviceName);
00051         //registerService();
00052         endpoint.init(proxies->config, ZMQ_XREP, serviceName + "/echo");
00053 
00054         auto handler = [=] (vector<string> message)
00055             {
00056                 //cerr << "got message " << message << endl;
00057                 ExcAssertEqual(message.size(), 3);
00058                 ExcAssertEqual(message[1], "ECHO");
00059                 message[1] = "REPLY";
00060 
00061                 endpoint.sendMessage(message);
00062             };
00063 
00064         endpoint.messageHandler = handler;
00065 
00066         loop.addSource("EchoService::endpoint", endpoint);
00067     }
00068 
00069     void start()
00070     {
00071         loop.start();
00072     }
00073 
00074     void shutdown()
00075     {
00076         loop.shutdown();
00077     }
00078 
00079     std::string bindTcp()
00080     {
00081         return endpoint.bindTcp();
00082     }
00083 
00084     std::shared_ptr<zmq::context_t> context;
00085     ZmqNamedEndpoint endpoint;
00086     MessageLoop loop;
00087 };
00088 
00089 BOOST_AUTO_TEST_CASE( test_named_endpoint )
00090 {
00091     ZooKeeper::TemporaryServer zookeeper;
00092     zookeeper.start();
00093 
00094     auto proxies = std::make_shared<ServiceProxies>();
00095     proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort()));
00096 
00097     EchoService service(proxies, "echo");
00098     auto addr = service.bindTcp();
00099     cerr << "echo service is listening on " << addr << endl;
00100 
00101     service.start();
00102 
00103     proxies->config->dump(cerr);
00104 
00105 
00106     volatile int numPings = 0;
00107 
00108     auto runThread = [&] ()
00109         {
00110             ZmqNamedProxy proxy;
00111             proxy.init(proxies->config, ZMQ_XREQ);
00112             proxy.connect("echo/echo");
00113 
00114             ML::sleep(0.1);
00115     
00116             cerr << "connected" << endl;
00117 
00118             while (numPings < 100000) {
00119                 int i = __sync_add_and_fetch(&numPings, 1);
00120 
00121                 if (i && i % 1000 == 0)
00122                     cerr << i << endl;
00123 
00124                 vector<string> request;
00125                 request.push_back("ECHO");
00126                 request.push_back(to_string(i));
00127 
00128                 sendAll(proxy.socket(), request);
00129 
00130                 vector<string> res = recvAll(proxy.socket());
00131 
00132                 ExcAssertEqual(res.size(), 2);
00133                 ExcAssertEqual(res[0], "REPLY");
00134                 ExcAssertEqual(res[1], to_string(i));
00135             }
00136         };
00137 
00138     boost::thread_group threads;
00139     for (unsigned i = 0;  i < 10;  ++i) {
00140         threads.create_thread(runThread);
00141     }
00142 
00143     threads.join_all();
00144 
00145     cerr << "finished requests" << endl;
00146 
00147     service.shutdown();
00148 }
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator