RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
examples/rtbkit_integration_test.cc
00001 /* rtbkit_integration_test.cc
00002    Jeremy Barnes, 21 November 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    Overall integration test for the router stack.
00006 */
00007 
00008 #include "augmentor_ex.h"
00009 
00010 #include "rtbkit/core/router/router.h"
00011 #include "rtbkit/core/post_auction/post_auction_loop.h"
00012 #include "rtbkit/core/agent_configuration/agent_configuration_service.h"
00013 #include "rtbkit/core/banker/master_banker.h"
00014 #include "rtbkit/core/banker/slave_banker.h"
00015 #include "rtbkit/core/monitor/monitor_endpoint.h"
00016 #include "jml/utils/rng.h"
00017 #include "jml/utils/pair_utils.h"
00018 #include "jml/utils/environment.h"
00019 #include "jml/arch/timers.h"
00020 #include "soa/service/testing/redis_temporary_server.h"
00021 #include "testing/generic_exchange_connector.h"
00022 #include "testing/mock_exchange.h"
00023 #include "rtbkit/testing/test_agent.h"
00024 #include "rtbkit/examples/mock_exchange_connector.h"
00025 #include "rtbkit/examples/mock_ad_server_connector.h"
00026 #include <boost/thread.hpp>
00027 #include <netdb.h>
00028 #include <memory>
00029 
00030 
00031 using namespace std;
00032 using namespace ML;
00033 using namespace Redis;
00034 using namespace Datacratic;
00035 using namespace RTBKIT;
00036 
00037 
00038 /******************************************************************************/
00039 /* COMPONENTS                                                                 */
00040 /******************************************************************************/
00041 
00043 struct Components
00044 {
00045 
00046     std::shared_ptr<ServiceProxies> proxies;
00047 
00048     // See init for an inline description of the various components.
00049 
00050     RedisTemporaryServer redis;
00051     Router router1, router2;
00052     MockAdServerConnector winStream;
00053     PostAuctionLoop postAuctionLoop;
00054     MasterBanker masterBanker;
00055     SlaveBudgetController budgetController;
00056     AgentConfigurationService agentConfig;
00057     MonitorEndpoint monitor;
00058     TestAgent agent;
00059     FrequencyCapAugmentor augmentor;
00060 
00061     // \todo Add a PAL event subscriber.
00062 
00063     vector<unique_ptr<MockExchangeConnector> > exchangeConnectors;
00064     vector<int> exchangePorts;
00065 
00066 
00067     Components(std::shared_ptr<ServiceProxies> proxies)
00068         : proxies(proxies),
00069           router1(proxies, "router1"),
00070           router2(proxies, "router2"),
00071           winStream("mockStream", proxies),
00072           postAuctionLoop(proxies, "pas1"),
00073           masterBanker(proxies, "masterBanker"),
00074           agentConfig(proxies, "agentConfigurationService"),
00075           monitor(proxies, "monitor"),
00076           agent(proxies, "agent1"),
00077           augmentor(proxies, "frequency-cap-ex")
00078     {
00079     }
00080 
00081     void shutdown()
00082     {
00083         router1.shutdown();
00084         router2.shutdown();
00085         winStream.shutdown();
00086         postAuctionLoop.shutdown();
00087 
00088         budgetController.shutdown();
00089         masterBanker.shutdown();
00090 
00091         agent.shutdown();
00092         augmentor.shutdown();
00093         agentConfig.shutdown();
00094 
00095         monitor.shutdown();
00096 
00097         cerr << "done shutdown" << endl;
00098     }
00099 
00100     void init()
00101     {
00102         const string agentUri = "tcp://127.0.0.1:1234";
00103 
00104         // Setup a monitor which ensures that any instability in the system will
00105         // throttle the bid request stream. In other words, it ensures you won't
00106         // go bankrupt.
00107         monitor.init({"router1", "router2", "pas1", "masterBanker"});
00108         monitor.bindTcp();
00109         monitor.start();
00110 
00111         // Setup and agent configuration service which is used to notify all
00112         // interested services of changes to the agent configuration.
00113         agentConfig.init();
00114         agentConfig.bindTcp();
00115         agentConfig.start();
00116 
00117         // Setup a master banker used to keep the canonical budget of the
00118         // various bidding agent accounts. The data contained in this service is
00119         // periodically persisted to redis.
00120         masterBanker.init(std::make_shared<RedisBankerPersistence>(redis));
00121         masterBanker.bindTcp();
00122         masterBanker.start();
00123 
00124         // Setup a slave banker that we can use to manipulate and peak at the
00125         // budgets during the test.
00126         budgetController.init(proxies->config);
00127         budgetController.start();
00128 
00129         // Each router contains a slave masterBanker which is periodically
00130         // synced with the master banker.
00131         auto makeSlaveBanker = [=] (const std::string & name)
00132             {
00133                 auto res = std::make_shared<SlaveBanker>
00134                 (proxies->zmqContext, proxies->config, name);
00135                 res->start();
00136                 return res;
00137             };
00138 
00139         // Setup a post auction loop (PAL) which handles all exchange events
00140         // that don't need to be processed in real-time (wins, loss, etc).
00141         postAuctionLoop.init();
00142         postAuctionLoop.setBanker(makeSlaveBanker("pas1"));
00143         postAuctionLoop.bindTcp();
00144         postAuctionLoop.start();
00145 
00146         // Setup two routers which will manage the bid request stream coming
00147         // from the exchange, the augmentations coming from the augmentors (to
00148         // be added to the test) and the bids coming from the agents. Along the
00149         // way it also applies various filters based on agent configuration
00150         // while ensuring that all the real-time constraints are respected.
00151         router1.init();
00152         router1.setBanker(makeSlaveBanker("router1"));
00153         router1.bindTcp();
00154         router1.start();
00155 
00156         router2.init();
00157         router2.setBanker(makeSlaveBanker("router2"));
00158         router2.bindTcp();
00159         router2.start();
00160 
00161         // Setup an exchange connector for each router which will act as the
00162         // middle men between the exchange and the router.
00163 
00164         int ports = 12338;
00165 
00166         exchangeConnectors.emplace_back(
00167                 new MockExchangeConnector("mock-1", proxies));
00168 
00169         exchangeConnectors.emplace_back(
00170                 new MockExchangeConnector("mock-2", proxies));
00171 
00172         for (auto& connector : exchangeConnectors) {
00173             connector->enableUntil(Date::positiveInfinity());
00174 
00175             int port = connector->init(ports, "localhost", 2 /* threads */);
00176 
00177             exchangePorts.push_back(port);
00178             ++ports;
00179         }
00180 
00181         router1.addExchange(*exchangeConnectors[0]);
00182         router2.addExchange(*exchangeConnectors[1]);
00183         
00184         // Setup an ad server connector that also acts as a midlle men between
00185         // the exchange's wins and the post auction loop.
00186         winStream.init(12340);
00187         winStream.start();
00188 
00189         // Our bidding agent which listens to the bid request stream from all
00190         // available routers and decide who gets to see your awesome pictures of
00191         // kittens.
00192         agent.init();
00193         agent.start();
00194         agent.configure();
00195 
00196         // Our augmentor which does frequency capping for our agent.
00197         augmentor.init();
00198         augmentor.start();
00199     }
00200 };
00201 
00202 
00203 /******************************************************************************/
00204 /* SETUP                                                                      */
00205 /******************************************************************************/
00206 
00207 void setupAgent(TestAgent& agent)
00208 {
00209     return;
00210 
00211     // Set our frequency cap to 42. This has two effects: 1) it instructs the
00212     // router that we want bid requests destined for our agent to first be
00213     // augmented with frequency capping information and 2) it instructs our
00214     // augmentor to place the pass-frequency-cap-ex tag on our bid request if
00215     // our agent has seen a given user less then 42 times.
00216     agent.config.addAugmentation("frequency-cap-ex", Json::Value(42));
00217 
00218     // Instructs the router to only keep bid requests that have this tag. In
00219     // other words keep only the bid requests that haven't reached our frequency
00220     // cap limit.
00221     agent.config.augmentationFilter.include.push_back("pass-frequency-cap-ex");
00222 
00223     // This lambda implements our incredibly sophisticated bidding strategy.
00224     agent.onBidRequest = [&] (
00225             double timestamp,
00226             const Id & id,
00227             std::shared_ptr<BidRequest> br,
00228             Bids bids,
00229             double timeLeftMs,
00230             const Json::Value & augmentations)
00231         {
00232             ExcAssertGreater(bids.size(), 0);
00233 
00234             Bid& bid = bids[0];
00235             ExcAssertGreater(bid.availableCreatives.size(), 0);
00236 
00237             bid.bid(bid.availableCreatives[0], USD_CPM(10));
00238 
00239             agent.doBid(id, bids, Json::Value());
00240             ML::atomic_inc(agent.numBidRequests);
00241         };
00242 }
00243 
00244 
00246 void allocateBudget(
00247         SlaveBudgetController& budgetController,
00248         const AccountKey& account,
00249         Amount budget)
00250 {
00251     budgetController.addAccountSync(account);
00252     budgetController.setBudgetSync(account[0], budget);
00253     budgetController.topupTransferSync(account, USD(10));
00254 
00255     cerr << budgetController.getAccountSummarySync(account[0], -1) << endl;
00256 
00257     // Syncing is done periodically so we have to wait a bit before the router
00258     // will have a budget available. Necessary because the bid request stream
00259     // for this test isn't infinit.
00260     cerr << "sleeping so that the slave accounts can sync up" << endl;
00261     ML::sleep(2.1);
00262 
00263     auto summary = budgetController.getAccountSummarySync(account[0], -1);
00264     cerr << summary << endl;
00265 
00266     ExcAssertEqual(
00267             summary.subAccounts["testStrategy"].subAccounts["router1"].budget,
00268             USD(0.10));
00269 
00270     ExcAssertEqual(
00271             summary.subAccounts["testStrategy"].subAccounts["router2"].budget,
00272             USD(0.10));
00273 }
00274 
00276 void dumpAccounts(
00277         SlaveBudgetController& budgetController,
00278         const AccountKey & name, const AccountSummary & a)
00279 {
00280     cerr << name << ": " << endl;
00281     cerr << budgetController.getAccountSync(name) << endl;
00282 
00283     for (auto & sub: a.subAccounts) {
00284         dumpAccounts(budgetController, name.childKey(sub.first), sub.second);
00285     }
00286 };
00287 
00288 
00289 /******************************************************************************/
00290 /* MAIN                                                                       */
00291 /******************************************************************************/
00292 
00297 int main(int argc, char ** argv)
00298 {
00299     // Controls the length of the test.
00300     enum {
00301         nExchangeThreads = 10,
00302         nBidRequestsPerThread = 200
00303     };
00304 
00305     auto proxies = std::make_shared<ServiceProxies>();
00306 
00307     // If we had a zookeeper instance running, we could use it to do service
00308     // discovery. Since we don't, ServiceProxies will just default to using a
00309     // local service map.
00310     if (false) proxies->useZookeeper("zookeeper.rtbkit.org", "stats");
00311 
00312     // If we had a carbon instance running, we could use it to log events. Since
00313     // we don't, ServiceProxies will just default to using a local equivalent.
00314     if (false) proxies->logToCarbon("carbon.rtbkit.org", "stats");
00315 
00316 
00317     // Setups up the various component of the RTBKit stack. See Components::init
00318     // for more details.
00319     Components components(proxies);
00320     components.init();
00321 
00322     // Some extra customization for our agent to make it extra special. See
00323     // setupAgent for more details.
00324     setupAgent(components.agent);
00325 
00326     // Setup an initial budgeting for the test.
00327     allocateBudget(
00328             components.budgetController,
00329             {"testCampaign", "testStrategy"},
00330             USD(1000));
00331 
00332     // Start up the exchange threads which should let bid requests flow through
00333     // our stack.
00334     MockExchange exchange(proxies, "mock-exchange");
00335     exchange.start(nExchangeThreads, nBidRequestsPerThread, components.exchangePorts, { 12340 });
00336 
00337     // Dump the budget stats while we wait for the test to finish.
00338     while (!exchange.isDone()) {
00339         auto summary = components.budgetController.getAccountSummarySync(
00340                 {"testCampaign"}, -1);
00341         cerr <<  summary << endl;
00342 
00343         dumpAccounts(components.budgetController, {"testCampaign"}, summary);
00344         ML::sleep(1.0);
00345     }
00346 
00347     // Test is done; clean up time.
00348     components.shutdown();
00349 
00350     components.proxies->events->dump(cerr);
00351 }
00352 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator