RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/testing/multiple_service_test.cc
00001 /* multiple_service_test.cc
00002    Jeremy Barnes, 10 December 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005 */
00006 
00007 #define BOOST_TEST_MAIN
00008 #define BOOST_TEST_DYN_LINK
00009 
00010 #include <boost/test/unit_test.hpp>
00011 #include <boost/make_shared.hpp>
00012 #include "soa/service/named_endpoint.h"
00013 #include "soa/service/message_loop.h"
00014 #include "soa/service/zmq_endpoint.h"
00015 #include "soa/service/testing/zookeeper_temporary_server.h"
00016 #include "jml/utils/guard.h"
00017 #include "jml/arch/exception_handler.h"
00018 #include "jml/utils/testing/watchdog.h"
00019 #include "jml/utils/testing/fd_exhauster.h"
00020 #include "jml/utils/vector_utils.h"
00021 #include "jml/arch/timers.h"
00022 #include <thread>
00023 #include "soa/service/zmq_utils.h"
00024 
00025 
00026 using namespace std;
00027 using namespace ML;
00028 using namespace Datacratic;
00029 
00030 
00031 /*****************************************************************************/
00032 /* ECHO SERVICE                                                              */
00033 /*****************************************************************************/
00034 
00039 struct EchoService : public ServiceBase {
00040 
00041     EchoService(std::shared_ptr<ServiceProxies> proxies,
00042                 const std::string & serviceName)
00043         : ServiceBase(serviceName, proxies),
00044           toClients(getZmqContext())
00045     {
00046         proxies->config->removePath(serviceName);
00047         registerServiceProvider(serviceName, { "echo" });
00048 
00049         auto handler = [=] (vector<string> message)
00050             {
00051                 //cerr << "got message " << message << endl;
00052                 ExcAssertEqual(message.size(), 3);
00053                 ExcAssertEqual(message[1], "ECHO");
00054                 message[1] = "REPLY";
00055                 return message;
00056             };
00057 
00058         toClients.clientMessageHandler = handler;
00059     }
00060 
00061     ~EchoService()
00062     {
00063         shutdown();
00064     }
00065 
00066     void init()
00067     {
00068         toClients.init(getServices()->config, serviceName() + "/echo");
00069     }
00070 
00071     void start()
00072     {
00073         toClients.start();
00074     }
00075 
00076     void shutdown()
00077     {
00078         toClients.shutdown();
00079     }
00080 
00081     std::string bindTcp()
00082     {
00083         return toClients.bindTcp();
00084     }
00085 
00086     ZmqNamedClientBus toClients;
00087 };
00088 
00089 BOOST_AUTO_TEST_CASE( test_early_connection )
00090 {
00094     cerr << "Testing early connection..." << endl;
00095 
00096     ZooKeeper::TemporaryServer zookeeper;
00097     zookeeper.start();
00098 
00099     auto proxies = std::make_shared<ServiceProxies>();
00100     proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort()));
00101 
00102     ZmqNamedClientBusProxy connection(proxies->zmqContext);
00103     connection.init(proxies->config, "client1");
00104     connection.connectHandler = [&] (const std::string & svc) {
00105         cerr << "connected to " << svc << endl;
00106     };
00107 
00108     connection.disconnectHandler = [&] (const std::string  & svc) {
00109         cerr << "disconnected from " << svc << endl;
00110     };
00111 
00112     connection.start();
00113     connection.connectToServiceClass("echo", "echo");
00114 
00115     //for(int i = 0; i != 2; ++i)
00116     {
00117         while(connection.isConnected()) {
00118             ML::sleep(0.1);
00119         }
00120 
00121         BOOST_CHECK_EQUAL(connection.isConnected(), false);
00122 
00123         proxies->config->removePath("");
00124 
00125         EchoService service(proxies, "echo");
00126         service.init();
00127         auto addr = service.bindTcp();
00128         cerr << "echo service is listening on " << addr << endl;
00129         service.start();
00130 
00131         //proxies->config->dump(cerr);
00132 
00133         while(!connection.isConnected()) {
00134             ML::sleep(0.1);
00135         }
00136 
00137         cerr << "Checking that we are connected " << endl;
00138         BOOST_CHECK_EQUAL(connection.isConnected(), true);
00139     }
00140 
00141     std::cerr << "done." << std::endl;
00142 }
00143 
00144 #if 1
00145 BOOST_AUTO_TEST_CASE( test_multiple_services )
00146 {
00147     ZooKeeper::TemporaryServer zookeeper;
00148     zookeeper.start();
00149 
00150     auto proxies = std::make_shared<ServiceProxies>();
00151     proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort()));
00152 
00153     cerr << "Starting multiple services test " << endl;
00154 
00155     ZmqMultipleNamedClientBusProxy connection(proxies->zmqContext);
00156     connection.init(proxies->config, "client1");
00157 
00158     connection.connectHandler = [&] (const std::string & svc)
00159         {
00160             cerr << "connected to " << svc << endl;
00161         };
00162 
00163     connection.disconnectHandler = [&] (const std::string  & svc)
00164         {
00165             cerr << "disconnected from " << svc << endl;
00166         };
00167 
00168     connection.start();
00169 
00170     BOOST_CHECK_EQUAL(connection.connectionCount(), 0);
00171 
00172     connection.connectAllServiceProviders("echo", "echo");
00173 
00174     BOOST_CHECK_EQUAL(connection.connectionCount(), 0);
00175 
00176     std::vector<unique_ptr<EchoService> > services;
00177 
00178     auto startService = [&] ()
00179         {
00180             services.emplace_back(new EchoService(proxies, "echo" + to_string(services.size())));
00181             EchoService & service = *services.back();
00182             service.init();
00183             auto addr = service.bindTcp();
00184             cerr << "echo service is listening on " << addr << endl;
00185             service.start();
00186         };
00187 
00188     startService();
00189 
00190     proxies->config->dump(cerr);
00191 
00192     ML::sleep(0.1);
00193 
00194     BOOST_CHECK_EQUAL(connection.connectionCount(), 1);
00195 
00196     cerr << "shutting down" << endl;
00197 
00198     connection.shutdown();
00199 
00200     for (unsigned i = 0;  i < services.size();  ++i)
00201         services[i]->shutdown();
00202 
00203 }
00204 #endif
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator