RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* post_auction_loop.h -*- C++ -*- 00002 Jeremy Barnes, 30 May 2012 00003 Router post-auction loop. 00004 */ 00005 00006 #ifndef __router__post_auction_loop_h__ 00007 #define __router__post_auction_loop_h__ 00008 00009 #include "soa/service/service_base.h" 00010 #include "soa/service/pending_list.h" 00011 #include <unordered_map> 00012 #include "soa/service/message_loop.h" 00013 #include "soa/service/typed_message_channel.h" 00014 #include "rtbkit/common/auction.h" 00015 #include "rtbkit/common/auction_events.h" 00016 #include "soa/service/zmq_endpoint.h" 00017 #include "soa/service/zmq_message_router.h" 00018 #include "soa/service/zmq_named_pub_sub.h" 00019 #include "rtbkit/core/agent_configuration/agent_configuration_listener.h" 00020 #include "rtbkit/core/banker/banker.h" 00021 #include "rtbkit/core/monitor/monitor_provider.h" 00022 00023 namespace RTBKIT { 00024 00025 /*****************************************************************************/ 00026 /* SUBMISSION INFO */ 00027 /*****************************************************************************/ 00028 00034 struct SubmissionInfo { 00035 SubmissionInfo() 00036 : fromOldRouter(false) 00037 { 00038 } 00039 00040 std::shared_ptr<BidRequest> bidRequest; 00041 std::string bidRequestStr; 00042 std::string bidRequestStrFormat; 00043 JsonHolder augmentations; 00044 Auction::Response bid; 00045 bool fromOldRouter; 00046 00053 std::vector<std::shared_ptr<PostAuctionEvent> > earlyWinEvents; 00054 std::vector<std::shared_ptr<PostAuctionEvent> > earlyCampaignEvents; 00055 00056 std::string serializeToString() const; 00057 void reconstituteFromString(const std::string & str); 00058 }; 00059 00060 00061 /*****************************************************************************/ 00062 /* FINISHED INFO */ 00063 /*****************************************************************************/ 00064 00071 struct FinishedInfo { 00072 FinishedInfo() 00073 : fromOldRouter(false) 00074 { 00075 } 00076 00077 Date auctionTime; 00078 Id auctionId; 00079 Id adSpotId; 00080 int spotIndex; 00081 std::shared_ptr<BidRequest> bidRequest; 00082 std::string bidRequestStr; 00083 std::string bidRequestStrFormat; 00084 JsonHolder augmentations; 00085 std::set<Id> uids; 00086 00091 SegmentList visitChannels; 00092 00095 void addUids(const UserIds & toAdd) 00096 { 00097 for (auto it = toAdd.begin(), end = toAdd.end(); it != end; ++it) { 00098 auto jt = uids.find(it->second); 00099 if (jt != uids.end()) 00100 return; 00101 uids.insert(it->second); 00102 } 00103 } 00104 00105 Date bidTime; 00106 Auction::Response bid; 00107 Json::Value bidToJson() const; 00108 00109 bool hasWin() const { return winTime != Date(); } 00110 void setWin(Date winTime, BidStatus status, Amount winPrice, 00111 const std::string & winMeta) 00112 { 00113 if (hasWin()) 00114 throw ML::Exception("already has win"); 00115 this->winTime = winTime; 00116 this->reportedStatus = status; 00117 this->winPrice = winPrice; 00118 this->winMeta = winMeta; 00119 } 00120 00121 Date winTime; 00122 BidStatus reportedStatus; 00123 Amount winPrice; 00124 std::string winMeta; 00125 Json::Value winToJson() const; 00126 00127 CampaignEvents campaignEvents; 00128 00129 struct Visit { 00130 Date visitTime; 00131 SegmentList channels; 00132 std::string meta; 00133 00134 Json::Value toJson() const; 00135 void serialize(ML::DB::Store_Writer & store) const; 00136 void reconstitute(ML::DB::Store_Reader & store); 00137 }; 00138 00139 std::vector<Visit> visits; 00140 00142 void addVisit(Date visitTime, 00143 const std::string & visitMeta, 00144 const SegmentList & channels); 00145 00146 Json::Value visitsToJson() const; 00147 00148 Json::Value toJson() const; 00149 00150 bool fromOldRouter; 00151 00152 std::string serializeToString() const; 00153 void reconstituteFromString(const std::string & str); 00154 }; 00155 00156 00157 /*****************************************************************************/ 00158 /* POST AUCTION LOOP */ 00159 /*****************************************************************************/ 00160 00161 struct PostAuctionLoop : public ServiceBase, public MonitorProvider 00162 { 00163 00164 PostAuctionLoop(ServiceBase & parent, 00165 const std::string & serviceName); 00166 PostAuctionLoop(std::shared_ptr<ServiceProxies> proxies, 00167 const std::string & serviceName); 00168 00169 ~PostAuctionLoop() 00170 { 00171 shutdown(); 00172 } 00173 00174 std::shared_ptr<Banker> getBanker() const 00175 { 00176 return banker; 00177 } 00178 00179 void setBanker(const std::shared_ptr<Banker> & newBanker) 00180 { 00181 banker = newBanker; 00182 } 00183 00184 std::shared_ptr<Banker> banker; 00185 00186 /* ROUTERSHARED */ 00187 uint64_t numWins; 00188 uint64_t numLosses; 00189 uint64_t numCampaignEvents; 00190 00191 /* /ROUTERSHARED */ 00192 00193 /* ROUTERBASE */ 00194 /************************************************************************/ 00195 /* EXCEPTIONS */ 00196 /************************************************************************/ 00197 00201 void throwException(const std::string & key, const std::string & fmt, 00202 ...) __attribute__((__noreturn__)); 00203 00204 /************************************************************************/ 00205 /* LOGGING */ 00206 /************************************************************************/ 00207 00208 ZmqNamedPublisher logger; 00209 00211 template<typename... Args> 00212 void logMessage(const std::string & channel, Args... args) 00213 { 00214 using namespace std; 00215 //cerr << "********* logging message to " << channel << endl; 00216 logger.publish(channel, Date::now().print(5), args...); 00217 } 00218 00220 template<typename... Args> 00221 void logPAError(const std::string & function, 00222 const std::string & exception, 00223 Args... args) 00224 { 00225 logger.publish("PAERROR", Date::now().print(5), 00226 function, exception, args...); 00227 recordHit("error.%s", function); 00228 } 00229 00230 /* /ROUTERBASE */ 00231 00234 void bindTcp(); 00235 00236 void init(); 00237 00238 void start(std::function<void ()> onStop = std::function<void ()>()); 00239 00240 void shutdown(); 00241 00242 size_t numAwaitingWinLoss() const 00243 { 00244 return submitted.size(); 00245 } 00246 00247 size_t numFinishedAuctionsTracked() const 00248 { 00249 return finished.size(); 00250 } 00251 00261 void initStatePersistence(const std::string & path); 00262 00264 virtual Json::Value getServiceStatus() const; 00265 00271 void injectSubmittedAuction(const Id & auctionId, 00272 const Id & adSpotId, 00273 std::shared_ptr<BidRequest> bidRequest, 00274 const std::string & bidRequestStr, 00275 const std::string & bidRequestStrFormat, 00276 const JsonHolder & augmentations, 00277 const Auction::Response & bidResponse, 00278 Date lossTimeout); 00279 00282 void injectWin(const Id & auctionId, 00283 const Id & adspot, 00284 Amount winPrice, 00285 Date timestamp, 00286 const JsonHolder & winMeta, 00287 const UserIds & ids, 00288 const AccountKey & account, 00289 Date bidTimestamp); 00290 00295 void injectLoss(const Id & auctionId, 00296 const Id & adspot, 00297 Date timestamp, 00298 const JsonHolder & lossMeta, 00299 const AccountKey & account, 00300 Date bidTimestamp); 00301 00308 void injectCampaignEvent(const std::string & label, 00309 const Id & auctionId, 00310 const Id & adSpotId, 00311 Date timestamp, 00312 const JsonHolder & eventMeta, 00313 const UserIds & ids); 00314 00332 void notifyFinishedSpot(const Id & auctionId, const Id & adSpotId); 00333 00334 /* Post service health status to Monitor */ 00335 MonitorProviderClient monitorProviderClient; 00336 00337 /* MonitorProvider interface */ 00338 std::string getProviderName() const; 00339 Json::Value getProviderIndicators() const; 00340 00341 Date lastWinLoss; 00342 Date lastCampaignEvent; 00343 00344 private: 00348 void initConnections(); 00349 00351 void doAuction(const SubmittedAuctionEvent & event); 00352 00354 void doEvent(const std::shared_ptr<PostAuctionEvent> & event); 00355 00357 void doAuctionMessage(const std::vector<std::string> & message); 00358 00360 void doWinMessage(const std::vector<std::string> & message); 00361 00363 void doLossMessage(const std::vector<std::string> & message); 00364 00367 void doCampaignEventMessage(const std::vector<std::string> & message); 00368 00370 void checkExpiredAuctions(); 00371 00375 void doWinLoss(const std::shared_ptr<PostAuctionEvent> & event, 00376 bool isReplay); 00377 00379 void doSubmitted(const std::shared_ptr<PostAuctionEvent> & event); 00380 00382 void doCampaignEvent(const std::shared_ptr<PostAuctionEvent> & event); 00383 00385 bool routePostAuctionEvent(const std::string & label, 00386 const FinishedInfo & finished, 00387 const SegmentList & channels, 00388 bool filterChannels); 00389 00391 void doBidResult(const Id & auctionId, 00392 const Id & adSpotId, 00393 const SubmissionInfo & submission, 00394 Amount price, 00395 Date timestamp, 00396 BidStatus status, 00397 const std::string & confidence, 00398 const std::string & winLossMeta, 00399 const UserIds & uids); 00400 00408 typedef PendingList<std::pair<Id, Id>, 00409 SubmissionInfo> Submitted; 00410 Submitted submitted; 00411 00420 typedef PendingList<std::pair<Id, Id>, FinishedInfo> Finished; 00421 Finished finished; 00422 00424 MessageLoop loop; 00425 00427 TypedMessageSink<SubmittedAuctionEvent> auctions; 00428 00430 TypedMessageSink<std::shared_ptr<PostAuctionEvent> > events; 00431 00433 ZmqNamedEndpoint endpoint; 00434 00437 ZmqMessageRouter router; 00438 00440 ZmqNamedClientBus toAgents; 00441 00443 template<typename... Args> 00444 void sendAgentMessage(const std::string & agent, 00445 const std::string & messageType, 00446 const Date & date, 00447 Args... args) 00448 { 00449 toAgents.sendMessage(agent, messageType, date, 00450 std::forward<Args>(args)...); 00451 } 00452 00454 static std::string makeBidId(Id auctionId, Id spotId, const std::string & agent); 00455 00456 AgentConfigurationListener configListener; 00457 }; 00458 00459 00460 } // namespace RTBKIT 00461 00462 00463 #endif /* __router__post_auction_loop_h__ */