RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/router/router.h
00001 /* rtb_router.h                                                   -*- C++ -*-
00002    Jeremy Barnes, 13 March 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Router class for RTB.
00006 */
00007 
00008 #ifndef __rtb__router_h__
00009 #define __rtb__router_h__
00010 
00011 #include "soa/service/zmq.hpp"
00012 #include <unordered_map>
00013 #include <boost/thread/thread.hpp>
00014 #include <boost/scoped_ptr.hpp>
00015 #include "jml/utils/filter_streams.h"
00016 #include "soa/service/zmq_named_pub_sub.h"
00017 #include "soa/service/socket_per_thread.h"
00018 #include "soa/service/timeout_map.h"
00019 #include "soa/service/pending_list.h"
00020 #include "augmentation_loop.h"
00021 #include "router_types.h"
00022 #include "soa/gc/gc_lock.h"
00023 #include "jml/utils/ring_buffer.h"
00024 #include "jml/arch/wakeup_fd.h"
00025 #include "jml/utils/smart_ptr_utils.h"
00026 #include <unordered_set>
00027 #include <thread>
00028 #include "rtbkit/common/exchange_connector.h"
00029 #include "rtbkit/core/agent_configuration/blacklist.h"
00030 #include "rtbkit/core/agent_configuration/agent_configuration_listener.h"
00031 #include "rtbkit/core/agent_configuration/agent_config.h"
00032 #include "rtbkit/core/monitor/monitor_provider.h"
00033 #include "rtbkit/core/monitor/monitor_client.h"
00034 
00035 namespace RTBKIT {
00036 
00037 struct Banker;
00038 struct BudgetController;
00039 struct Accountant;
00040 
00041 
00042 /*****************************************************************************/
00043 /* AGENT INFO                                                                */
00044 /*****************************************************************************/
00045 
00047 struct AgentInfoEntry {
00048     std::string name;
00049     std::shared_ptr<const AgentConfig> config;
00050     std::shared_ptr<const AgentStatus> status;
00051     std::shared_ptr<AgentStats> stats;
00052 
00053     bool valid() const { return config && stats; }
00054 
00056     Json::Value toJson() const;
00057 };
00058 
00059 
00064 struct AllAgentInfo : public std::vector<AgentInfoEntry> {
00065     std::unordered_map<std::string, int> agentIndex;
00066     std::unordered_map<AccountKey, std::vector<int> > accountIndex;
00067 };
00068 
00069 /*****************************************************************************/
00070 /* DEBUG INFO                                                                */
00071 /*****************************************************************************/
00072 
00073 struct AuctionDebugInfo {
00074     void addAuctionEvent(Date date, std::string type,
00075                          const std::vector<std::string> & args);
00076     void addSpotEvent(const Id & spot, Date date, std::string type,
00077                       const std::vector<std::string> & args);
00078     void dumpAuction() const;
00079     void dumpSpot(Id spot) const;
00080 
00081     struct Message {
00082         Date timestamp;
00083         Id spot;
00084         std::string type;
00085         std::vector<std::string> args;
00086     };
00087 
00088     std::vector<Message> messages;
00089 };
00090 
00091 /*****************************************************************************/
00092 /* ROUTER                                                                    */
00093 /*****************************************************************************/
00094 
00097 struct Router : public ServiceBase,
00098                 public MonitorProvider
00099 {
00100     Router(ServiceBase & parent,
00101            const std::string & serviceName = "router",
00102            double secondsUntilLossAssumed = 2.0,
00103            bool connectPostAuctionLoop = true);
00104 
00105     Router(std::shared_ptr<ServiceProxies> services,
00106            const std::string & serviceName = "router",
00107            double secondsUntilLossAssumed = 2.0,
00108            bool connectPostAuctionLoop = true);
00109 
00110     ~Router();
00111 
00112     double secondsUntilLossAssumed() const { return secondsUntilLossAssumed_; }
00113 
00114     void setSecondsUntilLossAssumed(double newValue)
00115     {
00116         if (newValue < 0.0)
00117             throw ML::Exception("invalid seconds until loss assumed");
00118         secondsUntilLossAssumed_ = newValue;
00119     }
00120 
00121     std::shared_ptr<Banker> getBanker() const;
00122     void setBanker(const std::shared_ptr<Banker> & newBanker);
00123 
00125     void init();
00126 
00128     void bindTcp();
00129 
00131     void bindAgents(std::string agentUri);
00132 
00134     void bindAugmentors(const std::string & uri);
00135 
00139     void unsafeDisableMonitor();
00140 
00143     virtual void
00144     start(boost::function<void ()> onStop = boost::function<void ()>());
00145 
00149     virtual void sleepUntilIdle();
00150 
00152     void issueTimestamp();
00153 
00155     virtual size_t numNonIdle() const;
00156     
00157     virtual void shutdown();
00158 
00160     template<typename F>
00161     void forAllExchanges(F functor) {
00162         for(auto & item : exchanges)
00163             functor(item);
00164     }
00165 
00175     void connectExchange(ExchangeConnector & exchange)
00176     {
00177         exchange.onNewAuction  = [=] (std::shared_ptr<Auction> a) { this->injectAuction(a, secondsUntilLossAssumed_); };
00178         exchange.onAuctionDone = [=] (std::shared_ptr<Auction> a) { this->onAuctionDone(a); };
00179     }
00180 
00182     void addExchange(ExchangeConnector * exchange)
00183     {
00184         Guard guard(lock);
00185         exchanges.push_back(std::shared_ptr<ExchangeConnector>(exchange));
00186         connectExchange(*exchange);
00187     }
00188 
00193     void addExchange(ExchangeConnector & exchange)
00194     {
00195         Guard guard(lock);
00196         exchanges.emplace_back(ML::make_unowned_std_sp(exchange));
00197         connectExchange(exchange);
00198     }
00199     
00201     void addExchange(std::unique_ptr<ExchangeConnector> && exchange)
00202     {
00203         addExchange(exchange.release());
00204     }
00205 
00209     void startExchange(const std::string & exchangeType,
00210                        const Json::Value & exchangeConfig);
00211 
00215     void startExchange(const Json::Value & exchangeConfig);
00216 
00225     void injectAuction(std::shared_ptr<Auction> auction,
00226                        double lossTime = INFINITY);
00227     
00254     std::shared_ptr<Auction>
00255     injectAuction(Auction::HandleAuction onAuctionFinished,
00256                   std::shared_ptr<BidRequest> request,
00257                   const std::string & requestStr,
00258                   const std::string & requestStrFormat,
00259                   double startTime = 0.0,
00260                   double expiryTime = 0.0,
00261                   double lossTime = INFINITY);
00262 
00277     void notifyFinishedAuction(const Id & auctionId);
00278 
00280     int numAuctionsInProgress() const;
00281 
00283     int numAuctionsAwaitingResult() const;
00284 
00286     Json::Value getStats() const;
00287 
00289     Json::Value getAgentInfo(const std::string & agent) const;
00290 
00292     Json::Value getAllAgentInfo() const;
00293 
00296     Json::Value getAccountInfo(const AccountKey & account) const;
00297     
00299     void setGlobalBidProbability(double val) { globalBidProbability = val; }
00300     
00304     void setBidsErrorRate(double val) { bidsErrorRate = val; }
00305 
00309     void setBudgetErrorRate(double val) { budgetErrorRate = val; }
00310 
00314     boost::function<void (double)> acceptAuctionProbabilityFn;
00315 
00317     void setAcceptAuctionProbability(double val)
00318     {
00319         if (acceptAuctionProbabilityFn)
00320             acceptAuctionProbabilityFn(val);
00321         else if (val != 1.0)
00322             std::cerr << "warning: no way to change accept auction probability" << std::endl;
00323     }
00324 
00326     virtual Json::Value getServiceStatus() const;
00327 
00331     std::function<void (std::shared_ptr<Auction>, Id, Auction::Response)>
00332         onSubmittedAuction;
00333 
00336     virtual void submitToPostAuctionService(std::shared_ptr<Auction> auction,
00337                                             Id auctionId,
00338                                             const Auction::Response & bid);
00339 
00340 protected:
00341     // This thread contains the main router loop
00342     boost::scoped_ptr<boost::thread> runThread;
00343 
00344     // This thread wakes up every now and again to run the destructors
00345     // on auction objects, which are expensive to destroy, so that they
00346     // don't have to run in the main loop
00347     boost::scoped_ptr<boost::thread> cleanupThread;
00348 
00349     typedef std::recursive_mutex Lock;
00350     typedef std::unique_lock<Lock> Guard;
00351 
00352     int shutdown_;
00353 
00354 public:
00355     // Connection to the agents
00356     ZmqNamedClientBus agentEndpoint;
00357 
00358     // Connection to the post auction loop
00359     ZmqNamedProxy postAuctionEndpoint;
00360 
00361     void updateAllAgents();
00362 
00364     typedef std::map<std::string, AgentInfo> Agents;
00365     Agents agents;
00366 
00367     ML::RingBufferSRMW<std::pair<std::string, std::shared_ptr<const AgentConfig> > > configBuffer;
00368     ML::RingBufferSRMW<std::shared_ptr<AugmentationInfo> > startBiddingBuffer;
00369     ML::RingBufferSRMW<std::shared_ptr<Auction> > submittedBuffer;
00370     ML::RingBufferSWMR<std::shared_ptr<Auction> > auctionGraveyard;
00371 
00372     ML::Wakeup_Fd wakeupMainLoop;
00373 
00374     AugmentationLoop augmentationLoop;
00375     Blacklist blacklist;
00376 
00378     typedef TimeoutMap<Id, AuctionInfo> InFlight;
00379     InFlight inFlight;
00380 
00382     AuctionInfo &
00383     addAuction(std::shared_ptr<Auction> auction, Date timeout);
00384 
00385     DutyCycleEntry dutyCycleCurrent;
00386     std::vector<DutyCycleEntry> dutyCycleHistory;
00387 
00388     void run();
00389 
00390     void handleAgentMessage(const std::vector<std::string> & message);
00391 
00392     void checkDeadAgents();
00393 
00394     void checkExpiredAuctions();
00395 
00396     void returnErrorResponse(const std::vector<std::string> & message,
00397                              const std::string & error);
00398 
00399     void doShutdown();
00400 
00406     std::shared_ptr<AugmentationInfo>
00407     preprocessAuction(const std::shared_ptr<Auction> & auction);
00408 
00412     void augmentAuction(const std::shared_ptr<AugmentationInfo> & info);
00413 
00417     void doStartBidding(const std::vector<std::string> & message);
00418 
00420     void doStartBidding(const std::shared_ptr<AugmentationInfo> & augInfo);
00421 
00424     void doSubmitted(std::shared_ptr<Auction> auction);
00425 
00426     //std::unordered_set<Id> recentlySubmitted;  // DEBUG
00427 
00429     void doBid(const std::vector<std::string> & message);
00430 
00433     void doPong(int level, const std::vector<std::string> & message);
00434 
00438     void sendPings();
00439 
00441     void doStats(const std::vector<std::string> & message);
00442 
00444     void onNewAuction(std::shared_ptr<Auction> auction);
00445 
00447     void onAuctionDone(std::shared_ptr<Auction> auction);
00448 
00450     void doConfig(const std::string & agent,
00451                   std::shared_ptr<const AgentConfig> config);
00452 
00456     void unconfigure(const std::string & agent, const AgentConfig & config);
00457 
00461     void configure(const std::string & agent, AgentConfig & config);
00462 
00464     template<typename... Args>
00465     void sendAgentMessage(const std::string & agent,
00466                           const std::string & messageType,
00467                           const Date & date,
00468                           Args... args)
00469     {
00470         agentEndpoint.sendMessage(agent, messageType, date, args...);
00471     }
00472 
00474     void sendBidResponse(const std::string & agent,
00475                          const AgentInfo & info,
00476                          BidStatus status,
00477                          Date timestamp,
00478                          const std::string & message,
00479                          const Id & auctionId,
00480                          int spotNum = -1,
00481                          Amount price = Amount(),
00482                          const Auction * auction = 0,
00483                          const std::string & bidData = "",
00484                          const Json::Value & metadata = Json::Value(),
00485                          const std::string & augmentationsStr = "");
00486                          
00487 
00488     mutable Lock lock;
00489 
00490     std::shared_ptr<Banker> banker;
00491 
00492     double secondsUntilLossAssumed_;
00493     double globalBidProbability;
00494     double bidsErrorRate;
00495     double budgetErrorRate;
00496     bool connectPostAuctionLoop;
00497 
00498 
00499     /*************************************************************************/
00500     /* AGENT INTERACTIONS                                                    */
00501     /*************************************************************************/
00502 
00504     AllAgentInfo * allAgents;
00505 
00507     mutable GcLock allAgentsGc;
00508 
00509     typedef std::function<void (const AgentInfoEntry & info)> OnAgentFn;
00511     void forEachAgent(const OnAgentFn & onAgent) const;
00512 
00516     void forEachAccountAgent(const AccountKey & account,
00517                              const OnAgentFn & onAgent) const;
00518 
00522     AgentInfoEntry getAgentEntry(const std::string & agent) const;
00523 
00527     AgentConfigurationListener configListener;
00528 
00530     bool initialized;
00531 
00533     std::vector<std::shared_ptr<ExchangeConnector> > exchanges;
00534 
00535     /*************************************************************************/
00536     /* EXCEPTIONS                                                            */
00537     /*************************************************************************/
00538 
00542     void throwException(const std::string & key, const std::string & fmt,
00543                         ...) __attribute__((__noreturn__));
00544 
00545 
00546     /*************************************************************************/
00547     /* SYSTEM LOGGING                                                        */
00548     /*************************************************************************/
00549 
00551     template<typename... Args>
00552     void logRouterError(const std::string & function,
00553                         const std::string & exception,
00554                         Args... args)
00555     {
00556         logger.publish("ROUTERERROR", Date::now().print(5),
00557                        function, exception, args...);
00558         recordHit("error.%s", function);
00559     }
00560 
00561 
00562     /*************************************************************************/
00563     /* DATA LOGGING                                                          */
00564     /*************************************************************************/
00565 
00567     template<typename... Args>
00568     void logMessage(const std::string & channel, Args... args)
00569     {
00570         using namespace std;
00571         //cerr << "********* logging message to " << channel << endl;
00572         logger.publish(channel, Date::now().print(5), args...);
00573     }
00574 
00576     template<typename... Args>
00577     void logMessageNoTimestamp(const std::string & channel, Args... args)
00578     {
00579         using namespace std;
00580         //cerr << "********* logging message to " << channel << endl;
00581         logger.publish(channel, args...);
00582     }
00583 
00584     /*************************************************************************/
00585     /* DEBUGGING                                                             */
00586     /*************************************************************************/
00587 
00588     void debugAuction(const Id & auction, const std::string & type,
00589                       const std::vector<std::string> & args
00590                       = std::vector<std::string>())
00591     {
00592         if (JML_LIKELY(!doDebug)) return;
00593         debugAuctionImpl(auction, type, args);
00594     }
00595 
00596     void debugAuctionImpl(const Id & auction, const std::string & type,
00597                           const std::vector<std::string> & args);
00598 
00599     void debugSpot(const Id & auction,
00600                    const Id & spot,
00601                    const std::string & type,
00602                    const std::vector<std::string> & args
00603                        = std::vector<std::string>())
00604     {
00605         if (JML_LIKELY(!doDebug)) return;
00606         debugSpotImpl(auction, spot, type, args);
00607     }
00608 
00609     void debugSpotImpl(const Id & auction,
00610                        const Id & spot,
00611                        const std::string & type,
00612                        const std::vector<std::string> & args);
00613 
00614     void expireDebugInfo();
00615 
00616     void dumpAuction(const Id & auction) const;
00617     void dumpSpot(const Id & auction, const Id & spot) const;
00618 
00619     Date getCurrentTime() const { return Date::now(); }
00620 
00621     ZmqNamedPublisher logger;
00622 
00624     bool doDebug;
00625 
00626     mutable ML::Spinlock debugLock;
00627     TimeoutMap<Id, AuctionDebugInfo> debugInfo;
00628 
00629     uint64_t numAuctions;
00630     uint64_t numBids;
00631     uint64_t numNonEmptyBids;
00632     uint64_t numAuctionsWithBid;
00633     uint64_t numNoPotentialBidders;
00634     uint64_t numNoBidders;
00635 
00636     /* Client connection to the Monitor, determines if we can process bid
00637        requests */
00638     MonitorClient monitorClient;
00639     Date slowModeLastAuction;
00640     int slowModeCount;
00641 
00642     /* MONITOR PROVIDER */
00643     /* Post service health status to Monitor */
00644     MonitorProviderClient monitorProviderClient;
00645 
00646     /* MonitorProvider interface */
00647     std::string getProviderName() const;
00648     Json::Value getProviderIndicators() const;
00649 };
00650 
00651 
00652 } // namespace RTBKIT
00653 
00654 
00655 #endif /* __rtb__router_h__ */
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator