RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/banker/testing/slave_banker_test.cc
00001 /* slave_banker_test.cc
00002    Jeremy Barnes, 8 November 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005 */
00006 
00007 #define BOOST_TEST_MAIN
00008 #define BOOST_TEST_DYN_LINK
#include <memory>
#include <atomic>
#include <boost/test/unit_test.hpp>
#include "jml/arch/format.h"
#include "jml/arch/exception_handler.h"
#include "jml/utils/guard.h"
#include "rtbkit/core/banker/master_banker.h"
#include "rtbkit/core/banker/slave_banker.h"
#include "jml/utils/environment.h"
#include "jml/arch/timers.h"
#include "jml/utils/testing/watchdog.h"
#include <future>
#include <boost/thread/thread.hpp>
#include "soa/service/testing/zookeeper_temporary_server.h"
00022 
00023 using namespace std;
00024 using namespace ML;
00025 using namespace Datacratic;
00026 using namespace RTBKIT;
00027 
00028 #if 1
00029 
00030 BOOST_AUTO_TEST_CASE( test_master_slave_banker )
00031 {
00032     ZooKeeper::TemporaryServer zookeeper;
00033     zookeeper.start();
00034 
00035     auto proxies = std::make_shared<ServiceProxies>();
00036     proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort()));
00037 
00038     string bankerServiceName = "rtbBanker";
00039 
00040     MasterBanker master(proxies, bankerServiceName);
00041     master.init(make_shared<NoBankerPersistence>());
00042     master.monitorProviderClient.inhibit_ = true;
00043     auto addr = master.bindTcp();
00044 
00045     cerr << "master banker is listening on " << addr.first << ","
00046          << addr.second << endl;
00047 
00048     auto uri = proxies->getServiceClassInstances(bankerServiceName, "http");
00049     std::cout << "reachable from " << uri << std::endl;
00050 
00051     master.start();
00052 
00053     proxies->config->dump(cerr);
00054     
00055     SlaveBudgetController slave;
00056     slave.init(proxies->config);
00057     slave.start();
00058     slave.addAccountSync({"hello", "world"});
00059 
00060     BOOST_CHECK_THROW(slave.addAccountSync({"$$#$#Q@", "  asdad0321 "}),
00061                       std::exception);
00062 
00063     slave.setBudgetSync("hello", USD(200));
00064 
00065     cerr << "finished adding account" << endl;
00066 
00067     //slave.shutdown();
00068 
00069     SlaveBanker banker(proxies->zmqContext);
00070     banker.init(proxies->config, "slave");
00071     banker.start();
00072     banker.addSpendAccountSync({"hello", "world"});
00073 
00074     for (unsigned i = 0;  i < 2;  ++i) {
00075         ML::sleep(1.0);
00076         cerr << slave.getAccountSummarySync({"hello"}, 3) << endl;
00077     }
00078 
00079     banker.shutdown();
00080 }
00081 #endif
00082 
00083 #if 1
00084 BOOST_AUTO_TEST_CASE( test_initialization_and_spending )
00085 {
00086     /* We test that the initialization of an account works even when there
00087        was spending that occurred earlier on the account.
00088        
00089        The specific situation is to make sure that we properly track spend
00090        that occurs *before* a banker can make an initial synchronization with
00091        the master banker.
00092     */
00093 
00094     ZooKeeper::TemporaryServer zookeeper;
00095     zookeeper.start();
00096 
00097     auto proxies = std::make_shared<ServiceProxies>();
00098     proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort()));
00099 
00100     MasterBanker master(proxies);
00101     master.init(make_shared<NoBankerPersistence>());
00102     master.monitorProviderClient.inhibit_ = true;
00103     auto addr = master.bindTcp();
00104     cerr << "master banker is listening on " << addr.first << ","
00105          << addr.second << endl;
00106 
00107     master.start();
00108 
00109     proxies->config->dump(cerr);
00110     
00111     SlaveBudgetController slave;
00112     slave.init(proxies->config);
00113     slave.start();
00114     slave.addAccountSync({"hello", "world"});
00115     slave.setBudgetSync("hello", USD(200));
00116     slave.topupTransferSync({"hello", "world"}, USD(20));
00117 
00118     // Record some spend in an initial slave
00119     {
00120         SlaveBanker banker(proxies->zmqContext);
00121         banker.init(proxies->config, "slave");
00122         banker.start();
00123         banker.addSpendAccountSync({"hello", "world"});
00124 
00125         // Spend $1
00126         banker.forceWinBid({"hello", "world"}, USD(1), LineItems());
00127 
00128         // Synchronize
00129         banker.syncAllSync();
00130         
00131         banker.shutdown();
00132     }
00133 
00134     // Check that the spend was recorded
00135     auto summ = slave.getAccountSummarySync({"hello", "world"}, 1 /* depth */);
00136     cerr << "after initial spend was recorded" << endl;
00137     cerr << summ << endl;
00138     BOOST_CHECK_EQUAL(summ.spent, USD(1));
00139 
00140     // Now asynchronously start up and record another dollar of spend
00141     {
00142         SlaveBanker banker(proxies->zmqContext);
00143         banker.init(proxies->config, "slave");
00144         banker.start();
00145         banker.addSpendAccountSync({"hello", "world"});
00146 
00147         // Spend $2
00148         banker.forceWinBid({"hello", "world"}, USD(2), LineItems());
00149         
00150         auto st = banker.getAccountStateDebug({"hello","world"});
00151         cerr << "after forceWinBid" << endl;
00152         cerr << st << endl;
00153 
00154         // Allow the synchronization to work
00155         ML::sleep(2.0);
00156 
00157         st = banker.getAccountStateDebug({"hello","world"});
00158         cerr << "after synchronization" << endl;
00159         cerr << st << endl;
00160 
00161         // Synchronize
00162         banker.syncAllSync();
00163         
00164         banker.shutdown();
00165     }
00166 
00167     // Check that the spend was recorded
00168     cerr << "after second spend recorded" << endl;
00169     summ = slave.getAccountSummarySync({"hello", "world"}, 1 /* depth */);
00170     
00171     cerr << summ << endl;
00172 
00173     BOOST_CHECK_EQUAL(summ.spent, USD(3));
00174 }
00175 #endif
00176 
00177 #if 1
00178 BOOST_AUTO_TEST_CASE( test_bidding_with_slave )
00179 {
00180     ZooKeeper::TemporaryServer zookeeper;
00181     zookeeper.start();
00182 
00183     auto proxies = std::make_shared<ServiceProxies>();
00184     proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort()));
00185 
00186     MasterBanker master(proxies);
00187     master.init(make_shared<NoBankerPersistence>());
00188     master.monitorProviderClient.inhibit_ = true;
00189     auto addr = master.bindTcp();
00190     cerr << "master banker is listening on " << addr.first << ","
00191          << addr.second << endl;
00192 
00193     master.start();
00194 
00195     AccountKey campaign("campaign");
00196     AccountKey strategy("campaign:strategy");
00197 
00198     SlaveBudgetController slave;
00199     slave.init(proxies->config);
00200     slave.start();
00201 
00202     // Create a budget for the campaign
00203     slave.addAccountSync(strategy);
00204     slave.setBudgetSync(campaign.front(), USD(100));
00205 
00206     int nTopupThreads = 2;
00207     int nAddBudgetThreads = 2;
00208     int nBidThreads = 2; 
00209     int nCommitThreads = 1;
00210     int numTransfersPerThread = 10000;
00211     int numAddBudgetsPerThread = 10;
00212 
00213     volatile bool finished = false;
00214 
00215     auto runTopupThread = [&] ()
00216         {
00217             SlaveBudgetController slave;
00218             slave.init(proxies->config);
00219             slave.start();
00220 
00221             while (!finished) {
00222                 slave.topupTransferSync(strategy, USD(2.00));
00223                 ML::sleep(1.0);
00224             }
00225         };
00226 
00227     auto runAddBudgetThread = [&] ()
00228         {
00229             SlaveBudgetController slave;
00230             slave.init(proxies->config);
00231             slave.start();
00232             
00233             for (unsigned i = 0;  i < numAddBudgetsPerThread;  ++i) {
00234                 slave.setBudgetSync(campaign.front(), USD(100 + i * 2));
00235                 
00236                 AccountSummary summary
00237                     = slave.getAccountSummarySync(campaign, 3);
00238                 cerr << summary << endl;
00239 
00240                 ML::sleep(1.0);
00241             }
00242         };
00243 
00244     uint64_t numBidsCommitted = 0;
00245 
00246     ML::RingBufferSRMW<Amount> toCommitThread(10000);
00247     
00248 
00249     auto runBidThread = [&] (int threadNum)
00250         {
00251             SlaveBanker slave(proxies->zmqContext);
00252             slave.init(proxies->config, "bid" + to_string(threadNum));
00253             slave.start();
00254 
00255             AccountKey account = strategy;
00256 
00257             slave.addSpendAccountSync(account);
00258 
00259             int done = 0;
00260             for (;  !finished;  ++done) {
00261                 string item = "item";
00262 
00263                 // Authorize 10
00264                 if (!slave.authorizeBid(account, item, MicroUSD(1))) {
00265                     ML::sleep(0.01);
00266                     continue;
00267                 }
00268 
00269                 // In half of the cases, we cancel.  In the other half, we
00270                 // transfer it off to the commit thread
00271 
00272                 if (done % 2 == 0) {
00273                     // Commit 1
00274                     slave.commitBid(account, item, MicroUSD(1), LineItems());
00275                     ML::atomic_inc(numBidsCommitted);
00276                 }
00277                 else {
00278                     Amount amount = slave.detachBid(account, item);
00279                     toCommitThread.push(amount);
00280                 }
00281             }
00282 
00283             cerr << "finished slave account with "
00284                  << done << " bids" << endl;
00285 
00286             slave.syncAllSync();
00287         };
00288 
00289     auto runCommitThread = [&] (int threadNum)
00290         {
00291             SlaveBanker slave(proxies->zmqContext);
00292             slave.init(proxies->config, "commit" + to_string(threadNum));
00293             slave.start();
00294 
00295             AccountKey account = strategy;
00296             slave.addSpendAccountSync(account);
00297 
00298             while (!finished || toCommitThread.couldPop()) {
00299                 Amount amount;
00300                 if (toCommitThread.tryPop(amount, 0.1)) {
00301 
00302                     try {
00303                         slave.attachBid(account, "item", amount);
00304                         slave.commitBid(account, "item", MicroUSD(1), LineItems());
00305                         //slave.commitDetachedBid(account, amount, MicroUSD(1), LineItems());
00306                     } catch (...) {
00307                         cerr << "commit detached " << amount << " from "
00308                              << slave.getAccount(account) << endl;
00309                         throw;
00310                     }
00311                     ML::atomic_inc(numBidsCommitted);
00312                 }
00313                 //slave.syncTo(master);
00314             }
00315 
00316             //slave.syncTo(master);
00317             cerr << "done commit thread" << endl;
00318 
00319             slave.syncAllSync();
00320         };
00321 
00322     boost::thread_group budgetThreads;
00323 
00324     for (unsigned i = 0;  i < nAddBudgetThreads;  ++i)
00325         budgetThreads.create_thread(runAddBudgetThread);
00326 
00327     boost::thread_group bidThreads;
00328     for (unsigned i = 0;  i < nBidThreads;  ++i)
00329         bidThreads.create_thread(std::bind(runBidThread, i));
00330 
00331     for (unsigned i = 0;  i < nTopupThreads;  ++i)
00332         bidThreads.create_thread(runTopupThread);
00333 
00334     for (unsigned i = 0;  i < nCommitThreads;  ++i)
00335         bidThreads.create_thread(std::bind(runCommitThread, i));
00336     
00337 
00338     budgetThreads.join_all();
00339 
00340     finished = true;
00341 
00342     bidThreads.join_all();
00343 
00344     uint32_t amountAdded       = nAddBudgetThreads * numAddBudgetsPerThread;
00345     uint32_t amountTransferred = nTopupThreads * numTransfersPerThread;
00346 
00347     cerr << "numBidsCommitted = "  << numBidsCommitted << endl;
00348     cerr << "amountTransferred = " << amountTransferred << endl;
00349     cerr << "amountAdded =       " << amountAdded << endl;
00350     
00351     AccountSummary summary
00352         = slave.getAccountSummarySync(campaign, 3);
00353     cerr << summary << endl;
00354 
00355 #if 0
00356     cerr << "campaign" << endl;
00357     cerr << master.getAccountSummary(campaign) << endl;
00358     cerr << master.getAccount(campaign) << endl; 
00359 
00360     cerr << "strategy" << endl;
00361     cerr << master.getAccountSummary(strategy) << endl;
00362     cerr << master.getAccount(strategy) << endl; 
00363 #endif
00364 
00365 #if 0    
00366     RedisBanker banker("bankerTest", "b", s, redis);
00367     banker.sync();
00368     Json::Value status = banker.getCampaignStatusJson("testCampaign", "");
00369 
00370     cerr << status << endl;
00371 
00372 
00373 
00374 
00375     BOOST_CHECK_EQUAL(status["available"]["micro-USD"].asInt(), 1000000 - amountTransferred + amountAdded);
00376     BOOST_CHECK_EQUAL(status["strategies"][0]["available"]["micro-USD"].asInt(),
00377                       amountTransferred - numBidsCommitted);
00378     BOOST_CHECK_EQUAL(status["strategies"][0]["transferred"]["micro-USD"].asInt(),
00379                       amountTransferred);
00380     BOOST_CHECK_EQUAL(status["strategies"][0]["spent"]["micro-USD"].asInt(),
00381                       numBidsCommitted);
00382     BOOST_CHECK_EQUAL(status["spent"]["micro-USD"].asInt(), numBidsCommitted);
00383 
00384     //BOOST_CHECK_EQUAL(status["available"].
00385 #endif
00386 }
00387 #endif
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator