![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* router_banker_test.cc 00002 Sunil Rottoo, 4th April 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Test for the banker class when used in the router. 00006 */ 00007 00008 #define BOOST_TEST_MAIN 00009 #define BOOST_TEST_DYN_LINK 00010 00011 #include <boost/test/unit_test.hpp> 00012 #include "jml/arch/format.h" 00013 #include "jml/arch/exception_handler.h" 00014 #include "jml/utils/guard.h" 00015 #include "rtbkit/plugins/bidding_agent/bidding_agent.h" 00016 #include "rtbkit/core/banker/master_banker.h" 00017 #include "rtbkit/core/banker/slave_banker.h" 00018 #include "rtbkit/common/auction.h" 00019 #include "rtbkit/core/router/router.h" 00020 #include "jml/utils/environment.h" 00021 #include "jml/arch/timers.h" 00022 #include <future> 00023 00024 using namespace std; 00025 using namespace ML; 00026 using namespace Datacratic; 00027 using namespace RTBKIT; 00028 00029 Env_Option<string> tmpDir("TMP", "./tmp"); 00030 int numBidRequests; 00031 int numErrors; 00032 int numGotConfig; 00033 int numWins; 00034 int numLosses; 00035 int numNoBudgets; 00036 int numTooLates; 00037 int numAuctions; 00038 00039 class RouterTester; 00040 00041 struct TestAgent: public BiddingAgent { 00042 TestAgent(RouterTester &routerTester); 00043 00044 // we only want to send requests only the router is configured 00045 future<bool> getConfigFuture() 00046 { 00047 return configPromise_.get_future(); 00048 } 00049 00050 future<bool> getCompleteFuture() 00051 { 00052 return completePromise_.get_future(); 00053 } 00054 00055 void setDefaultConfig() 00056 { 00057 AgentConfig config; 00058 config.campaign = "TestCampaign"; 00059 config.strategy = "strategy1"; 00060 config.maxInFlight = 20000; 00061 config.creatives.push_back(Creative::sampleLB); 00062 config.creatives.push_back(Creative::sampleWS); 00063 config.creatives.push_back(Creative::sampleBB); 00064 this->config = config; 00065 } 00066 00067 void defaultError(double timestamp, const std::string & error, 00068 const std::vector<std::string> & message) 00069 { 00070 cerr << "agent got error: " << error << " from message: " 00071 << message << endl; 00072 } 00073 00074 void defaultNeedConfig(double) 00075 { 00076 cerr << "need config" << endl; 00077 } 00078 00079 void defaultGotConfig(double) 00080 { 00081 configPromise_.set_value(true); 00082 haveGotConfig = true; 00083 } 00084 00085 void defaultAckHeartbeat(double) 00086 { 00087 cerr << "ack heartbeat" << endl; 00088 ++numHeartbeats; 00089 } 00090 00091 void finishBid(const BiddingAgent::BidResultArgs & args); 00092 00093 void step1Check(); 00094 void step2Check(); 00095 void step3Check(); 00096 00097 void defaultWin(const BiddingAgent::BidResultArgs & args) 00098 { 00099 Guard guard(lock); 00100 numWins++; 00101 //cerr << "Got a default win for bid " << args.auctionId << " num wins " << numWins <<endl; 00102 finishBid(args); 00103 } 00104 00105 void injectWins(); 00106 00107 void defaultLoss(const BiddingAgent::BidResultArgs & args) 00108 { 00109 cerr << "default loss" << endl; 00110 } 00111 00112 void defaultNoBudget(const BiddingAgent::BidResultArgs & args); 00113 void defaultBid(double timestamp, const Id & id, 00114 std::shared_ptr<BidRequest> br, const Json::Value & imp, 00115 double timeLeftMs); 00116 00117 void setupCallbacks() 00118 { 00119 onError 00120 = boost::bind(&TestAgent::defaultError, this, _1, _2, _3); 00121 onNeedConfig 00122 = boost::bind(&TestAgent::defaultNeedConfig, this, _1); 00123 onGotConfig 00124 = boost::bind(&TestAgent::defaultGotConfig, this, _1); 00125 onAckHeartbeat 00126 = boost::bind(&TestAgent::defaultAckHeartbeat, this, _1); 00127 onWin 00128 = boost::bind(&TestAgent::defaultWin, this, _1); 00129 onLoss 00130 = boost::bind(&TestAgent::defaultLoss, this, _1); 00131 onNoBudget 00132 = boost::bind(&TestAgent::defaultNoBudget, this, _1); 00133 onBidRequest 00134 = boost::bind(&TestAgent::defaultBid, this, _1, _2, _3, _4, _5); 00135 } 00136 00137 void configure() 00138 { 00139 doConfig(config.toJson()); 00140 } 00141 00142 void doBid(const Id & id, const Json::Value & response, 00143 const Json::Value & metadata) 00144 { 00145 if (response.size() != 0) 00146 { 00147 recordBid(id); 00148 } 00149 BiddingAgent::doBid(id, response, metadata); 00150 } 00151 00152 void recordBid(const Id & id) 00153 { 00154 Guard guard(lock); 00155 if (!awaitingStatus.insert(id).second) 00156 throw ML::Exception("auction already in progress"); 00157 00158 numBidsOutstanding = awaitingStatus.size(); 00159 } 00160 00161 AgentConfig config; 00162 promise<bool> configPromise_; 00163 promise<bool> completePromise_; 00164 RouterTester &tester_;bool haveGotConfig; 00165 int numHeartbeats; 00166 int numBidRequests; 00167 int numErrors; 00168 int numGotConfig; 00169 int numWins; 00170 int numLosses; 00171 int numNoBudgets; 00172 int numTooLates; 00173 int numAuctions; 00174 int expectedWins_, expectedNoBudgets_; 00175 int totalRequests_; 00176 uint64_t bidPrice_; 00177 uint64_t winPrice_; 00178 00179 unsigned int step_; // What is the step of the test plan 00180 typedef ML::Spinlock Lock; 00181 typedef boost::lock_guard<Lock> Guard; 00182 mutable Lock lock; 00183 00184 std::set<Id> awaitingStatus; 00185 int numBidsOutstanding; 00186 00187 }; 00188 struct RouterTester { 00189 00190 RouterTester(const std::string & agentUri, const std::string & agentName) 00191 : agentUri(agentUri), agentName(agentName), 00192 router(std::make_shared<ServiceProxies>(), "router"), 00193 agent(*this),campaign_("TestCampaign"), 00194 strategy_("strategy1"),numAuctionsDone_(0) 00195 { 00196 router.bindAgents(agentUri); 00197 } 00198 00199 ~RouterTester() 00200 { 00201 shutdown(); 00202 } 00203 00204 void start() 00205 { 00206 agent.start(agentUri, agentName); 00207 router.start(); 00208 agent.configure(); 00209 future<bool> configFuture = agent.getConfigFuture(); 00210 // Wait till the agent has registered - otherwise injection of bids will fail 00211 // router.enterSimulationMode(); 00212 00213 configFuture.get(); 00214 // Set the budget for the test campaign of $10 00215 setBudget(campaign_, 10000000); 00216 // transfer $1 to strategy1 00217 // Since this is all asynchronous we want to make sure 00218 bool budgetSet = false; 00219 Json::Value result; 00220 unsigned int numTries = 0; 00221 while (!budgetSet && numTries < 10) 00222 { 00223 result = router.topupTransferSync({campaign_, strategy_}, 1000000); 00224 Json::Value strat = result["strategy"]; 00225 if (strat["available"]["micro-USD"] == 1000000) 00226 { 00227 cerr << "Amount has been transferred to strategy" << endl; 00228 budgetSet = true; 00229 break; 00230 } 00231 ML::sleep(0.1); 00232 numTries++; 00233 } 00234 if (!budgetSet) 00235 throw ML::Exception( 00236 "Failed to transfer to strategy after 10 tries"); 00237 } 00238 00239 Json::Value setBudget(const std::string &campaign, uint64_t amountInMicros, 00240 unsigned int totalTries = 10) 00241 { 00242 bool budgetSet = false; 00243 Json::Value result; 00244 unsigned int numTries = 0; 00245 while (!budgetSet && numTries < totalTries) 00246 { 00247 // Since this is all being done asynchronous 00248 result = router.setBudget(campaign, amountInMicros); 00249 //cerr << "result = " << result << endl; 00250 if (result["available"]["micro-USD"] == 10000000) 00251 { 00252 cerr << "budget is set to correct value " << endl; 00253 budgetSet = true; 00254 break; 00255 } 00256 ML::sleep(0.1); 00257 numTries++; 00258 } 00259 if (!budgetSet) 00260 { 00261 throw ML::Exception( 00262 "Failed to set budget after " 00263 + boost::lexical_cast<string>(totalTries) 00264 + " tries"); 00265 } 00266 return result; 00267 } 00268 00269 void sleepUntilIdle() 00270 { 00271 router.sleepUntilIdle(); 00272 } 00273 00274 void shutdown() 00275 { 00276 router.shutdown(); 00277 //sleepUntilIdle(); 00278 agent.shutdown(); 00279 } 00280 00281 std::shared_ptr<Auction> createAuction(unsigned int id) 00282 { 00283 auto handleAuction = [&] (std::shared_ptr<Auction> auction) 00284 { 00285 ML::atomic_inc(numAuctionsDone_); 00286 }; 00287 00288 Date start = Date::now(); 00289 Date expiry = start.plusSeconds(0.05); 00290 std::shared_ptr<BidRequest> request(new BidRequest()); 00291 request->auctionId = Id(id); 00292 AdSpot spot1(request->auctionId); 00293 spot1.formats.push_back(Format(300, 250)); 00294 request->imp.push_back(spot1); 00295 string current = request->toJsonStr(); 00296 std::shared_ptr<Auction> auction( 00297 new Auction(handleAuction, request, current, start, expiry, 00298 id)); 00299 return auction; 00300 } 00301 00302 // This method will submit 11 auctions for which we expect 10 to pass and 1 to fail with 00303 // no budget. 00304 void runStep(unsigned int step, unsigned expectedWins, 00305 unsigned expectedNoBudgets, uint64_t bidPrice, uint64_t winPrice) 00306 { 00307 cerr << "Running step " << step << endl; 00308 agent.bidPrice_ = bidPrice; 00309 agent.winPrice_ = winPrice; 00310 agent.expectedWins_ += expectedWins; 00311 agent.expectedNoBudgets_ += expectedNoBudgets; 00312 agent.totalRequests_ += expectedWins + expectedNoBudgets ; 00313 cerr << "Total Requests: " << agent.totalRequests_ << endl; 00314 agent.step_ = step ; 00315 unsigned auctionStart = numAuctionsDone_; 00316 unsigned auctionEnd = auctionStart + expectedWins + expectedNoBudgets; 00317 cerr << "AuctionStart: " << auctionStart << " auctionEnd:" << auctionEnd 00318 << endl; 00319 //----------------------------------------------------------------------------------- 00320 // we run this part in a separate thread because this function is being 00321 // called as part of a callback and we do not want to block the sender which we will 00322 // because of the numerous calls to sleep. 00323 // Please note that we capture by copy since this is run as a detached thread and 00324 // references to auctionStart and auctionEnd would cause problems 00325 //------------------------------------------------------------------------------------ 00326 std::thread([this,auctionStart,auctionEnd]() 00327 { 00328 //cerr << "inject auction from thread " << std::this_thread::get_id() << endl; 00329 cerr << "AuctionStart: " << auctionStart << " auctionEnd:" << auctionEnd << endl; 00330 for (unsigned i = auctionStart; i < auctionEnd; ++i) 00331 { 00332 ML::sleep(0.1); 00333 Date start = Date::now(); 00334 this->router.injectAuction(this->createAuction(i+1),start.secondsSinceEpoch() + 2.0); 00335 } 00336 }).detach(); 00337 } 00338 00339 string agentUri; 00340 string agentName; 00341 Router router; 00342 TestAgent agent; 00343 string campaign_, strategy_; 00344 uint64_t numAuctionsDone_; 00345 }; 00346 00347 TestAgent::TestAgent(RouterTester &routerTester) 00348 : BiddingAgent(routerTester.router.getZmqContext()),tester_(routerTester), numBidRequests(0), numErrors(0), numGotConfig(0),numWins(0), 00349 numLosses(0), numNoBudgets(0), numTooLates(0), numAuctions(0),expectedWins_(0), 00350 expectedNoBudgets_(0),totalRequests_(0) 00351 { 00352 setDefaultConfig(); 00353 setupCallbacks(); 00354 } 00355 00356 void TestAgent::step1Check() 00357 { 00358 // Now check that value that the banker should have 00359 const Campaigns &theCampaigns 00360 = dynamic_cast<RedisBanker &>(*tester_.router.getBanker()) 00361 .getCampaigns(); 00362 Campaigns::const_iterator cIt = theCampaigns.find("TestCampaign"); 00363 BOOST_CHECK(cIt != theCampaigns.end()); 00364 // Check that we have $9 available 00365 BOOST_CHECK_EQUAL(cIt->second.available_, USD(9)); 00366 // Get the first strategy 00367 Strategies::const_iterator sIt1 = cIt->second.strategies_.find("strategy1"); 00368 BOOST_CHECK(sIt1 != cIt->second.strategies_.end()); 00369 BOOST_CHECK_EQUAL(sIt1->second.committed_, MicroUSD(0)); 00370 BOOST_CHECK_EQUAL(sIt1->second.available_, MicroUSD(500000)); 00371 BOOST_CHECK_EQUAL(sIt1->second.spent_, MicroUSD(500000)); 00372 } 00373 00374 void TestAgent::step2Check() 00375 { 00376 // Now check that value that the banker should have 00377 const Campaigns &theCampaigns 00378 = dynamic_cast<RedisBanker &>(*tester_.router.getBanker()) 00379 .getCampaigns(); 00380 Campaigns::const_iterator cIt = theCampaigns.find("TestCampaign"); 00381 BOOST_CHECK(cIt != theCampaigns.end()); 00382 // Check that we have $9 available 00383 BOOST_CHECK_EQUAL(cIt->second.available_, USD(9)); 00384 // Get the first strategy 00385 Strategies::const_iterator sIt1 = cIt->second.strategies_.find("strategy1"); 00386 BOOST_CHECK(sIt1 != cIt->second.strategies_.end()); 00387 BOOST_CHECK_EQUAL(sIt1->second.committed_, MicroUSD(0)); 00388 BOOST_CHECK_EQUAL(sIt1->second.available_, MicroUSD(250000)); 00389 BOOST_CHECK_EQUAL(sIt1->second.spent_, MicroUSD(750000)); 00390 } 00391 00392 void TestAgent::defaultNoBudget(const BiddingAgent::BidResultArgs & args) 00393 { 00394 Guard guard(lock); 00395 numNoBudgets++; 00396 // Those that have no budgets we do not want to inject wins for 00397 awaitingStatus.erase(args.auctionId); 00398 if (step_ == 1) 00399 { 00400 injectWins(); 00401 } else if (step_ == 2 && numNoBudgets == 6) 00402 { 00403 //cerr << "(step 2):we will inject wins for :" << endl; 00404 //copy(awaitingStatus.begin(), awaitingStatus.end(),ostream_iterator<Id>(cerr,",")); 00405 //cerr << endl; 00406 injectWins(); 00407 } else if (step_ == 3 && numNoBudgets == 7) 00408 { 00409 //cerr << "(step 2):we will inject wins for :" << endl; 00410 //copy(awaitingStatus.begin(), awaitingStatus.end(),ostream_iterator<Id>(cerr,",")); 00411 //cerr << endl; 00412 injectWins(); 00413 } 00414 } 00415 00416 void TestAgent::injectWins() 00417 { 00418 for (auto it = awaitingStatus.begin(); it != awaitingStatus.end(); ++it) 00419 { 00420 UserIds userIds; 00421 Date now = Date::now(); 00422 ML::sleep(0.2); 00423 //cerr << " We have received all bid requests. We can now schedule wins for all of them " << endl; 00424 //cerr << "Injecting win " << *it << " with spot it " << *it << endl; 00425 tester_.router.injectWin(Id(*it), Id(*it), MicroUSD(winPrice_), 00426 now.plusSeconds(0.05), 00427 Json::Value(), userIds, 00428 tester_.campaign_, tester_.strategy_, 00429 now.plusSeconds(0.15)); 00430 00431 } 00432 awaitingStatus.clear(); 00433 } 00434 00435 void TestAgent::step3Check() 00436 { 00437 // Now check that value that the banker should have 00438 const Campaigns &theCampaigns 00439 = dynamic_cast<RedisBanker &>(*tester_.router.getBanker()) 00440 .getCampaigns(); 00441 Campaigns::const_iterator cIt = theCampaigns.find("TestCampaign"); 00442 BOOST_CHECK(cIt != theCampaigns.end()); 00443 // Check that we have $9 available 00444 BOOST_CHECK_EQUAL(cIt->second.available_, USD(9)); 00445 // Get the first strategy 00446 Strategies::const_iterator sIt1 = cIt->second.strategies_.find("strategy1"); 00447 BOOST_CHECK(sIt1 != cIt->second.strategies_.end()); 00448 BOOST_CHECK_EQUAL(sIt1->second.committed_, USD(0)); 00449 BOOST_CHECK_EQUAL(sIt1->second.available_, USD(0)); 00450 BOOST_CHECK_EQUAL(sIt1->second.spent_, MicroUSD(1000000)); 00451 } 00452 00453 void TestAgent::defaultBid(double timestamp, const Id & id, 00454 std::shared_ptr<BidRequest> br, const Json::Value & imp, 00455 double timeLeftMs) 00456 { 00457 Json::Value response; 00458 //cerr << "imp = " << imp << endl; 00459 response[0u]["creative"] = imp[0u]["creatives"][0u]; 00460 response[0u]["price"] = bidPrice_; //100000000; 00461 response[0u]["surplus"] = 1; 00462 00463 Json::Value metadata; 00464 doBid(id, response, metadata); 00465 ML::atomic_inc(numBidRequests); 00466 } 00467 00468 void TestAgent::finishBid(const BiddingAgent::BidResultArgs & args) 00469 { 00470 // cerr << "finishBid for bid " << args.auctionId << endl; 00471 // If we have accounted for all requests we can signal 00472 if (numWins == expectedWins_ && numNoBudgets == expectedNoBudgets_) 00473 { 00474 if (step_ == 1) 00475 { 00476 step1Check(); 00477 // Now run another series trying to run down the budget. We expect 5 wins and 5 no budgets 00478 tester_.runStep(2, 5, 5, bidPrice_, winPrice_); 00479 } else if (step_ == 2) 00480 { 00481 cerr << "Step 2 was finished " << endl; 00482 step2Check(); 00483 // At the start of step 3 we have 250 000 micro $available we want to run this down 00484 // by submitting 100 bid requests with a bid price of 2500 micro$ and win price of 2500 00485 // Please note that we submit an additional request to test that we cannot increase the budget 00486 bidPrice_ = 2500000; 00487 winPrice_ = 2500; 00488 tester_.runStep(3, 100, 1, bidPrice_, winPrice_); 00489 } else 00490 { 00491 cerr << "Step 3 was finished " << endl; 00492 step3Check(); 00493 // Now try to set the budget to something that is less than the amount spent 00494 // we expect this to fail after synchronously since this checked in the 00495 // calling thread. Therefore we should only need to do this once 00496 BOOST_CHECK_THROW(tester_.setBudget(tester_.campaign_, 900000, 1), 00497 ML::Exception); 00498 completePromise_.set_value(true); 00499 } 00500 } 00501 } 00502 00503 BOOST_AUTO_TEST_CASE( test_banker_via_router ) 00504 { 00505 string campaignPrefix = "bankerTest"; 00506 00512 ML::set_default_trace_exceptions(false); 00513 std::map<Id, uint64_t> winNotifications; 00514 ML::Spinlock lock; 00515 00516 RouterTester tester("inproc://rrat", "test"); 00517 TestAgent & agent = tester.agent; 00518 00519 std::shared_ptr<SlaveBanker> banker(new SlaveBanker(campaignPrefix, "banker", std::make_shared<ServiceProxies>(), redis)); 00520 std::shared_ptr<SlaveBudgetController> budgetController 00521 (new SlaveBudgetController(campaignPrefix, redis)); 00522 tester.router.setBanker(banker); 00523 tester.router.setBudgetController(budgetController); 00524 00525 tester.start(); 00526 unsigned expectedWins = 10; 00527 unsigned expectedNoBudgets = 1; 00528 uint64_t bidPrice = 100000000; // cpm in micro$ - this will be divided by 1000 by the router 00529 uint64_t winPrice = 50000; // micro $ as is 00530 tester.runStep(1, expectedWins, expectedNoBudgets, bidPrice, winPrice); 00531 // Now wait for all requests to arrive before exiting 00532 future<bool> completeFuture = agent.getCompleteFuture(); 00533 completeFuture.get(); 00534 cerr 00535 << "All pending requests have been processed ....shutting down(banker_test)" 00536 << endl; 00537 tester.shutdown(); 00538 } 00539