![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* graphite_connector_test.cc 00002 Jeremy Barnes, 3 August 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Test for the carbon connector. 00006 */ 00007 00008 #define BOOST_TEST_MAIN 00009 #define BOOST_TEST_DYN_LINK 00010 00011 #include <boost/test/unit_test.hpp> 00012 #include "soa/service/carbon_connector.h" 00013 #include "jml/arch/atomic_ops.h" 00014 #include "jml/arch/timers.h" 00015 #include "soa/service/passive_endpoint.h" 00016 #include <boost/make_shared.hpp> 00017 00018 00019 using namespace std; 00020 using namespace Datacratic; 00021 using namespace ML; 00022 00023 00024 BOOST_AUTO_TEST_CASE( test_counter_aggregator ) 00025 { 00026 // We record events to aggregate from multiple threads with simultaneous 00027 // occasional resets of the counter and we make sure that no events are 00028 // lost. 00029 00030 cerr << "counter aggregator" << endl; 00031 00032 CounterAggregator aggregator; 00033 00034 uint64_t nthreads = 8, iter = 100000; 00035 boost::barrier barrier(nthreads); 00036 uint64_t total = 0; 00037 boost::thread_group tg; 00038 for (unsigned i = 0; i < nthreads; ++i) { 00039 auto doThread = [&] () 00040 { 00041 barrier.wait(); 00042 00043 for (unsigned i = 0; i < iter; ++i) { 00044 aggregator.record(1.0); 00045 00046 if (random() % 100 == 0) { 00047 double val = aggregator.reset().first; 00048 uint64_t val2 = val; 00049 00050 //cerr << "val2 = " << val2 << endl; 00051 00052 //BOOST_CHECK_EQUAL(val, val2); 00053 00054 atomic_add(total, val2); 00055 } 00056 } 00057 }; 00058 00059 tg.create_thread(doThread); 00060 } 00061 00062 tg.join_all(); 00063 00064 double val = aggregator.reset().first; 00065 uint64_t val2 = val; 00066 00067 //cerr << "val2 = " << val2 << endl; 00068 00069 BOOST_CHECK_EQUAL(val, val2); 00070 atomic_add(total, val2); 00071 00072 BOOST_CHECK_EQUAL(total, iter * nthreads); 00073 } 00074 00075 BOOST_AUTO_TEST_CASE( test_gauge_aggregator ) 00076 { 00077 // We record events to aggregate from multiple threads with simultaneous 00078 // occasional resets of the counter and we make sure that no events are 00079 // lost. 00080 00081 cerr << "gauge aggregator" << endl; 00082 00083 GaugeAggregator aggregator; 00084 00085 uint64_t nthreads = 8, iter = 100000; 00086 boost::barrier barrier(nthreads); 00087 boost::thread_group tg; 00088 00089 boost::mutex mutex; 00090 00091 ML::distribution<float> allValues; 00092 00093 for (unsigned i = 0; i < nthreads; ++i) { 00094 auto doThread = [&] () 00095 { 00096 ML::distribution<float> threadValues; 00097 00098 barrier.wait(); 00099 00100 for (unsigned i = 0; i < iter; ++i) { 00101 aggregator.record(1.0 + (i % 2)); 00102 00103 if (random() % 1000 == 0) { 00104 ML::distribution<float> * values 00105 = aggregator.reset().first; 00106 00107 threadValues.insert(threadValues.end(), 00108 values->begin(), values->end()); 00109 00110 delete values; 00111 } 00112 } 00113 00114 boost::lock_guard<boost::mutex> lock(mutex); 00115 allValues.insert(allValues.end(), 00116 threadValues.begin(), threadValues.end()); 00117 }; 00118 00119 tg.create_thread(doThread); 00120 } 00121 00122 tg.join_all(); 00123 00124 ML::distribution<float> * values 00125 = aggregator.reset().first; 00126 00127 allValues.insert(allValues.end(), 00128 values->begin(), values->end()); 00129 std::sort(allValues.begin(), allValues.end()); 00130 00131 BOOST_CHECK_EQUAL(allValues.size(), iter * nthreads); 00132 BOOST_CHECK_EQUAL(allValues.mean(), 1.5); 00133 BOOST_CHECK_EQUAL(allValues.std(), 0.5); 00134 } 00135 00136 BOOST_AUTO_TEST_CASE( test_multi_aggregator ) 00137 { 00138 std::vector<StatReading> readings; 00139 00140 boost::mutex m; 00141 m.lock(); 00142 00143 auto recordReading = [&] (const std::vector<StatReading> & stats) 00144 { 00145 cerr << "got reading of " << stats.size() << " counters" 00146 << endl; 00147 for (unsigned i = 0; i < stats.size(); ++i) 00148 cerr << stats[i].name << " " << stats[i].value << endl; 00149 readings.insert(readings.end(), stats.begin(), stats.end()); 00150 00151 m.unlock(); 00152 }; 00153 00154 MultiAggregator agg("hello", recordReading, 0.0); 00155 00156 for (unsigned i = 0; i <= 100; ++i) 00157 agg.recordLevel("bonus", i); 00158 00159 agg.dump(); 00160 00161 // agg.dump() will eventually call recordReading from another thread, 00162 // which will unlock m when it's finished, allowing this code to 00163 // proceed. 00164 00165 m.lock(); 00166 00167 BOOST_REQUIRE_EQUAL(readings.size(), 8); 00168 BOOST_CHECK_EQUAL(readings[0].name, "bonus.mean"); 00169 BOOST_CHECK_EQUAL(readings[0].value, 50.0); 00170 } 00171 00172 struct FakeCarbon : public PassiveEndpointT<SocketTransport> { 00173 00174 FakeCarbon() 00175 : PassiveEndpointT<SocketTransport>("Carbon"), 00176 numConnections(0), numDisconnections(0), 00177 numDataMessages(0), numErrorMessages(0) 00178 { 00179 } 00180 00181 struct CarbonConnection: public PassiveConnectionHandler { 00182 00183 CarbonConnection(FakeCarbon * owner) 00184 : owner(owner) 00185 { 00186 } 00187 00188 FakeCarbon * owner; 00189 00191 virtual void handleData(const std::string & data) 00192 { 00193 cerr << "got data " << data << endl; 00194 ML::atomic_inc(owner->numDataMessages); 00195 } 00196 00198 virtual void handleError(const std::string & message) 00199 { 00200 cerr << "got error " << message << endl; 00201 ML::atomic_inc(owner->numErrorMessages); 00202 } 00203 00204 virtual void onGotTransport() 00205 { 00206 cerr << "on got transport" << endl; 00207 startReading(); 00208 } 00209 00210 virtual void handleDisconnect() 00211 { 00212 cerr << "got disconnection" << endl; 00213 ML::atomic_inc(owner->numDisconnections); 00214 closeWhenHandlerFinished(); 00215 } 00216 }; 00217 00218 virtual std::shared_ptr<ConnectionHandler> 00219 makeNewHandler() 00220 { 00221 ML::atomic_inc(numConnections); 00222 cerr << "got new connection" << endl; 00223 return std::make_shared<CarbonConnection>(this); 00224 } 00225 00226 int numConnections, numDisconnections, numDataMessages, numErrorMessages; 00227 }; 00228 00229 BOOST_AUTO_TEST_CASE( test_multiple_carbon_connectors ) 00230 { 00231 FakeCarbon carbon1, carbon2; 00232 int port1 = carbon1.init(); 00233 int port2 = carbon2.init(); 00234 00235 string addr1 = ML::format("localhost:%d", port1); 00236 string addr2 = ML::format("localhost:%d", port2); 00237 00238 cerr << "fake carbons are on " << port1 << " and " << port2 << endl; 00239 00240 vector<string> addrs({ addr1, addr2 }); 00241 00242 CarbonConnector x(addrs, "test"); 00243 00244 for (unsigned i = 0; i < 1000; ++i) { 00245 x.recordOccurrence("hit"); 00246 } 00247 00248 x.dump(); 00249 00250 ML::sleep(0.5); 00251 00252 BOOST_CHECK_EQUAL(carbon1.numConnections, 1); 00253 BOOST_CHECK_EQUAL(carbon2.numConnections, 1); 00254 BOOST_CHECK_EQUAL(carbon1.numDisconnections, 0); 00255 BOOST_CHECK_EQUAL(carbon2.numDisconnections, 0); 00256 BOOST_CHECK_EQUAL(carbon1.numDataMessages, 1); 00257 BOOST_CHECK_EQUAL(carbon2.numDataMessages, 1); 00258 BOOST_CHECK_EQUAL(carbon1.numErrorMessages, 0); 00259 BOOST_CHECK_EQUAL(carbon2.numErrorMessages, 0); 00260 00261 carbon1.shutdown(); 00262 00263 for (unsigned i = 0; i < 1000; ++i) { 00264 x.recordOccurrence("hit"); 00265 } 00266 00267 x.dump(); 00268 00269 ML::sleep(1.0); 00270 00271 carbon1.init(port1); 00272 00273 ML::sleep(10.0); 00274 00275 cerr << "done carbon connectors" << endl; 00276 }