RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* named_endpoint_test.cc -*- C++ -*- 00002 Jeremy Barnes, 24 September 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 Test for named endpoint. 00006 */ 00007 00008 #define BOOST_TEST_MAIN 00009 #define BOOST_TEST_DYN_LINK 00010 00011 #include <boost/test/unit_test.hpp> 00012 #include <boost/make_shared.hpp> 00013 #include <sys/socket.h> 00014 #include "jml/utils/guard.h" 00015 #include "jml/arch/exception_handler.h" 00016 #include "jml/utils/vector_utils.h" 00017 #include "jml/arch/timers.h" 00018 #include <thread> 00019 #include "soa/service/zmq_utils.h" 00020 #include "soa/service/zmq_named_pub_sub.h" 00021 #include "soa/service/testing/zookeeper_temporary_server.h" 00022 00023 using namespace std; 00024 using namespace ML; 00025 using namespace Datacratic; 00026 00027 BOOST_AUTO_TEST_CASE( test_zookeeper_watches ) 00028 { 00029 ZooKeeper::TemporaryServer zookeeper; 00030 zookeeper.start(); 00031 00032 auto proxies = std::make_shared<ServiceProxies>(); 00033 proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort())); 00034 00035 int numChangesRoot = 0; 00036 int numChangesLeaf = 0; 00037 00038 ConfigurationService::Watch watchRoot; 00039 watchRoot.init([&] (const std::string & node, const ConfigurationService::ChangeType change) { 00040 ++numChangesRoot; 00041 cerr << "got root change " << node << " of type " << change << endl; 00042 }); 00043 00044 ConfigurationService::Watch watchLeaf; 00045 watchLeaf.init([&] (const std::string & node, const ConfigurationService::ChangeType change) { 00046 ++numChangesLeaf; 00047 cerr << "got leaf change " << node << " of type " << change << endl; 00048 }); 00049 00050 cerr << endl << "Watching parent" << endl; 00051 00052 auto config = proxies->config; 00053 config->removePath(""); 00054 BOOST_CHECK_EQUAL(config->getChildren("parent", watchRoot), vector<string>()); 00055 BOOST_CHECK_EQUAL(numChangesRoot, 0); 00056 BOOST_CHECK_EQUAL(numChangesLeaf, 0); 00057 00058 cerr << endl << "Setting child to howdy" << endl; 00059 00060 config->set("parent/child", Json::Value("howdy")); 00061 ML::sleep(0.1); 00062 BOOST_CHECK_EQUAL(numChangesRoot, 1); 00063 BOOST_CHECK_EQUAL(numChangesLeaf, 0); 00064 BOOST_CHECK_EQUAL(config->getJson("parent/child", watchLeaf), Json::Value("howdy")); 00065 BOOST_CHECK_EQUAL(config->getChildren("parent", watchRoot), vector<string>{"child"}); 00066 00067 cerr << endl << "Setting child to doody" << endl; 00068 00069 config->set("parent/child", Json::Value("doody")); 00070 ML::sleep(0.1); 00071 BOOST_CHECK_EQUAL(numChangesRoot, 1); 00072 BOOST_CHECK_EQUAL(numChangesLeaf, 1); 00073 BOOST_CHECK_EQUAL(config->getJson("parent/child", watchLeaf), Json::Value("doody")); 00074 BOOST_CHECK_EQUAL(config->getChildren("parent", watchRoot), vector<string>{"child"}); 00075 ML::sleep(0.1); 00076 00077 cerr << endl << "Removing" << endl; 00078 00079 config->removePath(""); 00080 ML::sleep(0.1); 00081 BOOST_CHECK_EQUAL(numChangesRoot, 3); // 3 registrations - 1 that already fired 00082 BOOST_CHECK_EQUAL(numChangesLeaf, 2); // 2 registrations - 1 that already fired 00083 BOOST_CHECK_EQUAL(config->getChildren("parent", watchRoot), vector<string>()); 00084 BOOST_CHECK_EQUAL(config->getJson("parent/child", watchLeaf), Json::Value()); 00085 00086 cerr << endl << "Setting child back again" << endl; 00087 00088 config->set("parent/child", "hello"); 00089 ML::sleep(0.1); 00090 BOOST_CHECK_EQUAL(numChangesRoot, 4); 00091 BOOST_CHECK_EQUAL(numChangesLeaf, 2); // no notification changes on creation 00092 BOOST_CHECK_EQUAL(config->getJson("parent/child"), Json::Value("hello")); 00093 } 00094 00095 struct Publisher : public ServiceBase, public ZmqNamedPublisher { 00096 00097 Publisher(const std::string & name, 00098 std::shared_ptr<ServiceProxies> proxies) 00099 : ServiceBase(name, proxies), 00100 ZmqNamedPublisher(proxies->zmqContext) 00101 { 00102 } 00103 00104 ~Publisher() 00105 { 00106 unregisterServiceProvider(serviceName(), { "publisher" }); 00107 shutdown(); 00108 } 00109 00110 void init() 00111 { 00112 ZmqNamedPublisher::init(getServices()->config, serviceName() + "/publish"); 00113 registerServiceProvider(serviceName(), { "publisher" }); 00114 } 00115 }; 00116 00117 BOOST_AUTO_TEST_CASE( test_named_publisher ) 00118 { 00119 ZooKeeper::TemporaryServer zookeeper; 00120 zookeeper.start(); 00121 00122 auto proxies = std::make_shared<ServiceProxies>(); 00123 proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort())); 00124 00125 ZmqNamedSubscriber subscriber(*proxies->zmqContext); 00126 subscriber.init(proxies->config); 00127 00128 int numMessages = 0; 00129 vector<vector<string> > subscriberMessages; 00130 00131 subscriber.messageHandler = [&] (const std::vector<zmq::message_t> & message) 00132 { 00133 vector<string> msg2; 00134 for (unsigned i = 0; i < message.size(); ++i) { 00135 msg2.push_back(message[i].toString()); 00136 } 00137 cerr << "got subscriber message " << msg2 << endl; 00138 00139 subscriberMessages.push_back(msg2); 00140 ++numMessages; 00141 futex_wake(numMessages); 00142 }; 00143 00144 subscriber.connectToEndpoint("pub/publish"); 00145 subscriber.start(); 00146 subscriber.subscribe("hello"); 00147 00148 int numIter = 1; 00149 //numIter = 10; // TODO: test fails here 00150 00151 for (unsigned i = 0; i < numIter; ++i) { 00152 ML::sleep(0.1); 00153 00154 cerr << endl << endl << endl << endl; 00155 00156 BOOST_CHECK_NE(subscriber.getConnectionState(), 00157 ZmqNamedSubscriber::CONNECTED); 00158 00159 Publisher pub("pub", proxies); 00160 00161 pub.init(); 00162 pub.bindTcp(); 00163 pub.start(); 00164 00165 proxies->config->dump(cerr); 00166 00167 ML::sleep(0.1); 00168 00169 BOOST_CHECK_EQUAL(subscriber.getConnectionState(), 00170 ZmqNamedSubscriber::CONNECTED); 00171 00172 #if 1 00173 { 00174 //auto subp = new ZmqNamedSubscriber(*proxies->zmqContext); 00175 //auto & sub = *subp; 00176 00177 ZmqNamedSubscriber sub(*proxies->zmqContext); 00178 sub.init(proxies->config); 00179 sub.start(); 00180 00181 vector<vector<string> > subscriberMessages; 00182 volatile int numMessages = 0; 00183 00184 auto onSubscriptionMessage = [&] (const std::vector<zmq::message_t> & message) 00185 { 00186 vector<string> msg2; 00187 for (unsigned i = 0; i < message.size(); ++i) { 00188 msg2.push_back(message[i].toString()); 00189 } 00190 cerr << "got message " << msg2 << endl; 00191 00192 subscriberMessages.push_back(msg2); 00193 ++numMessages; 00194 futex_wake(numMessages); 00195 }; 00196 00197 00198 sub.messageHandler = onSubscriptionMessage; 00199 00200 sub.connectToEndpoint("pub/publish"); 00201 00202 // Busy wait (for now) 00203 for (unsigned i = 0; subscriber.getConnectionState() != ZmqNamedSubscriber::CONNECTED; ++i) { 00204 ML::sleep(0.01); 00205 if (i && i % 10 == 0) 00206 cerr << "warning: waited " << i / 10 << "ds for subscriber to connect" << endl; 00207 //if (i == 200) 00208 // throw ML::Exception("no connection in 2 seconds"); 00209 } 00210 00211 sub.subscribe("hello"); 00212 00213 // Give the subscription message time to percolate through 00214 ML::sleep(0.5); 00215 00216 // Publish some messages 00217 pub.publish("hello", "world"); 00218 pub.publish("dog", "eats", "dog"); 00219 pub.publish("hello", "stranger"); 00220 00221 cerr << "published" << endl; 00222 00223 // Wait until they are received 00224 for (;;) { 00225 int nm = numMessages; 00226 if (nm == 2) break; 00227 ML::futex_wait(numMessages, nm); 00228 } 00229 00230 BOOST_CHECK_EQUAL(subscriberMessages.size(), 2); 00231 BOOST_CHECK_EQUAL(subscriberMessages.at(0), vector<string>({ "hello", "world"}) ); 00232 BOOST_CHECK_EQUAL(subscriberMessages.at(1), vector<string>({ "hello", "stranger"}) ); 00233 00234 sub.shutdown(); 00235 } 00236 #else 00237 // Publish some messages 00238 pub.publish("hello", "world"); 00239 pub.publish("dog", "eats", "dog"); 00240 pub.publish("hello", "stranger"); 00241 #endif 00242 00243 ML::sleep(0.1); 00244 00245 cerr << "unregistered publisher" << endl; 00246 00247 pub.shutdown(); 00248 } 00249 00250 ML::sleep(0.1); 00251 00252 cerr << "got a total of " << numMessages << " subscriber messages" << endl; 00253 00254 // Check that it got all of the messages 00255 BOOST_CHECK_EQUAL(numMessages, numIter * 2); 00256 }