RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* router_stack.cc 00002 Jeremy Barnes, 21 November 2012 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 RTB router code. 00006 */ 00007 00008 #include "router_stack.h" 00009 #include "rtbkit/core/banker/slave_banker.h" 00010 00011 using namespace std; 00012 using namespace ML; 00013 00014 00015 namespace RTBKIT { 00016 00017 00018 /*****************************************************************************/ 00019 /* ROUTER STACK */ 00020 /*****************************************************************************/ 00021 00022 RouterStack:: 00023 RouterStack(std::shared_ptr<ServiceProxies> services, 00024 const std::string & serviceName, 00025 double secondsUntilLossAssumed) 00026 : ServiceBase(serviceName, services), 00027 router(*this, "router", secondsUntilLossAssumed, 00028 false /* connect to post auction loop */), 00029 masterBanker(services, "masterBanker"), 00030 postAuctionLoop(*this, "postAuction"), 00031 config(services, "config"), 00032 monitor(services, "monitor"), 00033 initialized(false) 00034 { 00035 } 00036 00037 void 00038 RouterStack:: 00039 init() 00040 { 00041 ExcAssert(!initialized); 00042 00043 namespace p = std::placeholders; 00044 router.onSubmittedAuction = std::bind(&RouterStack::submitAuction, this, 00045 p::_1, p::_2, p::_3); 00046 config.init(); 00047 config.bindTcp(); 00048 config.start(); 00049 00050 masterBanker.init(std::make_shared<RedisBankerPersistence>(redis)); 00051 masterBanker.bindTcp(); 00052 masterBanker.start(); 00053 00054 budgetController.init(getServices()->config); 00055 budgetController.start(); 00056 00057 auto makeSlaveBanker = [=] (const std::string & name) 00058 { 00059 auto res = make_shared<SlaveBanker>( 00060 getZmqContext(), getServices()->config, name); 00061 res->start(); 00062 return res; 00063 }; 00064 00065 00066 // getServices()->config->dump(cerr); 00067 00068 postAuctionLoop.init(); 00069 postAuctionLoop.setBanker(makeSlaveBanker("postAuction")); 00070 postAuctionLoop.bindTcp(); 00071 00072 router.init(); 00073 router.setBanker(makeSlaveBanker("router")); 00074 router.bindTcp(); 00075 00076 monitor.init({"router", "postAuction", "masterBanker"}); 00077 monitor.bindTcp(); 00078 monitor.start(); 00079 00080 initialized = true; 00081 } 00082 00083 void 00084 RouterStack:: 00085 submitAuction(const std::shared_ptr<Auction> & auction, 00086 const Id & adSpotId, 00087 const Auction::Response & response) 00088 { 00089 const std::string& agentAugmentations = auction->agentAugmentations[response.agent]; 00090 postAuctionLoop.injectSubmittedAuction(auction->id, 00091 adSpotId, 00092 auction->request, 00093 auction->requestStr, 00094 auction->requestStrFormat, 00095 agentAugmentations, 00096 response, 00097 auction->lossAssumed); 00098 } 00099 00100 void 00101 RouterStack:: 00102 start(boost::function<void ()> onStop) 00103 { 00104 if (!initialized) 00105 init(); 00106 00107 //config.start(); 00108 postAuctionLoop.start(); 00109 router.start(onStop); 00110 } 00111 00112 void 00113 RouterStack:: 00114 sleepUntilIdle() 00115 { 00116 router.sleepUntilIdle(); 00117 } 00118 00119 void 00120 RouterStack:: 00121 shutdown() 00122 { 00123 router.shutdown(); 00124 postAuctionLoop.shutdown(); 00125 budgetController.shutdown(); 00126 masterBanker.shutdown(); 00127 config.shutdown(); 00128 monitor.shutdown(); 00129 } 00130 00131 size_t 00132 RouterStack:: 00133 numNonIdle() const 00134 { 00135 size_t numInFlight, numSubmitted, numAwaitingAugmentation; 00136 { 00137 numInFlight = router.inFlight.size(); 00138 numAwaitingAugmentation = router.augmentationLoop.numAugmenting(); 00139 numSubmitted = postAuctionLoop.numAwaitingWinLoss(); 00140 } 00141 00142 cerr << "numInFlight = " << numInFlight << endl; 00143 cerr << "numSubmitted = " << numSubmitted << endl; 00144 cerr << "numAwaitingAugmentation = " << numAwaitingAugmentation << endl; 00145 00146 size_t result = numInFlight + numSubmitted + numAwaitingAugmentation; 00147 return result; 00148 } 00149 00150 void 00151 RouterStack:: 00152 addBudget(const AccountKey & account, CurrencyPool amount) 00153 { 00154 ExcAssertEqual(account.size(), 1); 00155 budgetController.addBudgetSync(account[0], amount); 00156 } 00157 00158 void 00159 RouterStack:: 00160 setBudget(const AccountKey & account, CurrencyPool amount) 00161 { 00162 ExcAssertEqual(account.size(), 1); 00163 budgetController.setBudgetSync(account[0], amount); 00164 } 00165 00166 void 00167 RouterStack:: 00168 topupTransfer(const AccountKey & account, CurrencyPool amount) 00169 { 00170 budgetController.topupTransferSync(account, amount); 00171 } 00172 00173 } // namespace RTBKIT