RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/testing/zmq_named_pub_sub_test.cc
00001 /* zmq_named_pub_sub_test.cc                                      -*- C++ -*-
00002    Jeremy Barnes, 24 September 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005    Test for the ZeroMQ named publisher / subscriber.
00006 */
00007 
00008 #define BOOST_TEST_MAIN
00009 #define BOOST_TEST_DYN_LINK
00010 
00011 #include <boost/test/unit_test.hpp>
00012 #include <boost/make_shared.hpp>
00013 #include <sys/socket.h>
00014 #include "jml/utils/guard.h"
00015 #include "jml/arch/exception_handler.h"
00016 #include "jml/utils/vector_utils.h"
00017 #include "jml/arch/timers.h"
00018 #include <thread>
00019 #include "soa/service/zmq_utils.h"
00020 #include "soa/service/zmq_named_pub_sub.h"
00021 #include "soa/service/testing/zookeeper_temporary_server.h"
00022 
00023 using namespace std;
00024 using namespace ML;
00025 using namespace Datacratic;
00026 
00027 BOOST_AUTO_TEST_CASE( test_zookeeper_watches )
00028 {
00029     ZooKeeper::TemporaryServer zookeeper;
00030     zookeeper.start();
00031 
00032     auto proxies = std::make_shared<ServiceProxies>();
00033     proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort()));
00034 
00035     int numChangesRoot = 0;
00036     int numChangesLeaf = 0;
00037 
00038     ConfigurationService::Watch watchRoot;
00039     watchRoot.init([&] (const std::string & node, const ConfigurationService::ChangeType change) {
00040         ++numChangesRoot;
00041         cerr << "got root change " << node << " of type " << change << endl;
00042     });
00043 
00044     ConfigurationService::Watch watchLeaf;
00045     watchLeaf.init([&] (const std::string & node, const ConfigurationService::ChangeType change) {
00046         ++numChangesLeaf;
00047         cerr << "got leaf change " << node << " of type " << change << endl;
00048     });
00049 
00050     cerr << endl << "Watching parent" << endl;
00051 
00052     auto config = proxies->config;
00053     config->removePath("");
00054     BOOST_CHECK_EQUAL(config->getChildren("parent", watchRoot), vector<string>());
00055     BOOST_CHECK_EQUAL(numChangesRoot, 0);
00056     BOOST_CHECK_EQUAL(numChangesLeaf, 0);
00057 
00058     cerr << endl << "Setting child to howdy" << endl;
00059 
00060     config->set("parent/child", Json::Value("howdy"));
00061     ML::sleep(0.1);
00062     BOOST_CHECK_EQUAL(numChangesRoot, 1);
00063     BOOST_CHECK_EQUAL(numChangesLeaf, 0);
00064     BOOST_CHECK_EQUAL(config->getJson("parent/child", watchLeaf), Json::Value("howdy"));
00065     BOOST_CHECK_EQUAL(config->getChildren("parent", watchRoot), vector<string>{"child"});
00066 
00067     cerr << endl << "Setting child to doody" << endl;
00068 
00069     config->set("parent/child", Json::Value("doody"));
00070     ML::sleep(0.1);
00071     BOOST_CHECK_EQUAL(numChangesRoot, 1);
00072     BOOST_CHECK_EQUAL(numChangesLeaf, 1);
00073     BOOST_CHECK_EQUAL(config->getJson("parent/child", watchLeaf), Json::Value("doody"));
00074     BOOST_CHECK_EQUAL(config->getChildren("parent", watchRoot), vector<string>{"child"});
00075     ML::sleep(0.1);
00076 
00077     cerr << endl << "Removing" << endl;
00078 
00079     config->removePath("");
00080     ML::sleep(0.1);
00081     BOOST_CHECK_EQUAL(numChangesRoot, 3); // 3 registrations - 1 that already fired
00082     BOOST_CHECK_EQUAL(numChangesLeaf, 2); // 2 registrations - 1 that already fired
00083     BOOST_CHECK_EQUAL(config->getChildren("parent", watchRoot), vector<string>());
00084     BOOST_CHECK_EQUAL(config->getJson("parent/child", watchLeaf), Json::Value());
00085 
00086     cerr << endl << "Setting child back again" << endl;
00087 
00088     config->set("parent/child", "hello");
00089     ML::sleep(0.1);
00090     BOOST_CHECK_EQUAL(numChangesRoot, 4);
00091     BOOST_CHECK_EQUAL(numChangesLeaf, 2); // no notification changes on creation
00092     BOOST_CHECK_EQUAL(config->getJson("parent/child"), Json::Value("hello"));
00093 }
00094 
00095 struct Publisher : public ServiceBase, public ZmqNamedPublisher {
00096 
00097     Publisher(const std::string & name,
00098               std::shared_ptr<ServiceProxies> proxies)
00099         : ServiceBase(name, proxies),
00100           ZmqNamedPublisher(proxies->zmqContext)
00101     {
00102     }
00103 
00104     ~Publisher()
00105     {
00106         unregisterServiceProvider(serviceName(), { "publisher" });
00107         shutdown();
00108     }
00109 
00110     void init()
00111     {
00112         ZmqNamedPublisher::init(getServices()->config, serviceName() + "/publish");
00113         registerServiceProvider(serviceName(), { "publisher" });
00114     }
00115 };
00116 
00117 BOOST_AUTO_TEST_CASE( test_named_publisher )
00118 {
00119     ZooKeeper::TemporaryServer zookeeper;
00120     zookeeper.start();
00121 
00122     auto proxies = std::make_shared<ServiceProxies>();
00123     proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort()));
00124 
00125     ZmqNamedSubscriber subscriber(*proxies->zmqContext);
00126     subscriber.init(proxies->config);
00127 
00128     int numMessages = 0;
00129     vector<vector<string> > subscriberMessages;
00130 
00131     subscriber.messageHandler = [&] (const std::vector<zmq::message_t> & message)
00132         {
00133             vector<string> msg2;
00134             for (unsigned i = 0;  i < message.size();  ++i) {
00135                 msg2.push_back(message[i].toString());
00136             }
00137             cerr << "got subscriber message " << msg2 << endl;
00138             
00139             subscriberMessages.push_back(msg2);
00140             ++numMessages;
00141             futex_wake(numMessages);
00142         };
00143     
00144     subscriber.connectToEndpoint("pub/publish");
00145     subscriber.start();
00146     subscriber.subscribe("hello");
00147 
00148     int numIter = 1;
00149     //numIter = 10;  // TODO: test fails here
00150 
00151     for (unsigned i = 0;  i < numIter;  ++i) {
00152         ML::sleep(0.1);
00153 
00154         cerr << endl << endl << endl << endl;
00155 
00156         BOOST_CHECK_NE(subscriber.getConnectionState(),
00157                        ZmqNamedSubscriber::CONNECTED);
00158         
00159         Publisher pub("pub", proxies);
00160 
00161         pub.init();
00162         pub.bindTcp();
00163         pub.start();
00164 
00165         proxies->config->dump(cerr);
00166 
00167         ML::sleep(0.1);
00168 
00169         BOOST_CHECK_EQUAL(subscriber.getConnectionState(),
00170                           ZmqNamedSubscriber::CONNECTED);
00171 
00172 #if 1
00173         {
00174             //auto subp = new ZmqNamedSubscriber(*proxies->zmqContext);
00175             //auto & sub = *subp;
00176 
00177             ZmqNamedSubscriber sub(*proxies->zmqContext);
00178             sub.init(proxies->config);
00179             sub.start();
00180 
00181             vector<vector<string> > subscriberMessages;
00182             volatile int numMessages = 0;
00183 
00184             auto onSubscriptionMessage = [&] (const std::vector<zmq::message_t> & message)
00185                 {
00186                     vector<string> msg2;
00187                     for (unsigned i = 0;  i < message.size();  ++i) {
00188                         msg2.push_back(message[i].toString());
00189                     }
00190                     cerr << "got message " << msg2 << endl;
00191 
00192                     subscriberMessages.push_back(msg2);
00193                     ++numMessages;
00194                     futex_wake(numMessages);
00195                 };
00196             
00197 
00198             sub.messageHandler = onSubscriptionMessage;
00199 
00200             sub.connectToEndpoint("pub/publish");
00201 
00202             // Busy wait (for now)
00203             for (unsigned i = 0;  subscriber.getConnectionState() != ZmqNamedSubscriber::CONNECTED;  ++i) {
00204                 ML::sleep(0.01);
00205                 if (i && i % 10 == 0)
00206                     cerr << "warning: waited " << i / 10 << "ds for subscriber to connect" << endl;
00207                 //if (i == 200)
00208                 //    throw ML::Exception("no connection in 2 seconds");
00209             }
00210 
00211             sub.subscribe("hello");
00212 
00213             // Give the subscription message time to percolate through
00214             ML::sleep(0.5);
00215 
00216             // Publish some messages
00217             pub.publish("hello", "world");
00218             pub.publish("dog", "eats", "dog");
00219             pub.publish("hello", "stranger");
00220 
00221             cerr << "published" << endl;
00222 
00223             // Wait until they are received
00224             for (;;) {
00225                 int nm = numMessages;
00226                 if (nm == 2) break;
00227                 ML::futex_wait(numMessages, nm);
00228             }
00229 
00230             BOOST_CHECK_EQUAL(subscriberMessages.size(), 2);
00231             BOOST_CHECK_EQUAL(subscriberMessages.at(0), vector<string>({ "hello", "world"}) );
00232             BOOST_CHECK_EQUAL(subscriberMessages.at(1), vector<string>({ "hello", "stranger"}) );
00233 
00234             sub.shutdown();
00235         }
00236 #else
00237         // Publish some messages
00238         pub.publish("hello", "world");
00239         pub.publish("dog", "eats", "dog");
00240         pub.publish("hello", "stranger");
00241 #endif
00242 
00243         ML::sleep(0.1);
00244 
00245         cerr << "unregistered publisher" << endl;
00246 
00247         pub.shutdown();
00248     }
00249 
00250     ML::sleep(0.1);
00251 
00252     cerr << "got a total of " << numMessages << " subscriber messages" << endl;
00253 
00254     // Check that it got all of the messages
00255     BOOST_CHECK_EQUAL(numMessages, numIter * 2);
00256 }
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator