RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/router/testing/router_banker_test.cc
00001 /* router_banker_test.cc
00002    Sunil Rottoo, 4th April 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004    
00005    Test for the banker class when used in the router.
00006  */
00007 
00008 #define BOOST_TEST_MAIN
00009 #define BOOST_TEST_DYN_LINK
00010 
00011 #include <boost/test/unit_test.hpp>
00012 #include "jml/arch/format.h"
00013 #include "jml/arch/exception_handler.h"
00014 #include "jml/utils/guard.h"
00015 #include "rtbkit/plugins/bidding_agent/bidding_agent.h"
00016 #include "rtbkit/core/banker/master_banker.h"
00017 #include "rtbkit/core/banker/slave_banker.h"
00018 #include "rtbkit/common/auction.h"
00019 #include "rtbkit/core/router/router.h"
00020 #include "jml/utils/environment.h"
00021 #include "jml/arch/timers.h"
00022 #include <future>
00023 
00024 using namespace std;
00025 using namespace ML;
00026 using namespace Datacratic;
00027 using namespace RTBKIT;
00028 
00029 Env_Option<string> tmpDir("TMP", "./tmp");
00030 int numBidRequests;
00031 int numErrors;
00032 int numGotConfig;
00033 int numWins;
00034 int numLosses;
00035 int numNoBudgets;
00036 int numTooLates;
00037 int numAuctions;
00038 
00039 class RouterTester;
00040 
00041 struct TestAgent: public BiddingAgent {
00042     TestAgent(RouterTester &routerTester);
00043 
00044     // we only want to send requests only the router is configured
00045     future<bool> getConfigFuture()
00046     {
00047         return configPromise_.get_future();
00048     }
00049 
00050     future<bool> getCompleteFuture()
00051     {
00052         return completePromise_.get_future();
00053     }
00054 
00055     void setDefaultConfig()
00056     {
00057         AgentConfig config;
00058         config.campaign = "TestCampaign";
00059         config.strategy = "strategy1";
00060         config.maxInFlight = 20000;
00061         config.creatives.push_back(Creative::sampleLB);
00062         config.creatives.push_back(Creative::sampleWS);
00063         config.creatives.push_back(Creative::sampleBB);
00064         this->config = config;
00065     }
00066 
00067     void defaultError(double timestamp, const std::string & error,
00068             const std::vector<std::string> & message)
00069     {
00070          cerr << "agent got error: " << error << " from message: "
00071               << message << endl;
00072     }
00073 
00074     void defaultNeedConfig(double)
00075     {
00076         cerr << "need config" << endl;
00077     }
00078 
00079     void defaultGotConfig(double)
00080     {
00081         configPromise_.set_value(true);
00082         haveGotConfig = true;
00083     }
00084 
00085     void defaultAckHeartbeat(double)
00086     {
00087         cerr << "ack heartbeat" << endl;
00088         ++numHeartbeats;
00089     }
00090 
00091     void finishBid(const BiddingAgent::BidResultArgs & args);
00092 
00093     void step1Check();
00094     void step2Check();
00095     void step3Check();
00096 
00097     void defaultWin(const BiddingAgent::BidResultArgs & args)
00098     {
00099         Guard guard(lock);
00100         numWins++;
00101         //cerr << "Got a default win for bid " << args.auctionId << " num wins " << numWins <<endl;
00102         finishBid(args);
00103     }
00104 
00105     void injectWins();
00106 
00107     void defaultLoss(const BiddingAgent::BidResultArgs & args)
00108     {
00109         cerr << "default loss" << endl;
00110     }
00111 
00112     void defaultNoBudget(const BiddingAgent::BidResultArgs & args);
00113     void defaultBid(double timestamp, const Id & id,
00114             std::shared_ptr<BidRequest> br, const Json::Value & imp,
00115             double timeLeftMs);
00116 
00117     void setupCallbacks()
00118     {
00119         onError
00120             = boost::bind(&TestAgent::defaultError, this, _1, _2, _3);
00121         onNeedConfig
00122             = boost::bind(&TestAgent::defaultNeedConfig, this, _1);
00123         onGotConfig
00124             = boost::bind(&TestAgent::defaultGotConfig, this, _1);
00125         onAckHeartbeat
00126             = boost::bind(&TestAgent::defaultAckHeartbeat, this, _1);
00127          onWin
00128             = boost::bind(&TestAgent::defaultWin, this, _1);
00129         onLoss
00130             = boost::bind(&TestAgent::defaultLoss, this, _1);
00131         onNoBudget
00132             = boost::bind(&TestAgent::defaultNoBudget, this, _1);
00133         onBidRequest
00134              = boost::bind(&TestAgent::defaultBid, this, _1, _2, _3, _4, _5);
00135     }
00136 
00137     void configure()
00138     {
00139         doConfig(config.toJson());
00140     }
00141 
00142     void doBid(const Id & id, const Json::Value & response,
00143             const Json::Value & metadata)
00144     {
00145         if (response.size() != 0)
00146         {
00147             recordBid(id);
00148         }
00149         BiddingAgent::doBid(id, response, metadata);
00150     }
00151 
00152     void recordBid(const Id & id)
00153     {
00154         Guard guard(lock);
00155         if (!awaitingStatus.insert(id).second)
00156             throw ML::Exception("auction already in progress");
00157 
00158         numBidsOutstanding = awaitingStatus.size();
00159     }
00160 
00161     AgentConfig config;
00162     promise<bool> configPromise_;
00163     promise<bool> completePromise_;
00164     RouterTester &tester_;bool haveGotConfig;
00165     int numHeartbeats;
00166     int numBidRequests;
00167     int numErrors;
00168     int numGotConfig;
00169     int numWins;
00170     int numLosses;
00171     int numNoBudgets;
00172     int numTooLates;
00173     int numAuctions;
00174     int expectedWins_, expectedNoBudgets_;
00175     int totalRequests_;
00176     uint64_t bidPrice_;
00177     uint64_t winPrice_;
00178 
00179     unsigned int step_; // What is the step of the test plan
00180     typedef ML::Spinlock Lock;
00181     typedef boost::lock_guard<Lock> Guard;
00182     mutable Lock lock;
00183 
00184     std::set<Id> awaitingStatus;
00185     int numBidsOutstanding;
00186 
00187 };
00188 struct RouterTester {
00189 
00190     RouterTester(const std::string & agentUri, const std::string & agentName)
00191         : agentUri(agentUri), agentName(agentName),
00192           router(std::make_shared<ServiceProxies>(), "router"),
00193           agent(*this),campaign_("TestCampaign"),
00194           strategy_("strategy1"),numAuctionsDone_(0)
00195     {
00196         router.bindAgents(agentUri);
00197     }
00198 
00199     ~RouterTester()
00200     {
00201         shutdown();
00202     }
00203 
00204     void start()
00205     {
00206         agent.start(agentUri, agentName);
00207         router.start();
00208         agent.configure();
00209         future<bool> configFuture = agent.getConfigFuture();
00210         // Wait till the agent has registered - otherwise injection of bids will fail
00211         // router.enterSimulationMode();
00212 
00213         configFuture.get();
00214         // Set the budget for the test campaign of $10
00215         setBudget(campaign_, 10000000);
00216         // transfer $1 to strategy1
00217         // Since this is all asynchronous we want to make sure
00218         bool budgetSet = false;
00219         Json::Value result;
00220         unsigned int numTries = 0;
00221         while (!budgetSet && numTries < 10)
00222         {
00223             result = router.topupTransferSync({campaign_, strategy_}, 1000000);
00224             Json::Value strat = result["strategy"];
00225             if (strat["available"]["micro-USD"] == 1000000)
00226             {
00227                 cerr << "Amount has been transferred to strategy" << endl;
00228                 budgetSet = true;
00229                 break;
00230             }
00231             ML::sleep(0.1);
00232             numTries++;
00233         }
00234         if (!budgetSet)
00235             throw ML::Exception(
00236                     "Failed to transfer to strategy after 10 tries");
00237     }
00238 
00239     Json::Value setBudget(const std::string &campaign, uint64_t amountInMicros,
00240             unsigned int totalTries = 10)
00241     {
00242         bool budgetSet = false;
00243         Json::Value result;
00244         unsigned int numTries = 0;
00245         while (!budgetSet && numTries < totalTries)
00246         {
00247             // Since this is all being done asynchronous
00248             result = router.setBudget(campaign, amountInMicros);
00249             //cerr << "result = " << result << endl;
00250             if (result["available"]["micro-USD"] == 10000000)
00251             {
00252                 cerr << "budget is set to correct value " << endl;
00253                 budgetSet = true;
00254                 break;
00255             }
00256             ML::sleep(0.1);
00257             numTries++;
00258         }
00259         if (!budgetSet)
00260         {
00261             throw ML::Exception(
00262                     "Failed to set budget after "
00263                             + boost::lexical_cast<string>(totalTries)
00264                             + " tries");
00265         }
00266         return result;
00267     }
00268 
00269     void sleepUntilIdle()
00270     {
00271         router.sleepUntilIdle();
00272     }
00273 
00274     void shutdown()
00275     {
00276         router.shutdown();
00277         //sleepUntilIdle();
00278         agent.shutdown();
00279     }
00280 
00281     std::shared_ptr<Auction> createAuction(unsigned int id)
00282     {
00283         auto handleAuction = [&] (std::shared_ptr<Auction> auction)
00284         {
00285             ML::atomic_inc(numAuctionsDone_);
00286         };
00287 
00288         Date start = Date::now();
00289         Date expiry = start.plusSeconds(0.05);
00290         std::shared_ptr<BidRequest> request(new BidRequest());
00291         request->auctionId = Id(id);
00292         AdSpot spot1(request->auctionId);
00293         spot1.formats.push_back(Format(300, 250));
00294         request->imp.push_back(spot1);
00295         string current = request->toJsonStr();
00296         std::shared_ptr<Auction> auction(
00297                 new Auction(handleAuction, request, current, start, expiry,
00298                         id));
00299         return auction;
00300     }
00301 
00302     // This method will submit 11 auctions for which we expect 10 to pass and 1 to fail with
00303     // no budget.
00304     void runStep(unsigned int step, unsigned expectedWins,
00305             unsigned expectedNoBudgets, uint64_t bidPrice, uint64_t winPrice)
00306     {
00307         cerr << "Running step " << step << endl;
00308         agent.bidPrice_ = bidPrice;
00309         agent.winPrice_ = winPrice;
00310         agent.expectedWins_ += expectedWins;
00311         agent.expectedNoBudgets_ += expectedNoBudgets;
00312         agent.totalRequests_ += expectedWins + expectedNoBudgets ;
00313         cerr << "Total Requests: " << agent.totalRequests_ << endl;
00314         agent.step_ = step ;
00315         unsigned auctionStart = numAuctionsDone_;
00316         unsigned auctionEnd = auctionStart + expectedWins + expectedNoBudgets;
00317         cerr << "AuctionStart: " << auctionStart << " auctionEnd:" << auctionEnd
00318                 << endl;
00319         //-----------------------------------------------------------------------------------
00320         // we run this part in a separate thread because this function is being
00321         // called as part of a callback and we do not want to block the sender which we will
00322         // because of the numerous calls to sleep.
00323         // Please note that we capture by copy since this is run as a detached thread and
00324         // references to auctionStart and auctionEnd would cause problems
00325         //------------------------------------------------------------------------------------
00326         std::thread([this,auctionStart,auctionEnd]()
00327         {
00328             //cerr << "inject auction from thread " << std::this_thread::get_id() << endl;
00329                 cerr << "AuctionStart: " << auctionStart << " auctionEnd:" << auctionEnd << endl;
00330                 for (unsigned i = auctionStart; i < auctionEnd; ++i)
00331                 {
00332                     ML::sleep(0.1);
00333                     Date start = Date::now();
00334                     this->router.injectAuction(this->createAuction(i+1),start.secondsSinceEpoch() + 2.0);
00335                 }
00336             }).detach();
00337     }
00338 
00339     string agentUri;
00340     string agentName;
00341     Router router;
00342     TestAgent agent;
00343     string campaign_, strategy_;
00344     uint64_t numAuctionsDone_;
00345 };
00346 
00347 TestAgent::TestAgent(RouterTester &routerTester)
00348     :  BiddingAgent(routerTester.router.getZmqContext()),tester_(routerTester), numBidRequests(0), numErrors(0), numGotConfig(0),numWins(0),
00349       numLosses(0), numNoBudgets(0), numTooLates(0), numAuctions(0),expectedWins_(0),
00350       expectedNoBudgets_(0),totalRequests_(0)
00351 {
00352     setDefaultConfig();
00353     setupCallbacks();
00354 }
00355 
00356 void TestAgent::step1Check()
00357 {
00358     // Now check that value that the banker should have
00359     const Campaigns &theCampaigns
00360         = dynamic_cast<RedisBanker &>(*tester_.router.getBanker())
00361         .getCampaigns();
00362     Campaigns::const_iterator cIt = theCampaigns.find("TestCampaign");
00363     BOOST_CHECK(cIt != theCampaigns.end());
00364     // Check that we have $9 available
00365     BOOST_CHECK_EQUAL(cIt->second.available_, USD(9));
00366     // Get the first strategy
00367     Strategies::const_iterator sIt1 = cIt->second.strategies_.find("strategy1");
00368     BOOST_CHECK(sIt1 != cIt->second.strategies_.end());
00369     BOOST_CHECK_EQUAL(sIt1->second.committed_, MicroUSD(0));
00370     BOOST_CHECK_EQUAL(sIt1->second.available_, MicroUSD(500000));
00371     BOOST_CHECK_EQUAL(sIt1->second.spent_, MicroUSD(500000));
00372 }
00373 
00374 void TestAgent::step2Check()
00375 {
00376     // Now check that value that the banker should have
00377     const Campaigns &theCampaigns
00378         = dynamic_cast<RedisBanker &>(*tester_.router.getBanker())
00379         .getCampaigns();
00380     Campaigns::const_iterator cIt = theCampaigns.find("TestCampaign");
00381     BOOST_CHECK(cIt != theCampaigns.end());
00382     // Check that we have $9 available
00383     BOOST_CHECK_EQUAL(cIt->second.available_, USD(9));
00384     // Get the first strategy
00385     Strategies::const_iterator sIt1 = cIt->second.strategies_.find("strategy1");
00386     BOOST_CHECK(sIt1 != cIt->second.strategies_.end());
00387     BOOST_CHECK_EQUAL(sIt1->second.committed_, MicroUSD(0));
00388     BOOST_CHECK_EQUAL(sIt1->second.available_, MicroUSD(250000));
00389     BOOST_CHECK_EQUAL(sIt1->second.spent_,     MicroUSD(750000));
00390 }
00391 
00392 void TestAgent::defaultNoBudget(const BiddingAgent::BidResultArgs & args)
00393 {
00394     Guard guard(lock);
00395     numNoBudgets++;
00396     // Those that have no budgets we do not want to inject wins for
00397     awaitingStatus.erase(args.auctionId);
00398     if (step_ == 1)
00399     {
00400         injectWins();
00401     } else if (step_ == 2 && numNoBudgets == 6)
00402     {
00403         //cerr << "(step 2):we will inject wins for :" << endl;
00404         //copy(awaitingStatus.begin(), awaitingStatus.end(),ostream_iterator<Id>(cerr,","));
00405         //cerr << endl;
00406         injectWins();
00407     } else if (step_ == 3 && numNoBudgets == 7)
00408     {
00409         //cerr << "(step 2):we will inject wins for :" << endl;
00410         //copy(awaitingStatus.begin(), awaitingStatus.end(),ostream_iterator<Id>(cerr,","));
00411         //cerr << endl;
00412         injectWins();
00413     }
00414 }
00415 
00416 void TestAgent::injectWins()
00417 {
00418     for (auto it = awaitingStatus.begin(); it != awaitingStatus.end(); ++it)
00419     {
00420         UserIds userIds;
00421         Date now = Date::now();
00422         ML::sleep(0.2);
00423         //cerr << " We have received all bid requests. We can now schedule wins for all of them " << endl;
00424         //cerr << "Injecting win " << *it << " with spot it " << *it << endl;
00425         tester_.router.injectWin(Id(*it), Id(*it), MicroUSD(winPrice_),
00426                                  now.plusSeconds(0.05),
00427                                  Json::Value(), userIds,
00428                                  tester_.campaign_, tester_.strategy_,
00429                                  now.plusSeconds(0.15));
00430 
00431     }
00432     awaitingStatus.clear();
00433 }
00434 
00435 void TestAgent::step3Check()
00436 {
00437     // Now check that value that the banker should have
00438     const Campaigns &theCampaigns
00439         = dynamic_cast<RedisBanker &>(*tester_.router.getBanker())
00440         .getCampaigns();
00441     Campaigns::const_iterator cIt = theCampaigns.find("TestCampaign");
00442     BOOST_CHECK(cIt != theCampaigns.end());
00443     // Check that we have $9 available
00444     BOOST_CHECK_EQUAL(cIt->second.available_, USD(9));
00445     // Get the first strategy
00446     Strategies::const_iterator sIt1 = cIt->second.strategies_.find("strategy1");
00447     BOOST_CHECK(sIt1 != cIt->second.strategies_.end());
00448     BOOST_CHECK_EQUAL(sIt1->second.committed_, USD(0));
00449     BOOST_CHECK_EQUAL(sIt1->second.available_, USD(0));
00450     BOOST_CHECK_EQUAL(sIt1->second.spent_, MicroUSD(1000000));
00451 }
00452 
00453 void TestAgent::defaultBid(double timestamp, const Id & id,
00454         std::shared_ptr<BidRequest> br, const Json::Value & imp,
00455         double timeLeftMs)
00456 {
00457     Json::Value response;
00458     //cerr << "imp = " << imp << endl;
00459     response[0u]["creative"] = imp[0u]["creatives"][0u];
00460     response[0u]["price"] = bidPrice_; //100000000;
00461     response[0u]["surplus"] = 1;
00462 
00463     Json::Value metadata;
00464     doBid(id, response, metadata);
00465     ML::atomic_inc(numBidRequests);
00466 }
00467 
00468 void TestAgent::finishBid(const BiddingAgent::BidResultArgs & args)
00469 {
00470     // cerr << "finishBid for bid " << args.auctionId << endl;
00471     // If we have accounted for all requests we can signal
00472     if (numWins == expectedWins_ && numNoBudgets == expectedNoBudgets_)
00473     {
00474         if (step_ == 1)
00475         {
00476             step1Check();
00477             // Now run another series trying to run down the budget. We expect 5 wins and 5 no budgets
00478             tester_.runStep(2, 5, 5, bidPrice_, winPrice_);
00479         } else if (step_ == 2)
00480         {
00481             cerr << "Step 2 was finished " << endl;
00482             step2Check();
00483             // At the start of step 3 we have 250 000 micro $available we want to run this down
00484             // by submitting 100 bid requests with a bid price of 2500 micro$ and win price of 2500
00485             // Please note that we submit an additional request to test that we cannot increase the budget
00486             bidPrice_ = 2500000;
00487             winPrice_ = 2500;
00488             tester_.runStep(3, 100, 1, bidPrice_, winPrice_);
00489         } else
00490         {
00491             cerr << "Step 3 was finished " << endl;
00492             step3Check();
00493             // Now try to set the budget to something that is less than the amount spent
00494             // we expect this to fail after synchronously since this checked in the
00495             // calling thread. Therefore we should only need to do this once
00496             BOOST_CHECK_THROW(tester_.setBudget(tester_.campaign_, 900000, 1),
00497                     ML::Exception);
00498             completePromise_.set_value(true);
00499         }
00500     }
00501 }
00502 
00503 BOOST_AUTO_TEST_CASE( test_banker_via_router )
00504 {
00505     string campaignPrefix = "bankerTest";
00506 
00512     ML::set_default_trace_exceptions(false);
00513     std::map<Id, uint64_t> winNotifications;
00514     ML::Spinlock lock;
00515 
00516     RouterTester tester("inproc://rrat", "test");
00517     TestAgent & agent = tester.agent;
00518 
00519     std::shared_ptr<SlaveBanker> banker(new SlaveBanker(campaignPrefix, "banker", std::make_shared<ServiceProxies>(), redis));
00520     std::shared_ptr<SlaveBudgetController> budgetController
00521         (new SlaveBudgetController(campaignPrefix, redis));
00522     tester.router.setBanker(banker);
00523     tester.router.setBudgetController(budgetController);
00524 
00525     tester.start();
00526     unsigned expectedWins = 10;
00527     unsigned expectedNoBudgets = 1;
00528     uint64_t bidPrice = 100000000; // cpm in micro$ - this will be divided by 1000 by the router
00529     uint64_t winPrice = 50000; // micro $ as is
00530     tester.runStep(1, expectedWins, expectedNoBudgets, bidPrice, winPrice);
00531     // Now wait for all requests to arrive before exiting
00532     future<bool> completeFuture = agent.getCompleteFuture();
00533     completeFuture.get();
00534     cerr
00535             << "All pending requests have been processed ....shutting down(banker_test)"
00536             << endl;
00537     tester.shutdown();
00538 }
00539 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator