RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* slave_banker_test.cc 00002 Jeremy Barnes, 8 November 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 */ 00006 00007 #define BOOST_TEST_MAIN 00008 #define BOOST_TEST_DYN_LINK 00009 #include <memory> 00010 #include <boost/test/unit_test.hpp> 00011 #include "jml/arch/format.h" 00012 #include "jml/arch/exception_handler.h" 00013 #include "jml/utils/guard.h" 00014 #include "rtbkit/core/banker/master_banker.h" 00015 #include "rtbkit/core/banker/slave_banker.h" 00016 #include "jml/utils/environment.h" 00017 #include "jml/arch/timers.h" 00018 #include "jml/utils/testing/watchdog.h" 00019 #include <future> 00020 #include <boost/thread/thread.hpp> 00021 #include "soa/service/testing/zookeeper_temporary_server.h" 00022 00023 using namespace std; 00024 using namespace ML; 00025 using namespace Datacratic; 00026 using namespace RTBKIT; 00027 00028 #if 1 00029 00030 BOOST_AUTO_TEST_CASE( test_master_slave_banker ) 00031 { 00032 ZooKeeper::TemporaryServer zookeeper; 00033 zookeeper.start(); 00034 00035 auto proxies = std::make_shared<ServiceProxies>(); 00036 proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort())); 00037 00038 string bankerServiceName = "rtbBanker"; 00039 00040 MasterBanker master(proxies, bankerServiceName); 00041 master.init(make_shared<NoBankerPersistence>()); 00042 master.monitorProviderClient.inhibit_ = true; 00043 auto addr = master.bindTcp(); 00044 00045 cerr << "master banker is listening on " << addr.first << "," 00046 << addr.second << endl; 00047 00048 auto uri = proxies->getServiceClassInstances(bankerServiceName, "http"); 00049 std::cout << "reachable from " << uri << std::endl; 00050 00051 master.start(); 00052 00053 proxies->config->dump(cerr); 00054 00055 SlaveBudgetController slave; 00056 slave.init(proxies->config); 00057 slave.start(); 00058 slave.addAccountSync({"hello", "world"}); 00059 00060 BOOST_CHECK_THROW(slave.addAccountSync({"$$#$#Q@", " asdad0321 "}), 00061 std::exception); 00062 00063 slave.setBudgetSync("hello", USD(200)); 00064 00065 cerr << "finished adding account" << endl; 00066 00067 //slave.shutdown(); 00068 00069 SlaveBanker banker(proxies->zmqContext); 00070 banker.init(proxies->config, "slave"); 00071 banker.start(); 00072 banker.addSpendAccountSync({"hello", "world"}); 00073 00074 for (unsigned i = 0; i < 2; ++i) { 00075 ML::sleep(1.0); 00076 cerr << slave.getAccountSummarySync({"hello"}, 3) << endl; 00077 } 00078 00079 banker.shutdown(); 00080 } 00081 #endif 00082 00083 #if 1 00084 BOOST_AUTO_TEST_CASE( test_initialization_and_spending ) 00085 { 00086 /* We test that the initialization of an account works even when there 00087 was spending that occurred earlier on the account. 00088 00089 The specific situation is to make sure that we properly track spend 00090 that occurs *before* a banker can make an initial synchronization with 00091 the master banker. 00092 */ 00093 00094 ZooKeeper::TemporaryServer zookeeper; 00095 zookeeper.start(); 00096 00097 auto proxies = std::make_shared<ServiceProxies>(); 00098 proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort())); 00099 00100 MasterBanker master(proxies); 00101 master.init(make_shared<NoBankerPersistence>()); 00102 master.monitorProviderClient.inhibit_ = true; 00103 auto addr = master.bindTcp(); 00104 cerr << "master banker is listening on " << addr.first << "," 00105 << addr.second << endl; 00106 00107 master.start(); 00108 00109 proxies->config->dump(cerr); 00110 00111 SlaveBudgetController slave; 00112 slave.init(proxies->config); 00113 slave.start(); 00114 slave.addAccountSync({"hello", "world"}); 00115 slave.setBudgetSync("hello", USD(200)); 00116 slave.topupTransferSync({"hello", "world"}, USD(20)); 00117 00118 // Record some spend in an initial slave 00119 { 00120 SlaveBanker banker(proxies->zmqContext); 00121 banker.init(proxies->config, "slave"); 00122 banker.start(); 00123 banker.addSpendAccountSync({"hello", "world"}); 00124 00125 // Spend $1 00126 banker.forceWinBid({"hello", "world"}, USD(1), LineItems()); 00127 00128 // Synchronize 00129 banker.syncAllSync(); 00130 00131 banker.shutdown(); 00132 } 00133 00134 // Check that the spend was recorded 00135 auto summ = slave.getAccountSummarySync({"hello", "world"}, 1 /* depth */); 00136 cerr << "after initial spend was recorded" << endl; 00137 cerr << summ << endl; 00138 BOOST_CHECK_EQUAL(summ.spent, USD(1)); 00139 00140 // Now asynchronously start up and record another dollar of spend 00141 { 00142 SlaveBanker banker(proxies->zmqContext); 00143 banker.init(proxies->config, "slave"); 00144 banker.start(); 00145 banker.addSpendAccountSync({"hello", "world"}); 00146 00147 // Spend $2 00148 banker.forceWinBid({"hello", "world"}, USD(2), LineItems()); 00149 00150 auto st = banker.getAccountStateDebug({"hello","world"}); 00151 cerr << "after forceWinBid" << endl; 00152 cerr << st << endl; 00153 00154 // Allow the synchronization to work 00155 ML::sleep(2.0); 00156 00157 st = banker.getAccountStateDebug({"hello","world"}); 00158 cerr << "after synchronization" << endl; 00159 cerr << st << endl; 00160 00161 // Synchronize 00162 banker.syncAllSync(); 00163 00164 banker.shutdown(); 00165 } 00166 00167 // Check that the spend was recorded 00168 cerr << "after second spend recorded" << endl; 00169 summ = slave.getAccountSummarySync({"hello", "world"}, 1 /* depth */); 00170 00171 cerr << summ << endl; 00172 00173 BOOST_CHECK_EQUAL(summ.spent, USD(3)); 00174 } 00175 #endif 00176 00177 #if 1 00178 BOOST_AUTO_TEST_CASE( test_bidding_with_slave ) 00179 { 00180 ZooKeeper::TemporaryServer zookeeper; 00181 zookeeper.start(); 00182 00183 auto proxies = std::make_shared<ServiceProxies>(); 00184 proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort())); 00185 00186 MasterBanker master(proxies); 00187 master.init(make_shared<NoBankerPersistence>()); 00188 master.monitorProviderClient.inhibit_ = true; 00189 auto addr = master.bindTcp(); 00190 cerr << "master banker is listening on " << addr.first << "," 00191 << addr.second << endl; 00192 00193 master.start(); 00194 00195 AccountKey campaign("campaign"); 00196 AccountKey strategy("campaign:strategy"); 00197 00198 SlaveBudgetController slave; 00199 slave.init(proxies->config); 00200 slave.start(); 00201 00202 // Create a budget for the campaign 00203 slave.addAccountSync(strategy); 00204 slave.setBudgetSync(campaign.front(), USD(100)); 00205 00206 int nTopupThreads = 2; 00207 int nAddBudgetThreads = 2; 00208 int nBidThreads = 2; 00209 int nCommitThreads = 1; 00210 int numTransfersPerThread = 10000; 00211 int numAddBudgetsPerThread = 10; 00212 00213 volatile bool finished = false; 00214 00215 auto runTopupThread = [&] () 00216 { 00217 SlaveBudgetController slave; 00218 slave.init(proxies->config); 00219 slave.start(); 00220 00221 while (!finished) { 00222 slave.topupTransferSync(strategy, USD(2.00)); 00223 ML::sleep(1.0); 00224 } 00225 }; 00226 00227 auto runAddBudgetThread = [&] () 00228 { 00229 SlaveBudgetController slave; 00230 slave.init(proxies->config); 00231 slave.start(); 00232 00233 for (unsigned i = 0; i < numAddBudgetsPerThread; ++i) { 00234 slave.setBudgetSync(campaign.front(), USD(100 + i * 2)); 00235 00236 AccountSummary summary 00237 = slave.getAccountSummarySync(campaign, 3); 00238 cerr << summary << endl; 00239 00240 ML::sleep(1.0); 00241 } 00242 }; 00243 00244 uint64_t numBidsCommitted = 0; 00245 00246 ML::RingBufferSRMW<Amount> toCommitThread(10000); 00247 00248 00249 auto runBidThread = [&] (int threadNum) 00250 { 00251 SlaveBanker slave(proxies->zmqContext); 00252 slave.init(proxies->config, "bid" + to_string(threadNum)); 00253 slave.start(); 00254 00255 AccountKey account = strategy; 00256 00257 slave.addSpendAccountSync(account); 00258 00259 int done = 0; 00260 for (; !finished; ++done) { 00261 string item = "item"; 00262 00263 // Authorize 10 00264 if (!slave.authorizeBid(account, item, MicroUSD(1))) { 00265 ML::sleep(0.01); 00266 continue; 00267 } 00268 00269 // In half of the cases, we cancel. In the other half, we 00270 // transfer it off to the commit thread 00271 00272 if (done % 2 == 0) { 00273 // Commit 1 00274 slave.commitBid(account, item, MicroUSD(1), LineItems()); 00275 ML::atomic_inc(numBidsCommitted); 00276 } 00277 else { 00278 Amount amount = slave.detachBid(account, item); 00279 toCommitThread.push(amount); 00280 } 00281 } 00282 00283 cerr << "finished slave account with " 00284 << done << " bids" << endl; 00285 00286 slave.syncAllSync(); 00287 }; 00288 00289 auto runCommitThread = [&] (int threadNum) 00290 { 00291 SlaveBanker slave(proxies->zmqContext); 00292 slave.init(proxies->config, "commit" + to_string(threadNum)); 00293 slave.start(); 00294 00295 AccountKey account = strategy; 00296 slave.addSpendAccountSync(account); 00297 00298 while (!finished || toCommitThread.couldPop()) { 00299 Amount amount; 00300 if (toCommitThread.tryPop(amount, 0.1)) { 00301 00302 try { 00303 slave.attachBid(account, "item", amount); 00304 slave.commitBid(account, "item", MicroUSD(1), LineItems()); 00305 //slave.commitDetachedBid(account, amount, MicroUSD(1), LineItems()); 00306 } catch (...) { 00307 cerr << "commit detached " << amount << " from " 00308 << slave.getAccount(account) << endl; 00309 throw; 00310 } 00311 ML::atomic_inc(numBidsCommitted); 00312 } 00313 //slave.syncTo(master); 00314 } 00315 00316 //slave.syncTo(master); 00317 cerr << "done commit thread" << endl; 00318 00319 slave.syncAllSync(); 00320 }; 00321 00322 boost::thread_group budgetThreads; 00323 00324 for (unsigned i = 0; i < nAddBudgetThreads; ++i) 00325 budgetThreads.create_thread(runAddBudgetThread); 00326 00327 boost::thread_group bidThreads; 00328 for (unsigned i = 0; i < nBidThreads; ++i) 00329 bidThreads.create_thread(std::bind(runBidThread, i)); 00330 00331 for (unsigned i = 0; i < nTopupThreads; ++i) 00332 bidThreads.create_thread(runTopupThread); 00333 00334 for (unsigned i = 0; i < nCommitThreads; ++i) 00335 bidThreads.create_thread(std::bind(runCommitThread, i)); 00336 00337 00338 budgetThreads.join_all(); 00339 00340 finished = true; 00341 00342 bidThreads.join_all(); 00343 00344 uint32_t amountAdded = nAddBudgetThreads * numAddBudgetsPerThread; 00345 uint32_t amountTransferred = nTopupThreads * numTransfersPerThread; 00346 00347 cerr << "numBidsCommitted = " << numBidsCommitted << endl; 00348 cerr << "amountTransferred = " << amountTransferred << endl; 00349 cerr << "amountAdded = " << amountAdded << endl; 00350 00351 AccountSummary summary 00352 = slave.getAccountSummarySync(campaign, 3); 00353 cerr << summary << endl; 00354 00355 #if 0 00356 cerr << "campaign" << endl; 00357 cerr << master.getAccountSummary(campaign) << endl; 00358 cerr << master.getAccount(campaign) << endl; 00359 00360 cerr << "strategy" << endl; 00361 cerr << master.getAccountSummary(strategy) << endl; 00362 cerr << master.getAccount(strategy) << endl; 00363 #endif 00364 00365 #if 0 00366 RedisBanker banker("bankerTest", "b", s, redis); 00367 banker.sync(); 00368 Json::Value status = banker.getCampaignStatusJson("testCampaign", ""); 00369 00370 cerr << status << endl; 00371 00372 00373 00374 00375 BOOST_CHECK_EQUAL(status["available"]["micro-USD"].asInt(), 1000000 - amountTransferred + amountAdded); 00376 BOOST_CHECK_EQUAL(status["strategies"][0]["available"]["micro-USD"].asInt(), 00377 amountTransferred - numBidsCommitted); 00378 BOOST_CHECK_EQUAL(status["strategies"][0]["transferred"]["micro-USD"].asInt(), 00379 amountTransferred); 00380 BOOST_CHECK_EQUAL(status["strategies"][0]["spent"]["micro-USD"].asInt(), 00381 numBidsCommitted); 00382 BOOST_CHECK_EQUAL(status["spent"]["micro-USD"].asInt(), numBidsCommitted); 00383 00384 //BOOST_CHECK_EQUAL(status["available"]. 00385 #endif 00386 } 00387 #endif