RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/router/router_stack.cc
00001 /* router_stack.cc
00002    Jeremy Barnes, 21 November 2012
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    RTB router code.
00006 */
00007 
00008 #include "router_stack.h"
00009 #include "rtbkit/core/banker/slave_banker.h"
00010 
00011 using namespace std;
00012 using namespace ML;
00013 
00014 
00015 namespace RTBKIT {
00016 
00017 
00018 /*****************************************************************************/
00019 /* ROUTER STACK                                                              */
00020 /*****************************************************************************/
00021 
00022 RouterStack::
00023 RouterStack(std::shared_ptr<ServiceProxies> services,
00024             const std::string & serviceName,
00025             double secondsUntilLossAssumed)
00026     : ServiceBase(serviceName, services),
00027       router(*this, "router", secondsUntilLossAssumed,
00028              false /* connect to post auction loop */),
00029       masterBanker(services, "masterBanker"),
00030       postAuctionLoop(*this, "postAuction"),
00031       config(services, "config"),
00032       monitor(services, "monitor"),
00033       initialized(false)
00034 {
00035 }
00036 
00037 void
00038 RouterStack::
00039 init()
00040 {
00041     ExcAssert(!initialized);
00042 
00043     namespace p = std::placeholders;
00044     router.onSubmittedAuction = std::bind(&RouterStack::submitAuction, this,
00045                                           p::_1, p::_2, p::_3);
00046     config.init();
00047     config.bindTcp();
00048     config.start();
00049 
00050     masterBanker.init(std::make_shared<RedisBankerPersistence>(redis));
00051     masterBanker.bindTcp();
00052     masterBanker.start();
00053 
00054     budgetController.init(getServices()->config);
00055     budgetController.start();
00056 
00057     auto makeSlaveBanker = [=] (const std::string & name)
00058         {
00059             auto res = make_shared<SlaveBanker>(
00060                     getZmqContext(), getServices()->config, name);
00061             res->start();
00062             return res;
00063         };
00064 
00065  
00066     // getServices()->config->dump(cerr);
00067 
00068     postAuctionLoop.init();
00069     postAuctionLoop.setBanker(makeSlaveBanker("postAuction"));
00070     postAuctionLoop.bindTcp();
00071 
00072     router.init();
00073     router.setBanker(makeSlaveBanker("router"));
00074     router.bindTcp();
00075 
00076     monitor.init({"router", "postAuction", "masterBanker"});
00077     monitor.bindTcp();
00078     monitor.start();
00079 
00080     initialized = true;
00081 }
00082 
00083 void
00084 RouterStack::
00085 submitAuction(const std::shared_ptr<Auction> & auction,
00086               const Id & adSpotId,
00087               const Auction::Response & response)
00088 {
00089     const std::string& agentAugmentations = auction->agentAugmentations[response.agent];
00090     postAuctionLoop.injectSubmittedAuction(auction->id,
00091                                            adSpotId,
00092                                            auction->request,
00093                                            auction->requestStr,
00094                                            auction->requestStrFormat,
00095                                            agentAugmentations,
00096                                            response,
00097                                            auction->lossAssumed);
00098 }
00099 
00100 void
00101 RouterStack::
00102 start(boost::function<void ()> onStop)
00103 {
00104     if (!initialized)
00105         init();
00106 
00107     //config.start();
00108     postAuctionLoop.start();
00109     router.start(onStop);
00110 }
00111 
00112 void
00113 RouterStack::
00114 sleepUntilIdle()
00115 {
00116     router.sleepUntilIdle();
00117 }
00118     
00119 void
00120 RouterStack::
00121 shutdown()
00122 {
00123     router.shutdown();
00124     postAuctionLoop.shutdown();
00125     budgetController.shutdown();
00126     masterBanker.shutdown();
00127     config.shutdown();
00128     monitor.shutdown();
00129 }
00130 
00131 size_t
00132 RouterStack::
00133 numNonIdle() const
00134 {
00135     size_t numInFlight, numSubmitted, numAwaitingAugmentation;
00136     {
00137         numInFlight = router.inFlight.size();
00138         numAwaitingAugmentation = router.augmentationLoop.numAugmenting();
00139         numSubmitted = postAuctionLoop.numAwaitingWinLoss();
00140     }
00141 
00142     cerr << "numInFlight = " << numInFlight << endl;
00143     cerr << "numSubmitted = " << numSubmitted << endl;
00144     cerr << "numAwaitingAugmentation = " << numAwaitingAugmentation << endl;
00145 
00146     size_t result = numInFlight + numSubmitted + numAwaitingAugmentation;
00147     return result;
00148 }
00149 
00150 void
00151 RouterStack::
00152 addBudget(const AccountKey & account, CurrencyPool amount)
00153 {
00154     ExcAssertEqual(account.size(), 1);
00155     budgetController.addBudgetSync(account[0], amount);
00156 }
00157 
00158 void
00159 RouterStack::
00160 setBudget(const AccountKey & account, CurrencyPool amount)
00161 {
00162     ExcAssertEqual(account.size(), 1);
00163     budgetController.setBudgetSync(account[0], amount);
00164 }
00165 
00166 void
00167 RouterStack::
00168 topupTransfer(const AccountKey & account, CurrencyPool amount)
00169 {
00170     budgetController.topupTransferSync(account, amount);
00171 }
00172 
00173 } // namespace RTBKIT
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator