RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* rtb_router.h -*- C++ -*- 00002 Jeremy Barnes, 13 March 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Router class for RTB. 00006 */ 00007 00008 #ifndef __rtb__router_h__ 00009 #define __rtb__router_h__ 00010 00011 #include "soa/service/zmq.hpp" 00012 #include <unordered_map> 00013 #include <boost/thread/thread.hpp> 00014 #include <boost/scoped_ptr.hpp> 00015 #include "jml/utils/filter_streams.h" 00016 #include "soa/service/zmq_named_pub_sub.h" 00017 #include "soa/service/socket_per_thread.h" 00018 #include "soa/service/timeout_map.h" 00019 #include "soa/service/pending_list.h" 00020 #include "augmentation_loop.h" 00021 #include "router_types.h" 00022 #include "soa/gc/gc_lock.h" 00023 #include "jml/utils/ring_buffer.h" 00024 #include "jml/arch/wakeup_fd.h" 00025 #include "jml/utils/smart_ptr_utils.h" 00026 #include <unordered_set> 00027 #include <thread> 00028 #include "rtbkit/common/exchange_connector.h" 00029 #include "rtbkit/core/agent_configuration/blacklist.h" 00030 #include "rtbkit/core/agent_configuration/agent_configuration_listener.h" 00031 #include "rtbkit/core/agent_configuration/agent_config.h" 00032 #include "rtbkit/core/monitor/monitor_provider.h" 00033 #include "rtbkit/core/monitor/monitor_client.h" 00034 00035 namespace RTBKIT { 00036 00037 struct Banker; 00038 struct BudgetController; 00039 struct Accountant; 00040 00041 00042 /*****************************************************************************/ 00043 /* AGENT INFO */ 00044 /*****************************************************************************/ 00045 00047 struct AgentInfoEntry { 00048 std::string name; 00049 std::shared_ptr<const AgentConfig> config; 00050 std::shared_ptr<const AgentStatus> status; 00051 std::shared_ptr<AgentStats> stats; 00052 00053 bool valid() const { return config && stats; } 00054 00056 Json::Value toJson() const; 00057 }; 00058 00059 00064 struct AllAgentInfo : public std::vector<AgentInfoEntry> { 00065 std::unordered_map<std::string, int> agentIndex; 00066 std::unordered_map<AccountKey, std::vector<int> > accountIndex; 00067 }; 00068 00069 /*****************************************************************************/ 00070 /* DEBUG INFO */ 00071 /*****************************************************************************/ 00072 00073 struct AuctionDebugInfo { 00074 void addAuctionEvent(Date date, std::string type, 00075 const std::vector<std::string> & args); 00076 void addSpotEvent(const Id & spot, Date date, std::string type, 00077 const std::vector<std::string> & args); 00078 void dumpAuction() const; 00079 void dumpSpot(Id spot) const; 00080 00081 struct Message { 00082 Date timestamp; 00083 Id spot; 00084 std::string type; 00085 std::vector<std::string> args; 00086 }; 00087 00088 std::vector<Message> messages; 00089 }; 00090 00091 /*****************************************************************************/ 00092 /* ROUTER */ 00093 /*****************************************************************************/ 00094 00097 struct Router : public ServiceBase, 00098 public MonitorProvider 00099 { 00100 Router(ServiceBase & parent, 00101 const std::string & serviceName = "router", 00102 double secondsUntilLossAssumed = 2.0, 00103 bool connectPostAuctionLoop = true); 00104 00105 Router(std::shared_ptr<ServiceProxies> services, 00106 const std::string & serviceName = "router", 00107 double secondsUntilLossAssumed = 2.0, 00108 bool connectPostAuctionLoop = true); 00109 00110 ~Router(); 00111 00112 double secondsUntilLossAssumed() const { return secondsUntilLossAssumed_; } 00113 00114 void setSecondsUntilLossAssumed(double newValue) 00115 { 00116 if (newValue < 0.0) 00117 throw ML::Exception("invalid seconds until loss assumed"); 00118 secondsUntilLossAssumed_ = newValue; 00119 } 00120 00121 std::shared_ptr<Banker> getBanker() const; 00122 void setBanker(const std::shared_ptr<Banker> & newBanker); 00123 00125 void init(); 00126 00128 void bindTcp(); 00129 00131 void bindAgents(std::string agentUri); 00132 00134 void bindAugmentors(const std::string & uri); 00135 00139 void unsafeDisableMonitor(); 00140 00143 virtual void 00144 start(boost::function<void ()> onStop = boost::function<void ()>()); 00145 00149 virtual void sleepUntilIdle(); 00150 00152 void issueTimestamp(); 00153 00155 virtual size_t numNonIdle() const; 00156 00157 virtual void shutdown(); 00158 00160 template<typename F> 00161 void forAllExchanges(F functor) { 00162 for(auto & item : exchanges) 00163 functor(item); 00164 } 00165 00175 void connectExchange(ExchangeConnector & exchange) 00176 { 00177 exchange.onNewAuction = [=] (std::shared_ptr<Auction> a) { this->injectAuction(a, secondsUntilLossAssumed_); }; 00178 exchange.onAuctionDone = [=] (std::shared_ptr<Auction> a) { this->onAuctionDone(a); }; 00179 } 00180 00182 void addExchange(ExchangeConnector * exchange) 00183 { 00184 Guard guard(lock); 00185 exchanges.push_back(std::shared_ptr<ExchangeConnector>(exchange)); 00186 connectExchange(*exchange); 00187 } 00188 00193 void addExchange(ExchangeConnector & exchange) 00194 { 00195 Guard guard(lock); 00196 exchanges.emplace_back(ML::make_unowned_std_sp(exchange)); 00197 connectExchange(exchange); 00198 } 00199 00201 void addExchange(std::unique_ptr<ExchangeConnector> && exchange) 00202 { 00203 addExchange(exchange.release()); 00204 } 00205 00209 void startExchange(const std::string & exchangeType, 00210 const Json::Value & exchangeConfig); 00211 00215 void startExchange(const Json::Value & exchangeConfig); 00216 00225 void injectAuction(std::shared_ptr<Auction> auction, 00226 double lossTime = INFINITY); 00227 00254 std::shared_ptr<Auction> 00255 injectAuction(Auction::HandleAuction onAuctionFinished, 00256 std::shared_ptr<BidRequest> request, 00257 const std::string & requestStr, 00258 const std::string & requestStrFormat, 00259 double startTime = 0.0, 00260 double expiryTime = 0.0, 00261 double lossTime = INFINITY); 00262 00277 void notifyFinishedAuction(const Id & auctionId); 00278 00280 int numAuctionsInProgress() const; 00281 00283 int numAuctionsAwaitingResult() const; 00284 00286 Json::Value getStats() const; 00287 00289 Json::Value getAgentInfo(const std::string & agent) const; 00290 00292 Json::Value getAllAgentInfo() const; 00293 00296 Json::Value getAccountInfo(const AccountKey & account) const; 00297 00299 void setGlobalBidProbability(double val) { globalBidProbability = val; } 00300 00304 void setBidsErrorRate(double val) { bidsErrorRate = val; } 00305 00309 void setBudgetErrorRate(double val) { budgetErrorRate = val; } 00310 00314 boost::function<void (double)> acceptAuctionProbabilityFn; 00315 00317 void setAcceptAuctionProbability(double val) 00318 { 00319 if (acceptAuctionProbabilityFn) 00320 acceptAuctionProbabilityFn(val); 00321 else if (val != 1.0) 00322 std::cerr << "warning: no way to change accept auction probability" << std::endl; 00323 } 00324 00326 virtual Json::Value getServiceStatus() const; 00327 00331 std::function<void (std::shared_ptr<Auction>, Id, Auction::Response)> 00332 onSubmittedAuction; 00333 00336 virtual void submitToPostAuctionService(std::shared_ptr<Auction> auction, 00337 Id auctionId, 00338 const Auction::Response & bid); 00339 00340 protected: 00341 // This thread contains the main router loop 00342 boost::scoped_ptr<boost::thread> runThread; 00343 00344 // This thread wakes up every now and again to run the destructors 00345 // on auction objects, which are expensive to destroy, so that they 00346 // don't have to run in the main loop 00347 boost::scoped_ptr<boost::thread> cleanupThread; 00348 00349 typedef std::recursive_mutex Lock; 00350 typedef std::unique_lock<Lock> Guard; 00351 00352 int shutdown_; 00353 00354 public: 00355 // Connection to the agents 00356 ZmqNamedClientBus agentEndpoint; 00357 00358 // Connection to the post auction loop 00359 ZmqNamedProxy postAuctionEndpoint; 00360 00361 void updateAllAgents(); 00362 00364 typedef std::map<std::string, AgentInfo> Agents; 00365 Agents agents; 00366 00367 ML::RingBufferSRMW<std::pair<std::string, std::shared_ptr<const AgentConfig> > > configBuffer; 00368 ML::RingBufferSRMW<std::shared_ptr<AugmentationInfo> > startBiddingBuffer; 00369 ML::RingBufferSRMW<std::shared_ptr<Auction> > submittedBuffer; 00370 ML::RingBufferSWMR<std::shared_ptr<Auction> > auctionGraveyard; 00371 00372 ML::Wakeup_Fd wakeupMainLoop; 00373 00374 AugmentationLoop augmentationLoop; 00375 Blacklist blacklist; 00376 00378 typedef TimeoutMap<Id, AuctionInfo> InFlight; 00379 InFlight inFlight; 00380 00382 AuctionInfo & 00383 addAuction(std::shared_ptr<Auction> auction, Date timeout); 00384 00385 DutyCycleEntry dutyCycleCurrent; 00386 std::vector<DutyCycleEntry> dutyCycleHistory; 00387 00388 void run(); 00389 00390 void handleAgentMessage(const std::vector<std::string> & message); 00391 00392 void checkDeadAgents(); 00393 00394 void checkExpiredAuctions(); 00395 00396 void returnErrorResponse(const std::vector<std::string> & message, 00397 const std::string & error); 00398 00399 void doShutdown(); 00400 00406 std::shared_ptr<AugmentationInfo> 00407 preprocessAuction(const std::shared_ptr<Auction> & auction); 00408 00412 void augmentAuction(const std::shared_ptr<AugmentationInfo> & info); 00413 00417 void doStartBidding(const std::vector<std::string> & message); 00418 00420 void doStartBidding(const std::shared_ptr<AugmentationInfo> & augInfo); 00421 00424 void doSubmitted(std::shared_ptr<Auction> auction); 00425 00426 //std::unordered_set<Id> recentlySubmitted; // DEBUG 00427 00429 void doBid(const std::vector<std::string> & message); 00430 00433 void doPong(int level, const std::vector<std::string> & message); 00434 00438 void sendPings(); 00439 00441 void doStats(const std::vector<std::string> & message); 00442 00444 void onNewAuction(std::shared_ptr<Auction> auction); 00445 00447 void onAuctionDone(std::shared_ptr<Auction> auction); 00448 00450 void doConfig(const std::string & agent, 00451 std::shared_ptr<const AgentConfig> config); 00452 00456 void unconfigure(const std::string & agent, const AgentConfig & config); 00457 00461 void configure(const std::string & agent, AgentConfig & config); 00462 00464 template<typename... Args> 00465 void sendAgentMessage(const std::string & agent, 00466 const std::string & messageType, 00467 const Date & date, 00468 Args... args) 00469 { 00470 agentEndpoint.sendMessage(agent, messageType, date, args...); 00471 } 00472 00474 void sendBidResponse(const std::string & agent, 00475 const AgentInfo & info, 00476 BidStatus status, 00477 Date timestamp, 00478 const std::string & message, 00479 const Id & auctionId, 00480 int spotNum = -1, 00481 Amount price = Amount(), 00482 const Auction * auction = 0, 00483 const std::string & bidData = "", 00484 const Json::Value & metadata = Json::Value(), 00485 const std::string & augmentationsStr = ""); 00486 00487 00488 mutable Lock lock; 00489 00490 std::shared_ptr<Banker> banker; 00491 00492 double secondsUntilLossAssumed_; 00493 double globalBidProbability; 00494 double bidsErrorRate; 00495 double budgetErrorRate; 00496 bool connectPostAuctionLoop; 00497 00498 00499 /*************************************************************************/ 00500 /* AGENT INTERACTIONS */ 00501 /*************************************************************************/ 00502 00504 AllAgentInfo * allAgents; 00505 00507 mutable GcLock allAgentsGc; 00508 00509 typedef std::function<void (const AgentInfoEntry & info)> OnAgentFn; 00511 void forEachAgent(const OnAgentFn & onAgent) const; 00512 00516 void forEachAccountAgent(const AccountKey & account, 00517 const OnAgentFn & onAgent) const; 00518 00522 AgentInfoEntry getAgentEntry(const std::string & agent) const; 00523 00527 AgentConfigurationListener configListener; 00528 00530 bool initialized; 00531 00533 std::vector<std::shared_ptr<ExchangeConnector> > exchanges; 00534 00535 /*************************************************************************/ 00536 /* EXCEPTIONS */ 00537 /*************************************************************************/ 00538 00542 void throwException(const std::string & key, const std::string & fmt, 00543 ...) __attribute__((__noreturn__)); 00544 00545 00546 /*************************************************************************/ 00547 /* SYSTEM LOGGING */ 00548 /*************************************************************************/ 00549 00551 template<typename... Args> 00552 void logRouterError(const std::string & function, 00553 const std::string & exception, 00554 Args... args) 00555 { 00556 logger.publish("ROUTERERROR", Date::now().print(5), 00557 function, exception, args...); 00558 recordHit("error.%s", function); 00559 } 00560 00561 00562 /*************************************************************************/ 00563 /* DATA LOGGING */ 00564 /*************************************************************************/ 00565 00567 template<typename... Args> 00568 void logMessage(const std::string & channel, Args... args) 00569 { 00570 using namespace std; 00571 //cerr << "********* logging message to " << channel << endl; 00572 logger.publish(channel, Date::now().print(5), args...); 00573 } 00574 00576 template<typename... Args> 00577 void logMessageNoTimestamp(const std::string & channel, Args... args) 00578 { 00579 using namespace std; 00580 //cerr << "********* logging message to " << channel << endl; 00581 logger.publish(channel, args...); 00582 } 00583 00584 /*************************************************************************/ 00585 /* DEBUGGING */ 00586 /*************************************************************************/ 00587 00588 void debugAuction(const Id & auction, const std::string & type, 00589 const std::vector<std::string> & args 00590 = std::vector<std::string>()) 00591 { 00592 if (JML_LIKELY(!doDebug)) return; 00593 debugAuctionImpl(auction, type, args); 00594 } 00595 00596 void debugAuctionImpl(const Id & auction, const std::string & type, 00597 const std::vector<std::string> & args); 00598 00599 void debugSpot(const Id & auction, 00600 const Id & spot, 00601 const std::string & type, 00602 const std::vector<std::string> & args 00603 = std::vector<std::string>()) 00604 { 00605 if (JML_LIKELY(!doDebug)) return; 00606 debugSpotImpl(auction, spot, type, args); 00607 } 00608 00609 void debugSpotImpl(const Id & auction, 00610 const Id & spot, 00611 const std::string & type, 00612 const std::vector<std::string> & args); 00613 00614 void expireDebugInfo(); 00615 00616 void dumpAuction(const Id & auction) const; 00617 void dumpSpot(const Id & auction, const Id & spot) const; 00618 00619 Date getCurrentTime() const { return Date::now(); } 00620 00621 ZmqNamedPublisher logger; 00622 00624 bool doDebug; 00625 00626 mutable ML::Spinlock debugLock; 00627 TimeoutMap<Id, AuctionDebugInfo> debugInfo; 00628 00629 uint64_t numAuctions; 00630 uint64_t numBids; 00631 uint64_t numNonEmptyBids; 00632 uint64_t numAuctionsWithBid; 00633 uint64_t numNoPotentialBidders; 00634 uint64_t numNoBidders; 00635 00636 /* Client connection to the Monitor, determines if we can process bid 00637 requests */ 00638 MonitorClient monitorClient; 00639 Date slowModeLastAuction; 00640 int slowModeCount; 00641 00642 /* MONITOR PROVIDER */ 00643 /* Post service health status to Monitor */ 00644 MonitorProviderClient monitorProviderClient; 00645 00646 /* MonitorProvider interface */ 00647 std::string getProviderName() const; 00648 Json::Value getProviderIndicators() const; 00649 }; 00650 00651 00652 } // namespace RTBKIT 00653 00654 00655 #endif /* __rtb__router_h__ */