RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* named_endpoint_test.cc -*- C++ -*- 00002 Jeremy Barnes, 24 September 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 Test for named endpoint. 00006 */ 00007 00008 #define BOOST_TEST_MAIN 00009 #define BOOST_TEST_DYN_LINK 00010 00011 #include <boost/test/unit_test.hpp> 00012 #include <boost/make_shared.hpp> 00013 #include "soa/service/named_endpoint.h" 00014 #include "soa/service/message_loop.h" 00015 #include "soa/service/zmq_endpoint.h" 00016 #include <sys/socket.h> 00017 #include "jml/utils/guard.h" 00018 #include "jml/arch/exception_handler.h" 00019 #include "jml/utils/testing/watchdog.h" 00020 #include "jml/utils/testing/fd_exhauster.h" 00021 #include "jml/utils/vector_utils.h" 00022 #include "jml/arch/timers.h" 00023 #include <thread> 00024 #include "soa/service/zmq_utils.h" 00025 #include "soa/service/testing/zookeeper_temporary_server.h" 00026 00027 00028 using namespace std; 00029 using namespace ML; 00030 using namespace Datacratic; 00031 00032 00033 /*****************************************************************************/ 00034 /* ECHO SERVICE */ 00035 /*****************************************************************************/ 00036 00041 struct EchoService : public ServiceBase { 00042 00043 EchoService(std::shared_ptr<ServiceProxies> proxies, 00044 const std::string & serviceName) 00045 : ServiceBase(serviceName, proxies), 00046 context(new zmq::context_t(1)), 00047 endpoint(context), 00048 loop(1 /* num threads */, 0.0001 /* maxAddedLatency */) 00049 { 00050 proxies->config->removePath(serviceName); 00051 //registerService(); 00052 endpoint.init(proxies->config, ZMQ_XREP, serviceName + "/echo"); 00053 00054 auto handler = [=] (vector<string> message) 00055 { 00056 //cerr << "got message " << message << endl; 00057 ExcAssertEqual(message.size(), 3); 00058 ExcAssertEqual(message[1], "ECHO"); 00059 message[1] = "REPLY"; 00060 00061 endpoint.sendMessage(message); 00062 }; 00063 00064 endpoint.messageHandler = handler; 00065 00066 loop.addSource("EchoService::endpoint", endpoint); 00067 } 00068 00069 void start() 00070 { 00071 loop.start(); 00072 } 00073 00074 void shutdown() 00075 { 00076 loop.shutdown(); 00077 } 00078 00079 std::string bindTcp() 00080 { 00081 return endpoint.bindTcp(); 00082 } 00083 00084 std::shared_ptr<zmq::context_t> context; 00085 ZmqNamedEndpoint endpoint; 00086 MessageLoop loop; 00087 }; 00088 00089 BOOST_AUTO_TEST_CASE( test_named_endpoint ) 00090 { 00091 ZooKeeper::TemporaryServer zookeeper; 00092 zookeeper.start(); 00093 00094 auto proxies = std::make_shared<ServiceProxies>(); 00095 proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort())); 00096 00097 EchoService service(proxies, "echo"); 00098 auto addr = service.bindTcp(); 00099 cerr << "echo service is listening on " << addr << endl; 00100 00101 service.start(); 00102 00103 proxies->config->dump(cerr); 00104 00105 00106 volatile int numPings = 0; 00107 00108 auto runThread = [&] () 00109 { 00110 ZmqNamedProxy proxy; 00111 proxy.init(proxies->config, ZMQ_XREQ); 00112 proxy.connect("echo/echo"); 00113 00114 ML::sleep(0.1); 00115 00116 cerr << "connected" << endl; 00117 00118 while (numPings < 100000) { 00119 int i = __sync_add_and_fetch(&numPings, 1); 00120 00121 if (i && i % 1000 == 0) 00122 cerr << i << endl; 00123 00124 vector<string> request; 00125 request.push_back("ECHO"); 00126 request.push_back(to_string(i)); 00127 00128 sendAll(proxy.socket(), request); 00129 00130 vector<string> res = recvAll(proxy.socket()); 00131 00132 ExcAssertEqual(res.size(), 2); 00133 ExcAssertEqual(res[0], "REPLY"); 00134 ExcAssertEqual(res[1], to_string(i)); 00135 } 00136 }; 00137 00138 boost::thread_group threads; 00139 for (unsigned i = 0; i < 10; ++i) { 00140 threads.create_thread(runThread); 00141 } 00142 00143 threads.join_all(); 00144 00145 cerr << "finished requests" << endl; 00146 00147 service.shutdown(); 00148 }