RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/testing/carbon_connector_test.cc
00001 /* carbon_connector_test.cc
00002    Jeremy Barnes, 3 August 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Test for the carbon connector.
00006 */
00007 
00008 #define BOOST_TEST_MAIN
00009 #define BOOST_TEST_DYN_LINK
00010 
#include <boost/test/unit_test.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include "soa/service/carbon_connector.h"
#include "jml/arch/atomic_ops.h"
#include "jml/arch/timers.h"
#include "soa/service/passive_endpoint.h"
00017 
00018 
00019 using namespace std;
00020 using namespace Datacratic;
00021 using namespace ML;
00022 
00023 
00024 BOOST_AUTO_TEST_CASE( test_counter_aggregator )
00025 {
00026     // We record events to aggregate from multiple threads with simultaneous
00027     // occasional resets of the counter and we make sure that no events are
00028     // lost.
00029 
00030     cerr << "counter aggregator" << endl;
00031 
00032     CounterAggregator aggregator;
00033 
00034     uint64_t nthreads = 8, iter = 100000;
00035     boost::barrier barrier(nthreads);
00036     uint64_t total = 0;
00037     boost::thread_group tg;
00038     for (unsigned i = 0;  i < nthreads;  ++i) {
00039         auto doThread = [&] ()
00040             {
00041                 barrier.wait();
00042 
00043                 for (unsigned i = 0;  i < iter;  ++i) {
00044                     aggregator.record(1.0);
00045 
00046                     if (random() % 100 == 0) {
00047                         double val = aggregator.reset().first;
00048                         uint64_t val2 = val;
00049 
00050                         //cerr << "val2 = " << val2 << endl;
00051 
00052                         //BOOST_CHECK_EQUAL(val, val2);
00053 
00054                         atomic_add(total, val2);
00055                     }
00056                 }
00057             };
00058         
00059         tg.create_thread(doThread);
00060     }
00061 
00062     tg.join_all();
00063 
00064     double val = aggregator.reset().first;
00065     uint64_t val2 = val;
00066 
00067     //cerr << "val2 = " << val2 << endl;
00068 
00069     BOOST_CHECK_EQUAL(val, val2);
00070     atomic_add(total, val2);
00071 
00072     BOOST_CHECK_EQUAL(total, iter * nthreads);
00073 }
00074 
00075 BOOST_AUTO_TEST_CASE( test_gauge_aggregator )
00076 {
00077     // We record events to aggregate from multiple threads with simultaneous
00078     // occasional resets of the counter and we make sure that no events are
00079     // lost.
00080 
00081     cerr << "gauge aggregator" << endl;
00082 
00083     GaugeAggregator aggregator;
00084 
00085     uint64_t nthreads = 8, iter = 100000;
00086     boost::barrier barrier(nthreads);
00087     boost::thread_group tg;
00088 
00089     boost::mutex mutex;
00090 
00091     ML::distribution<float> allValues;
00092 
00093     for (unsigned i = 0;  i < nthreads;  ++i) {
00094         auto doThread = [&] ()
00095             {
00096                 ML::distribution<float> threadValues;
00097 
00098                 barrier.wait();
00099 
00100                 for (unsigned i = 0;  i < iter;  ++i) {
00101                     aggregator.record(1.0 + (i % 2));
00102 
00103                     if (random() % 1000 == 0) {
00104                         ML::distribution<float> * values
00105                             = aggregator.reset().first;
00106                        
00107                         threadValues.insert(threadValues.end(),
00108                                             values->begin(), values->end());
00109 
00110                         delete values;
00111                     }
00112                 }
00113                 
00114                 boost::lock_guard<boost::mutex> lock(mutex);
00115                 allValues.insert(allValues.end(),
00116                                  threadValues.begin(), threadValues.end());
00117             };
00118         
00119         tg.create_thread(doThread);
00120     }
00121 
00122     tg.join_all();
00123 
00124     ML::distribution<float> * values
00125         = aggregator.reset().first;
00126     
00127     allValues.insert(allValues.end(),
00128                      values->begin(), values->end());
00129     std::sort(allValues.begin(), allValues.end());
00130 
00131     BOOST_CHECK_EQUAL(allValues.size(), iter * nthreads);
00132     BOOST_CHECK_EQUAL(allValues.mean(), 1.5);
00133     BOOST_CHECK_EQUAL(allValues.std(), 0.5);
00134 }
00135 
00136 BOOST_AUTO_TEST_CASE( test_multi_aggregator )
00137 {
00138     std::vector<StatReading> readings;
00139 
00140     boost::mutex m;
00141     m.lock();
00142 
00143     auto recordReading = [&] (const std::vector<StatReading> & stats)
00144         {
00145             cerr << "got reading of " << stats.size() << " counters"
00146                  << endl;
00147             for (unsigned i = 0;  i < stats.size();  ++i)
00148                 cerr << stats[i].name << " " << stats[i].value << endl;
00149             readings.insert(readings.end(), stats.begin(), stats.end());
00150 
00151             m.unlock();
00152         };
00153 
00154     MultiAggregator agg("hello", recordReading, 0.0);
00155 
00156     for (unsigned i = 0;  i <= 100;  ++i)
00157         agg.recordLevel("bonus", i);
00158 
00159     agg.dump();
00160 
00161     // agg.dump() will eventually call recordReading from another thread,
00162     // which will unlock m when it's finished, allowing this code to
00163     // proceed.
00164 
00165     m.lock();
00166 
00167     BOOST_REQUIRE_EQUAL(readings.size(), 8);
00168     BOOST_CHECK_EQUAL(readings[0].name, "bonus.mean");
00169     BOOST_CHECK_EQUAL(readings[0].value, 50.0);
00170 }
00171 
00172 struct FakeCarbon : public PassiveEndpointT<SocketTransport> {
00173 
00174     FakeCarbon()
00175         : PassiveEndpointT<SocketTransport>("Carbon"),
00176           numConnections(0), numDisconnections(0),
00177           numDataMessages(0), numErrorMessages(0)
00178     {
00179     }
00180 
00181     struct CarbonConnection: public PassiveConnectionHandler {
00182 
00183         CarbonConnection(FakeCarbon * owner)
00184             : owner(owner)
00185         {
00186         }
00187 
00188         FakeCarbon * owner;
00189 
00191         virtual void handleData(const std::string & data)
00192         {
00193             cerr << "got data " << data << endl;
00194             ML::atomic_inc(owner->numDataMessages);
00195         }
00196     
00198         virtual void handleError(const std::string & message)
00199         {
00200             cerr << "got error " << message << endl;
00201             ML::atomic_inc(owner->numErrorMessages);
00202         }
00203 
00204         virtual void onGotTransport()
00205         {
00206             cerr << "on got transport" << endl;
00207             startReading();
00208         }
00209 
00210         virtual void handleDisconnect()
00211         {
00212             cerr << "got disconnection" << endl;
00213             ML::atomic_inc(owner->numDisconnections);
00214             closeWhenHandlerFinished();
00215         }
00216     };
00217 
00218     virtual std::shared_ptr<ConnectionHandler>
00219     makeNewHandler()
00220     {
00221         ML::atomic_inc(numConnections);
00222         cerr << "got new connection" << endl;
00223         return std::make_shared<CarbonConnection>(this);
00224     }
00225 
00226     int numConnections, numDisconnections, numDataMessages, numErrorMessages;
00227 };
00228 
00229 BOOST_AUTO_TEST_CASE( test_multiple_carbon_connectors )
00230 {
00231     FakeCarbon carbon1, carbon2;
00232     int port1 = carbon1.init();
00233     int port2 = carbon2.init();
00234 
00235     string addr1 = ML::format("localhost:%d", port1);
00236     string addr2 = ML::format("localhost:%d", port2);
00237 
00238     cerr << "fake carbons are on " << port1 << " and " << port2 << endl;
00239 
00240     vector<string> addrs({ addr1, addr2 });
00241 
00242     CarbonConnector x(addrs, "test");
00243 
00244     for (unsigned i = 0;  i < 1000;  ++i) {
00245         x.recordOccurrence("hit");
00246     }
00247 
00248     x.dump();
00249 
00250     ML::sleep(0.5);
00251 
00252     BOOST_CHECK_EQUAL(carbon1.numConnections, 1);
00253     BOOST_CHECK_EQUAL(carbon2.numConnections, 1);
00254     BOOST_CHECK_EQUAL(carbon1.numDisconnections, 0);
00255     BOOST_CHECK_EQUAL(carbon2.numDisconnections, 0);
00256     BOOST_CHECK_EQUAL(carbon1.numDataMessages, 1);
00257     BOOST_CHECK_EQUAL(carbon2.numDataMessages, 1);
00258     BOOST_CHECK_EQUAL(carbon1.numErrorMessages, 0);
00259     BOOST_CHECK_EQUAL(carbon2.numErrorMessages, 0);
00260     
00261     carbon1.shutdown();
00262 
00263     for (unsigned i = 0;  i < 1000;  ++i) {
00264         x.recordOccurrence("hit");
00265     }
00266     
00267     x.dump();
00268 
00269     ML::sleep(1.0);
00270 
00271     carbon1.init(port1);
00272 
00273     ML::sleep(10.0);
00274 
00275     cerr << "done carbon connectors" << endl;
00276 }
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator