RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* multiple_service_test.cc 00002 Jeremy Barnes, 10 December 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 */ 00006 00007 #define BOOST_TEST_MAIN 00008 #define BOOST_TEST_DYN_LINK 00009 00010 #include <boost/test/unit_test.hpp> 00011 #include <boost/make_shared.hpp> 00012 #include "soa/service/named_endpoint.h" 00013 #include "soa/service/message_loop.h" 00014 #include "soa/service/zmq_endpoint.h" 00015 #include "soa/service/testing/zookeeper_temporary_server.h" 00016 #include "jml/utils/guard.h" 00017 #include "jml/arch/exception_handler.h" 00018 #include "jml/utils/testing/watchdog.h" 00019 #include "jml/utils/testing/fd_exhauster.h" 00020 #include "jml/utils/vector_utils.h" 00021 #include "jml/arch/timers.h" 00022 #include <thread> 00023 #include "soa/service/zmq_utils.h" 00024 00025 00026 using namespace std; 00027 using namespace ML; 00028 using namespace Datacratic; 00029 00030 00031 /*****************************************************************************/ 00032 /* ECHO SERVICE */ 00033 /*****************************************************************************/ 00034 00039 struct EchoService : public ServiceBase { 00040 00041 EchoService(std::shared_ptr<ServiceProxies> proxies, 00042 const std::string & serviceName) 00043 : ServiceBase(serviceName, proxies), 00044 toClients(getZmqContext()) 00045 { 00046 proxies->config->removePath(serviceName); 00047 registerServiceProvider(serviceName, { "echo" }); 00048 00049 auto handler = [=] (vector<string> message) 00050 { 00051 //cerr << "got message " << message << endl; 00052 ExcAssertEqual(message.size(), 3); 00053 ExcAssertEqual(message[1], "ECHO"); 00054 message[1] = "REPLY"; 00055 return message; 00056 }; 00057 00058 toClients.clientMessageHandler = handler; 00059 } 00060 00061 ~EchoService() 00062 { 00063 shutdown(); 00064 } 00065 00066 void init() 00067 { 00068 toClients.init(getServices()->config, serviceName() + "/echo"); 00069 } 00070 00071 void start() 00072 { 00073 toClients.start(); 00074 } 00075 00076 void shutdown() 00077 { 00078 toClients.shutdown(); 00079 } 00080 00081 std::string bindTcp() 00082 { 00083 return toClients.bindTcp(); 00084 } 00085 00086 ZmqNamedClientBus toClients; 00087 }; 00088 00089 BOOST_AUTO_TEST_CASE( test_early_connection ) 00090 { 00094 cerr << "Testing early connection..." << endl; 00095 00096 ZooKeeper::TemporaryServer zookeeper; 00097 zookeeper.start(); 00098 00099 auto proxies = std::make_shared<ServiceProxies>(); 00100 proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort())); 00101 00102 ZmqNamedClientBusProxy connection(proxies->zmqContext); 00103 connection.init(proxies->config, "client1"); 00104 connection.connectHandler = [&] (const std::string & svc) { 00105 cerr << "connected to " << svc << endl; 00106 }; 00107 00108 connection.disconnectHandler = [&] (const std::string & svc) { 00109 cerr << "disconnected from " << svc << endl; 00110 }; 00111 00112 connection.start(); 00113 connection.connectToServiceClass("echo", "echo"); 00114 00115 //for(int i = 0; i != 2; ++i) 00116 { 00117 while(connection.isConnected()) { 00118 ML::sleep(0.1); 00119 } 00120 00121 BOOST_CHECK_EQUAL(connection.isConnected(), false); 00122 00123 proxies->config->removePath(""); 00124 00125 EchoService service(proxies, "echo"); 00126 service.init(); 00127 auto addr = service.bindTcp(); 00128 cerr << "echo service is listening on " << addr << endl; 00129 service.start(); 00130 00131 //proxies->config->dump(cerr); 00132 00133 while(!connection.isConnected()) { 00134 ML::sleep(0.1); 00135 } 00136 00137 cerr << "Checking that we are connected " << endl; 00138 BOOST_CHECK_EQUAL(connection.isConnected(), true); 00139 } 00140 00141 std::cerr << "done." << std::endl; 00142 } 00143 00144 #if 1 00145 BOOST_AUTO_TEST_CASE( test_multiple_services ) 00146 { 00147 ZooKeeper::TemporaryServer zookeeper; 00148 zookeeper.start(); 00149 00150 auto proxies = std::make_shared<ServiceProxies>(); 00151 proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort())); 00152 00153 cerr << "Starting multiple services test " << endl; 00154 00155 ZmqMultipleNamedClientBusProxy connection(proxies->zmqContext); 00156 connection.init(proxies->config, "client1"); 00157 00158 connection.connectHandler = [&] (const std::string & svc) 00159 { 00160 cerr << "connected to " << svc << endl; 00161 }; 00162 00163 connection.disconnectHandler = [&] (const std::string & svc) 00164 { 00165 cerr << "disconnected from " << svc << endl; 00166 }; 00167 00168 connection.start(); 00169 00170 BOOST_CHECK_EQUAL(connection.connectionCount(), 0); 00171 00172 connection.connectAllServiceProviders("echo", "echo"); 00173 00174 BOOST_CHECK_EQUAL(connection.connectionCount(), 0); 00175 00176 std::vector<unique_ptr<EchoService> > services; 00177 00178 auto startService = [&] () 00179 { 00180 services.emplace_back(new EchoService(proxies, "echo" + to_string(services.size()))); 00181 EchoService & service = *services.back(); 00182 service.init(); 00183 auto addr = service.bindTcp(); 00184 cerr << "echo service is listening on " << addr << endl; 00185 service.start(); 00186 }; 00187 00188 startService(); 00189 00190 proxies->config->dump(cerr); 00191 00192 ML::sleep(0.1); 00193 00194 BOOST_CHECK_EQUAL(connection.connectionCount(), 1); 00195 00196 cerr << "shutting down" << endl; 00197 00198 connection.shutdown(); 00199 00200 for (unsigned i = 0; i < services.size(); ++i) 00201 services[i]->shutdown(); 00202 00203 } 00204 #endif