![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* router_integration_test.cc 00002 Jeremy Barnes, 21 November 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Overall integration test for the router stack. 00006 */ 00007 00008 #include "augmentor_ex.h" 00009 00010 #include "rtbkit/core/router/router.h" 00011 #include "rtbkit/core/post_auction/post_auction_loop.h" 00012 #include "rtbkit/core/agent_configuration/agent_configuration_service.h" 00013 #include "rtbkit/core/banker/master_banker.h" 00014 #include "rtbkit/core/banker/slave_banker.h" 00015 #include "rtbkit/core/monitor/monitor_endpoint.h" 00016 #include "jml/utils/rng.h" 00017 #include "jml/utils/pair_utils.h" 00018 #include "jml/utils/environment.h" 00019 #include "jml/arch/timers.h" 00020 #include "soa/service/testing/redis_temporary_server.h" 00021 #include "testing/generic_exchange_connector.h" 00022 #include "testing/mock_exchange.h" 00023 #include "rtbkit/testing/test_agent.h" 00024 #include "rtbkit/examples/mock_exchange_connector.h" 00025 #include "rtbkit/examples/mock_ad_server_connector.h" 00026 #include <boost/thread.hpp> 00027 #include <netdb.h> 00028 #include <memory> 00029 00030 00031 using namespace std; 00032 using namespace ML; 00033 using namespace Redis; 00034 using namespace Datacratic; 00035 using namespace RTBKIT; 00036 00037 00038 /******************************************************************************/ 00039 /* COMPONENTS */ 00040 /******************************************************************************/ 00041 00043 struct Components 00044 { 00045 00046 std::shared_ptr<ServiceProxies> proxies; 00047 00048 // See init for an inline description of the various components. 00049 00050 RedisTemporaryServer redis; 00051 Router router1, router2; 00052 MockAdServerConnector winStream; 00053 PostAuctionLoop postAuctionLoop; 00054 MasterBanker masterBanker; 00055 SlaveBudgetController budgetController; 00056 AgentConfigurationService agentConfig; 00057 MonitorEndpoint monitor; 00058 TestAgent agent; 00059 FrequencyCapAugmentor augmentor; 00060 00061 // \todo Add a PAL event subscriber. 00062 00063 vector<unique_ptr<MockExchangeConnector> > exchangeConnectors; 00064 vector<int> exchangePorts; 00065 00066 00067 Components(std::shared_ptr<ServiceProxies> proxies) 00068 : proxies(proxies), 00069 router1(proxies, "router1"), 00070 router2(proxies, "router2"), 00071 winStream("mockStream", proxies), 00072 postAuctionLoop(proxies, "pas1"), 00073 masterBanker(proxies, "masterBanker"), 00074 agentConfig(proxies, "agentConfigurationService"), 00075 monitor(proxies, "monitor"), 00076 agent(proxies, "agent1"), 00077 augmentor(proxies, "frequency-cap-ex") 00078 { 00079 } 00080 00081 void shutdown() 00082 { 00083 router1.shutdown(); 00084 router2.shutdown(); 00085 winStream.shutdown(); 00086 postAuctionLoop.shutdown(); 00087 00088 budgetController.shutdown(); 00089 masterBanker.shutdown(); 00090 00091 agent.shutdown(); 00092 augmentor.shutdown(); 00093 agentConfig.shutdown(); 00094 00095 monitor.shutdown(); 00096 00097 cerr << "done shutdown" << endl; 00098 } 00099 00100 void init() 00101 { 00102 const string agentUri = "tcp://127.0.0.1:1234"; 00103 00104 // Setup a monitor which ensures that any instability in the system will 00105 // throttle the bid request stream. In other words, it ensures you won't 00106 // go bankrupt. 00107 monitor.init({"router1", "router2", "pas1", "masterBanker"}); 00108 monitor.bindTcp(); 00109 monitor.start(); 00110 00111 // Setup and agent configuration service which is used to notify all 00112 // interested services of changes to the agent configuration. 00113 agentConfig.init(); 00114 agentConfig.bindTcp(); 00115 agentConfig.start(); 00116 00117 // Setup a master banker used to keep the canonical budget of the 00118 // various bidding agent accounts. The data contained in this service is 00119 // periodically persisted to redis. 00120 masterBanker.init(std::make_shared<RedisBankerPersistence>(redis)); 00121 masterBanker.bindTcp(); 00122 masterBanker.start(); 00123 00124 // Setup a slave banker that we can use to manipulate and peak at the 00125 // budgets during the test. 00126 budgetController.init(proxies->config); 00127 budgetController.start(); 00128 00129 // Each router contains a slave masterBanker which is periodically 00130 // synced with the master banker. 00131 auto makeSlaveBanker = [=] (const std::string & name) 00132 { 00133 auto res = std::make_shared<SlaveBanker> 00134 (proxies->zmqContext, proxies->config, name); 00135 res->start(); 00136 return res; 00137 }; 00138 00139 // Setup a post auction loop (PAL) which handles all exchange events 00140 // that don't need to be processed in real-time (wins, loss, etc). 00141 postAuctionLoop.init(); 00142 postAuctionLoop.setBanker(makeSlaveBanker("pas1")); 00143 postAuctionLoop.bindTcp(); 00144 postAuctionLoop.start(); 00145 00146 // Setup two routers which will manage the bid request stream coming 00147 // from the exchange, the augmentations coming from the augmentors (to 00148 // be added to the test) and the bids coming from the agents. Along the 00149 // way it also applies various filters based on agent configuration 00150 // while ensuring that all the real-time constraints are respected. 00151 router1.init(); 00152 router1.setBanker(makeSlaveBanker("router1")); 00153 router1.bindTcp(); 00154 router1.start(); 00155 00156 router2.init(); 00157 router2.setBanker(makeSlaveBanker("router2")); 00158 router2.bindTcp(); 00159 router2.start(); 00160 00161 // Setup an exchange connector for each router which will act as the 00162 // middle men between the exchange and the router. 00163 00164 int ports = 12338; 00165 00166 exchangeConnectors.emplace_back( 00167 new MockExchangeConnector("mock-1", proxies)); 00168 00169 exchangeConnectors.emplace_back( 00170 new MockExchangeConnector("mock-2", proxies)); 00171 00172 for (auto& connector : exchangeConnectors) { 00173 connector->enableUntil(Date::positiveInfinity()); 00174 00175 int port = connector->init(ports, "localhost", 2 /* threads */); 00176 00177 exchangePorts.push_back(port); 00178 ++ports; 00179 } 00180 00181 router1.addExchange(*exchangeConnectors[0]); 00182 router2.addExchange(*exchangeConnectors[1]); 00183 00184 // Setup an ad server connector that also acts as a midlle men between 00185 // the exchange's wins and the post auction loop. 00186 winStream.init(12340); 00187 winStream.start(); 00188 00189 // Our bidding agent which listens to the bid request stream from all 00190 // available routers and decide who gets to see your awesome pictures of 00191 // kittens. 00192 agent.init(); 00193 agent.start(); 00194 agent.configure(); 00195 00196 // Our augmentor which does frequency capping for our agent. 00197 augmentor.init(); 00198 augmentor.start(); 00199 } 00200 }; 00201 00202 00203 /******************************************************************************/ 00204 /* SETUP */ 00205 /******************************************************************************/ 00206 00207 void setupAgent(TestAgent& agent) 00208 { 00209 return; 00210 00211 // Set our frequency cap to 42. This has two effects: 1) it instructs the 00212 // router that we want bid requests destined for our agent to first be 00213 // augmented with frequency capping information and 2) it instructs our 00214 // augmentor to place the pass-frequency-cap-ex tag on our bid request if 00215 // our agent has seen a given user less then 42 times. 00216 agent.config.addAugmentation("frequency-cap-ex", Json::Value(42)); 00217 00218 // Instructs the router to only keep bid requests that have this tag. In 00219 // other words keep only the bid requests that haven't reached our frequency 00220 // cap limit. 00221 agent.config.augmentationFilter.include.push_back("pass-frequency-cap-ex"); 00222 00223 // This lambda implements our incredibly sophisticated bidding strategy. 00224 agent.onBidRequest = [&] ( 00225 double timestamp, 00226 const Id & id, 00227 std::shared_ptr<BidRequest> br, 00228 Bids bids, 00229 double timeLeftMs, 00230 const Json::Value & augmentations) 00231 { 00232 ExcAssertGreater(bids.size(), 0); 00233 00234 Bid& bid = bids[0]; 00235 ExcAssertGreater(bid.availableCreatives.size(), 0); 00236 00237 bid.bid(bid.availableCreatives[0], USD_CPM(10)); 00238 00239 agent.doBid(id, bids, Json::Value()); 00240 ML::atomic_inc(agent.numBidRequests); 00241 }; 00242 } 00243 00244 00246 void allocateBudget( 00247 SlaveBudgetController& budgetController, 00248 const AccountKey& account, 00249 Amount budget) 00250 { 00251 budgetController.addAccountSync(account); 00252 budgetController.setBudgetSync(account[0], budget); 00253 budgetController.topupTransferSync(account, USD(10)); 00254 00255 cerr << budgetController.getAccountSummarySync(account[0], -1) << endl; 00256 00257 // Syncing is done periodically so we have to wait a bit before the router 00258 // will have a budget available. Necessary because the bid request stream 00259 // for this test isn't infinit. 00260 cerr << "sleeping so that the slave accounts can sync up" << endl; 00261 ML::sleep(2.1); 00262 00263 auto summary = budgetController.getAccountSummarySync(account[0], -1); 00264 cerr << summary << endl; 00265 00266 ExcAssertEqual( 00267 summary.subAccounts["testStrategy"].subAccounts["router1"].budget, 00268 USD(0.10)); 00269 00270 ExcAssertEqual( 00271 summary.subAccounts["testStrategy"].subAccounts["router2"].budget, 00272 USD(0.10)); 00273 } 00274 00276 void dumpAccounts( 00277 SlaveBudgetController& budgetController, 00278 const AccountKey & name, const AccountSummary & a) 00279 { 00280 cerr << name << ": " << endl; 00281 cerr << budgetController.getAccountSync(name) << endl; 00282 00283 for (auto & sub: a.subAccounts) { 00284 dumpAccounts(budgetController, name.childKey(sub.first), sub.second); 00285 } 00286 }; 00287 00288 00289 /******************************************************************************/ 00290 /* MAIN */ 00291 /******************************************************************************/ 00292 00297 int main(int argc, char ** argv) 00298 { 00299 // Controls the length of the test. 00300 enum { 00301 nExchangeThreads = 10, 00302 nBidRequestsPerThread = 200 00303 }; 00304 00305 auto proxies = std::make_shared<ServiceProxies>(); 00306 00307 // If we had a zookeeper instance running, we could use it to do service 00308 // discovery. Since we don't, ServiceProxies will just default to using a 00309 // local service map. 00310 if (false) proxies->useZookeeper("zookeeper.rtbkit.org", "stats"); 00311 00312 // If we had a carbon instance running, we could use it to log events. Since 00313 // we don't, ServiceProxies will just default to using a local equivalent. 00314 if (false) proxies->logToCarbon("carbon.rtbkit.org", "stats"); 00315 00316 00317 // Setups up the various component of the RTBKit stack. See Components::init 00318 // for more details. 00319 Components components(proxies); 00320 components.init(); 00321 00322 // Some extra customization for our agent to make it extra special. See 00323 // setupAgent for more details. 00324 setupAgent(components.agent); 00325 00326 // Setup an initial budgeting for the test. 00327 allocateBudget( 00328 components.budgetController, 00329 {"testCampaign", "testStrategy"}, 00330 USD(1000)); 00331 00332 // Start up the exchange threads which should let bid requests flow through 00333 // our stack. 00334 MockExchange exchange(proxies, "mock-exchange"); 00335 exchange.start(nExchangeThreads, nBidRequestsPerThread, components.exchangePorts, { 12340 }); 00336 00337 // Dump the budget stats while we wait for the test to finish. 00338 while (!exchange.isDone()) { 00339 auto summary = components.budgetController.getAccountSummarySync( 00340 {"testCampaign"}, -1); 00341 cerr << summary << endl; 00342 00343 dumpAccounts(components.budgetController, {"testCampaign"}, summary); 00344 ML::sleep(1.0); 00345 } 00346 00347 // Test is done; clean up time. 00348 components.shutdown(); 00349 00350 components.proxies->events->dump(cerr); 00351 } 00352