RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* post_auction_loop.h -*- C++ -*- 00002 Jeremy Barnes, 31 May 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Loop for post-auction processing. 00006 */ 00007 00008 #include <string> 00009 #include <sstream> 00010 #include <iostream> 00011 #include "rtbkit/core/agent_configuration/agent_config.h" 00012 #include "jml/utils/pair_utils.h" 00013 #include "jml/arch/futex.h" 00014 #include "jml/db/persistent.h" 00015 #include "rtbkit/core/banker/banker.h" 00016 #include "rtbkit/common/messages.h" 00017 #include "rtbkit/common/auction_events.h" 00018 00019 #include "post_auction_loop.h" 00020 00021 using namespace std; 00022 using namespace ML; 00023 00024 00025 namespace RTBKIT { 00026 00027 00028 /*****************************************************************************/ 00029 /* SUBMISSION INFO */ 00030 /*****************************************************************************/ 00031 00032 std::string 00033 SubmissionInfo:: 00034 serializeToString() const 00035 { 00036 ostringstream stream; 00037 ML::DB::Store_Writer writer(stream); 00038 int version = 5; 00039 writer << version 00040 << bidRequestStr 00041 << bidRequestStrFormat 00042 << augmentations.toString() 00043 << earlyWinEvents 00044 << earlyCampaignEvents; 00045 bid.serialize(writer); 00046 return stream.str(); 00047 } 00048 00049 void 00050 SubmissionInfo:: 00051 reconstituteFromString(const std::string & str) 00052 { 00053 istringstream stream(str); 00054 ML::DB::Store_Reader store(stream); 00055 int version; 00056 store >> version; 00057 if (version < 1 || version > 5) 00058 throw ML::Exception("bad version %d", version); 00059 store >> bidRequestStr; 00060 if (version == 5) 00061 { 00062 store >> bidRequestStrFormat ; 00063 } 00064 if (version > 1) { 00065 string s; 00066 store >> s; 00067 augmentations = s; 00068 } 00069 else augmentations.clear(); 00070 if (version == 3) { 00071 vector<vector<string> > msg1, msg2; 00072 store >> msg1 >> msg2; 00073 if (!msg1.empty() || !msg2.empty()) 00074 cerr << "warning: discarding early events from old format" 00075 << endl; 00076 earlyWinEvents.clear(); 00077 earlyCampaignEvents.clear(); 00078 } 00079 else if (version > 3) { 00080 store >> earlyWinEvents >> earlyCampaignEvents; 00081 } 00082 else { 00083 earlyWinEvents.clear(); 00084 earlyCampaignEvents.clear(); 00085 } 00086 bid.reconstitute(store); 00087 00088 if (bidRequestStr != "") 00089 bidRequest.reset(BidRequest::parse(bidRequestStrFormat, bidRequestStr)); 00090 else bidRequest.reset(); 00091 } 00092 00093 00094 /*****************************************************************************/ 00095 /* FINISHED INFO */ 00096 /*****************************************************************************/ 00097 00098 Json::Value 00099 FinishedInfo:: 00100 bidToJson() const 00101 { 00102 Json::Value result = bid.toJson(); 00103 result["timestamp"] = bidTime.secondsSinceEpoch(); 00104 return result; 00105 } 00106 00107 Json::Value 00108 FinishedInfo:: 00109 winToJson() const 00110 { 00111 Json::Value result; 00112 if (!hasWin()) return result; 00113 00114 result["timestamp"] = winTime.secondsSinceEpoch(); 00115 result["reportedStatus"] = (reportedStatus == BS_WIN ? "WIN" : "LOSS"); 00116 result["winPrice"] = winPrice.toJson(); 00117 result["meta"] = winMeta; 00118 00119 return result; 00120 } 00121 00122 void 00123 FinishedInfo:: 00124 addVisit(Date visitTime, 00125 const std::string & visitMeta, 00126 const SegmentList & channels) 00127 { 00128 Visit visit; 00129 visit.visitTime = visitTime; 00130 visit.channels = channels; 00131 visit.meta = visitMeta; 00132 visits.push_back(visit); 00133 } 00134 00135 Json::Value 00136 FinishedInfo:: 00137 visitsToJson() const 00138 { 00139 Json::Value result; 00140 for (unsigned i = 0; i < visits.size(); ++i) { 00141 Json::Value & v = result[i]; 00142 const Visit & visit = visits[i]; 00143 v["timestamp"] = visit.visitTime.secondsSinceEpoch(); 00144 v["meta"] = visit.meta; 00145 v["channels"] = visit.channels.toJson(); 00146 } 00147 return result; 00148 } 00149 00150 Json::Value 00151 FinishedInfo:: 00152 toJson() const 00153 { 00154 throw ML::Exception("FinishedInfo::toJson()"); 00155 Json::Value result; 00156 return result; 00157 } 00158 00159 void 00160 FinishedInfo::Visit:: 00161 serialize(DB::Store_Writer & store) const 00162 { 00163 unsigned char version = 1; 00164 store << version << visitTime << channels << meta; 00165 } 00166 00167 void 00168 FinishedInfo::Visit:: 00169 reconstitute(DB::Store_Reader & store) 00170 { 00171 unsigned char version; 00172 store >> version; 00173 if (version != 1) 00174 throw ML::Exception("invalid version"); 00175 store >> visitTime >> channels >> meta; 00176 } 00177 00178 IMPL_SERIALIZE_RECONSTITUTE(FinishedInfo::Visit); 00179 00180 std::string 00181 FinishedInfo:: 00182 serializeToString() const 00183 { 00184 ostringstream stream; 00185 ML::DB::Store_Writer writer(stream); 00186 int version = 6; 00187 writer << version 00188 << auctionTime << auctionId << adSpotId 00189 << bidRequestStr << bidTime <<bidRequestStrFormat; 00190 bid.serialize(writer); 00191 writer << winTime 00192 << reportedStatus << winPrice << winMeta; 00193 writer << campaignEvents; 00194 writer << fromOldRouter 00195 << augmentations.toString(); 00196 writer << visitChannels << uids << visits; 00197 00198 return stream.str(); 00199 } 00200 00201 void 00202 FinishedInfo:: 00203 reconstituteFromString(const std::string & str) 00204 { 00205 istringstream stream(str); 00206 ML::DB::Store_Reader store(stream); 00207 int version, istatus; 00208 store >> version; 00209 if (version > 6) 00210 throw ML::Exception("bad version %d", version); 00211 if (version < 6) 00212 throw ML::Exception("version %d no longer supported", version); 00213 00214 string auctionIdStr, adSpotIdStr; 00215 00216 store >> auctionTime >> auctionId >> adSpotId 00217 >> bidRequestStr >> bidTime; 00218 bid.reconstitute(store); 00219 00220 store >> winTime >> istatus >> winPrice >> winMeta; 00221 store >> campaignEvents; 00222 store >> fromOldRouter; 00223 00224 if (version > 1) { 00225 string s; 00226 store >> s; 00227 augmentations = s; 00228 } 00229 else augmentations.clear(); 00230 00231 if (version > 2) { 00232 store >> visitChannels >> uids >> visits; 00233 } 00234 00235 reportedStatus = (BidStatus)istatus; 00236 00237 bidRequest.reset(BidRequest::parse(bidRequestStrFormat, bidRequestStr)); 00238 } 00239 00240 00241 /*****************************************************************************/ 00242 /* POST AUCTION LOOP */ 00243 /*****************************************************************************/ 00244 00245 PostAuctionLoop:: 00246 PostAuctionLoop(std::shared_ptr<ServiceProxies> proxies, 00247 const std::string & serviceName) 00248 : ServiceBase(serviceName, proxies), 00249 logger(getZmqContext()), 00250 monitorProviderClient(getZmqContext(), *this), 00251 auctions(65536), 00252 events(65536), 00253 endpoint(getZmqContext()), 00254 router(!!getZmqContext()), 00255 toAgents(getZmqContext()), 00256 configListener(getZmqContext()) 00257 { 00258 } 00259 00260 PostAuctionLoop:: 00261 PostAuctionLoop(ServiceBase & parent, 00262 const std::string & serviceName) 00263 : ServiceBase(serviceName, parent), 00264 logger(getZmqContext()), 00265 monitorProviderClient(getZmqContext(), *this), 00266 auctions(65536), 00267 events(65536), 00268 endpoint(getZmqContext()), 00269 router(!!getZmqContext()), 00270 toAgents(getZmqContext()), 00271 configListener(getZmqContext()) 00272 { 00273 } 00274 00275 void 00276 PostAuctionLoop:: 00277 init() 00278 { 00279 initConnections(); 00280 monitorProviderClient.init(getServices()->config); 00281 } 00282 00283 void 00284 PostAuctionLoop:: 00285 initConnections() 00286 { 00287 registerServiceProvider(serviceName(), { "rtbPostAuctionService" }); 00288 00289 cerr << "post auction logger on " << serviceName() + "/logger" << endl; 00290 logger.init(getServices()->config, serviceName() + "/logger"); 00291 00292 auctions.onEvent = std::bind<void>(&PostAuctionLoop::doAuction, this, 00293 std::placeholders::_1); 00294 events.onEvent = std::bind<void>(&PostAuctionLoop::doEvent, this, 00295 std::placeholders::_1); 00296 toAgents.clientMessageHandler = [&] (const std::vector<std::string> & msg) 00297 { 00298 // Clients should never send the post auction service anything, 00299 // but we catch it here just in case 00300 cerr << "PostAuctionLoop got agent message " << msg << endl; 00301 }; 00302 00303 router.bind("AUCTION", 00304 std::bind(&PostAuctionLoop::doAuctionMessage, this, 00305 std::placeholders::_1)); 00306 router.bind("WIN", 00307 std::bind(&PostAuctionLoop::doWinMessage, this, 00308 std::placeholders::_1)); 00309 router.bind("LOSS", 00310 std::bind(&PostAuctionLoop::doLossMessage, this, 00311 std::placeholders::_1)); 00312 router.bind("EVENT", 00313 std::bind(&PostAuctionLoop::doCampaignEventMessage, this, 00314 std::placeholders::_1)); 00315 00316 // Every second we check for expired auctions 00317 loop.addPeriodic("PostAuctionLoop::checkExpiredAuctions", 1.0, 00318 std::bind<void>(&PostAuctionLoop::checkExpiredAuctions, 00319 this)); 00320 00321 // Initialize zeromq endpoints 00322 endpoint.init(getServices()->config, ZMQ_XREP, serviceName() + "/events"); 00323 toAgents.init(getServices()->config, serviceName() + "/agents"); 00324 configListener.init(getServices()->config); 00325 endpoint.messageHandler 00326 = std::bind(&ZmqMessageRouter::handleMessage, 00327 &router, 00328 std::placeholders::_1); 00329 00330 loop.addSource("PostAuctionLoop::auctions", auctions); 00331 loop.addSource("PostAuctionLoop::events", events); 00332 00333 loop.addSource("PostAuctionLoop::endpoint", endpoint); 00334 00335 loop.addSource("PostAuctionLoop::toAgents", toAgents); 00336 loop.addSource("PostAuctionLoop::configListener", configListener); 00337 loop.addSource("PostAuctionLoop::logger", logger); 00338 } 00339 00340 void 00341 PostAuctionLoop:: 00342 bindTcp() 00343 { 00344 logger.bindTcp(getServices()->ports->getRange("logs")); 00345 endpoint.bindTcp(getServices()->ports->getRange("postAuctionLoop")); 00346 toAgents.bindTcp(getServices()->ports->getRange("postAuctionLoopAgents")); 00347 } 00348 00349 void 00350 PostAuctionLoop:: 00351 start(std::function<void ()> onStop) 00352 { 00353 loop.start(onStop); 00354 monitorProviderClient.start(); 00355 } 00356 00357 void 00358 PostAuctionLoop:: 00359 shutdown() 00360 { 00361 loop.shutdown(); 00362 logger.shutdown(); 00363 toAgents.shutdown(); 00364 endpoint.shutdown(); 00365 configListener.shutdown(); 00366 monitorProviderClient.shutdown(); 00367 } 00368 00369 Json::Value 00370 PostAuctionLoop:: 00371 getServiceStatus() const 00372 { 00373 return Json::Value(); 00374 } 00375 00376 void 00377 PostAuctionLoop:: 00378 throwException(const std::string & key, const std::string & fmt, ...) 00379 { 00380 recordHit("error.exception"); 00381 recordHit("error.exception.%s", key); 00382 00383 string message; 00384 va_list ap; 00385 va_start(ap, fmt); 00386 try { 00387 message = vformat(fmt.c_str(), ap); 00388 va_end(ap); 00389 } 00390 catch (...) { 00391 va_end(ap); 00392 throw; 00393 } 00394 00395 logPAError("exception", key, message); 00396 throw ML::Exception("Router Exception: " + key + ": " + message); 00397 } 00398 00399 void 00400 PostAuctionLoop:: 00401 injectWin(const Id & auctionId, 00402 const Id & adSpotId, 00403 Amount winPrice, 00404 Date timestamp, 00405 const JsonHolder & winMeta, 00406 const UserIds & uids, 00407 const AccountKey & account, 00408 Date bidTimestamp) 00409 { 00410 auto event = std::make_shared<PostAuctionEvent>(); 00411 event->type = PAE_WIN; 00412 event->auctionId = auctionId; 00413 event->adSpotId = adSpotId; 00414 event->timestamp = timestamp; 00415 event->winPrice = winPrice; 00416 event->metadata = winMeta; 00417 event->uids = uids; 00418 event->account = account; 00419 event->bidTimestamp = bidTimestamp; 00420 00421 events.push(event); 00422 } 00423 00424 void 00425 PostAuctionLoop:: 00426 injectLoss(const Id & auctionId, 00427 const Id & adSpotId, 00428 Date timestamp, 00429 const JsonHolder & json, 00430 const AccountKey & account, 00431 Date bidTimestamp) 00432 { 00433 //cerr << "injecting loss for " << auctionId << endl; 00434 00435 if (timestamp == Date()) 00436 timestamp = Date::now(); 00437 00438 auto event = std::make_shared<PostAuctionEvent>(); 00439 event->type = PAE_LOSS; 00440 event->auctionId = auctionId; 00441 event->adSpotId = adSpotId; 00442 event->timestamp = timestamp; 00443 event->winPrice = Amount(); 00444 event->account = account; 00445 event->bidTimestamp = bidTimestamp; 00446 00447 events.push(event); 00448 } 00449 00450 void 00451 PostAuctionLoop:: 00452 injectCampaignEvent(const string & label, 00453 const Id & auctionId, 00454 const Id & adSpotId, 00455 Date timestamp, 00456 const JsonHolder & impressionMeta, 00457 const UserIds & uids) 00458 { 00459 auto event = std::make_shared<PostAuctionEvent>(); 00460 event->type = PAE_CAMPAIGN_EVENT; 00461 event->label = label; 00462 event->auctionId = auctionId; 00463 event->adSpotId = adSpotId; 00464 event->timestamp = timestamp; 00465 event->metadata = impressionMeta; 00466 event->uids = uids; 00467 00468 events.push(event); 00469 } 00470 00471 00472 void 00473 PostAuctionLoop:: 00474 doAuctionMessage(const std::vector<std::string> & message) 00475 { 00476 recordHit("messages.AUCTION"); 00477 //cerr << "doAuctionMessage " << message << endl; 00478 00479 SubmittedAuctionEvent event 00480 = ML::DB::reconstituteFromString<SubmittedAuctionEvent>(message.at(2)); 00481 doAuction(event); 00482 } 00483 00484 void 00485 PostAuctionLoop:: 00486 doWinMessage(const std::vector<std::string> & message) 00487 { 00488 recordHit("messages.WIN"); 00489 auto event = std::make_shared<PostAuctionEvent> 00490 (ML::DB::reconstituteFromString<PostAuctionEvent>(message.at(2))); 00491 doWinLoss(event, false /* replay */); 00492 } 00493 00494 void 00495 PostAuctionLoop:: 00496 doLossMessage(const std::vector<std::string> & message) 00497 { 00498 recordHit("messages.LOSS"); 00499 auto event = std::make_shared<PostAuctionEvent> 00500 (ML::DB::reconstituteFromString<PostAuctionEvent>(message.at(2))); 00501 doWinLoss(event, false /* replay */); 00502 } 00503 00504 void 00505 PostAuctionLoop:: 00506 doCampaignEventMessage(const std::vector<std::string> & message) 00507 { 00508 auto event = std::make_shared<PostAuctionEvent> 00509 (ML::DB::reconstituteFromString<PostAuctionEvent>(message.at(2))); 00510 recordHit("messages.EVENT." + event->label); 00511 doCampaignEvent(event); 00512 } 00513 00514 namespace { 00515 00516 std::pair<Id, Id> 00517 unstringifyPair(const std::string & str) 00518 { 00519 istringstream stream(str); 00520 DB::Store_Reader store(stream); 00521 pair<Id, Id> result; 00522 store >> result.first >> result.second; 00523 return result; 00524 } 00525 00526 std::string stringifyPair(const std::pair<Id, Id> & vals) 00527 { 00528 if (!vals.second || vals.second.type == Id::NULLID) 00529 throw ML::Exception("attempt to store null ID"); 00530 00531 ostringstream stream; 00532 { 00533 DB::Store_Writer store(stream); 00534 store << vals.first << vals.second; 00535 } 00536 00537 return stream.str(); 00538 } 00539 00540 } // file scope 00541 00542 void 00543 PostAuctionLoop:: 00544 initStatePersistence(const std::string & path) 00545 { 00546 typedef PendingPersistenceT<pair<Id, Id>, SubmissionInfo> 00547 SubmittedPending; 00548 00549 auto submittedDb = std::make_shared<LeveldbPendingPersistence>(); 00550 submittedDb->open(path + "/submitted"); 00551 00552 auto submittedPersistence 00553 = std::make_shared<SubmittedPending>(); 00554 submittedPersistence->store = submittedDb; 00555 00556 auto stringifySubmissionInfo = [] (const SubmissionInfo & info) 00557 { 00558 return info.serializeToString(); 00559 }; 00560 00561 auto unstringifySubmissionInfo = [] (const std::string & str) 00562 { 00563 SubmissionInfo info; 00564 info.reconstituteFromString(str); 00565 return info; 00566 }; 00567 00568 submittedPersistence->stringifyKey = stringifyPair; 00569 submittedPersistence->unstringifyKey = unstringifyPair; 00570 submittedPersistence->stringifyValue = stringifySubmissionInfo; 00571 submittedPersistence->unstringifyValue = unstringifySubmissionInfo; 00572 00573 Date newTimeout = Date::now().plusSeconds(15); 00574 00575 auto acceptSubmitted = [&] (pair<Id, Id> & key, 00576 SubmissionInfo & info, 00577 Date & timeout) -> bool 00578 { 00579 info.fromOldRouter = true; 00580 newTimeout.addSeconds(0.001); 00581 timeout = newTimeout; 00582 // this->debugSpot(key.first, key.second, "RECONST SUBMITTED"); 00583 return true; 00584 }; 00585 00586 submitted.initFromStore(submittedPersistence, 00587 acceptSubmitted, 00588 Date::now().plusSeconds(15)); 00589 00590 typedef PendingPersistenceT<pair<Id, Id>, FinishedInfo> 00591 FinishedPending; 00592 00593 auto finishedDb = std::make_shared<LeveldbPendingPersistence>(); 00594 finishedDb->open(path + "/finished"); 00595 00596 auto finishedPersistence 00597 = std::make_shared<FinishedPending>(); 00598 finishedPersistence->store = finishedDb; 00599 00600 auto stringifyFinishedInfo = [] (const FinishedInfo & info) 00601 { 00602 return info.serializeToString(); 00603 }; 00604 00605 auto unstringifyFinishedInfo = [] (const std::string & str) 00606 { 00607 FinishedInfo info; 00608 info.reconstituteFromString(str); 00609 return info; 00610 }; 00611 00612 finishedPersistence->stringifyKey = stringifyPair; 00613 finishedPersistence->unstringifyKey = unstringifyPair; 00614 finishedPersistence->stringifyValue = stringifyFinishedInfo; 00615 finishedPersistence->unstringifyValue = unstringifyFinishedInfo; 00616 00617 newTimeout = Date::now().plusSeconds(900); 00618 00619 auto acceptFinished = [&] (pair<Id, Id> & key, 00620 FinishedInfo & info, 00621 Date & timeout) -> bool 00622 { 00623 info.fromOldRouter = true; 00624 newTimeout.addSeconds(0.001); 00625 timeout = newTimeout; 00626 // this->debugSpot(key.first, key.second, "RECONST FINISHED"); 00627 00628 return true; 00629 }; 00630 00631 finished.initFromStore(finishedPersistence, 00632 acceptFinished, 00633 Date::now().plusSeconds(900)); 00634 00635 auto backgroundWork = [=] (volatile int & shutdown, int64_t threadId) 00636 { 00637 while (!shutdown) { 00638 futex_wait(const_cast<int &>(shutdown), 0, 600.0); 00639 if (shutdown) break; 00640 //continue; 00641 00642 { 00643 Date start = Date::now(); 00644 submittedDb->compact(); 00645 Date end = Date::now(); 00646 this->recordEvent("persistentData.submitted.compactTimeMs", 00647 ET_OUTCOME, 00648 1000.0 * (end.secondsSince(start))); 00649 uint64_t size = submittedDb->getDbSize(); 00650 //cerr << "submitted db is " << size / 1024.0 / 1024.0 00651 // << "MB" << endl; 00652 this->recordEvent("persistentData.submitted.dbSizeMb", 00653 ET_LEVEL, size / 1024.0 / 1024.0); 00654 } 00655 00656 { 00657 Date start = Date::now(); 00658 finishedDb->compact(); 00659 Date end = Date::now(); 00660 this->recordEvent("persistentData.finished.compactTimeMs", 00661 ET_OUTCOME, 00662 1000.0 * (end.secondsSince(start))); 00663 uint64_t size = finishedDb->getDbSize(); 00664 //cerr << "finished db is " << size / 1024.0 / 1024.0 00665 // << "MB" << endl; 00666 this->recordEvent("persistentData.finished.dbSizeMb", 00667 ET_LEVEL, size / 1024.0 / 1024.0); 00668 } 00669 } 00670 00671 cerr << "exiting background work thread" << endl; 00672 }; 00673 00674 loop.startSubordinateThread(backgroundWork); 00675 } 00676 00677 00678 void 00679 PostAuctionLoop:: 00680 checkExpiredAuctions() 00681 { 00682 Date start = Date::now(); 00683 00684 { 00685 cerr << " checking " << submitted.size() 00686 << " submitted auctions for inferred loss" << endl; 00687 00688 00689 //RouterProfiler profiler(this, dutyCycleCurrent.nsExpireSubmitted); 00690 00691 00692 auto onExpiredSubmitted = [&] (const pair<Id, Id> & key, 00693 const SubmissionInfo & info) 00694 { 00695 //RouterProfiler profiler(this, dutyCycleCurrent.nsOnExpireSubmitted); 00696 00697 const Id & auctionId = key.first; 00698 const Id & adSpotId = key.second; 00699 00700 recordHit("submittedAuctionExpiry"); 00701 00702 if (!info.bidRequest) { 00703 recordHit("submittedAuctionExpiryWithoutBid"); 00704 //cerr << "expired with no bid request" << endl; 00705 // this->debugSpot(auctionId, adSpotId, "EXPIRED SPOT NO BR", {}); 00706 00707 // this->dumpSpot(auctionId, adSpotId); 00708 return Date(); 00709 } 00710 00711 // this->debugSpot(auctionId, adSpotId, "EXPIRED SPOT", {}); 00712 00713 //cerr << "onExpiredSubmitted " << key << endl; 00714 try { 00715 this->doBidResult(auctionId, adSpotId, info, Amount() /* price */, 00716 start /* date */, BS_LOSS, "inferred", 00717 "null", UserIds()); 00718 } catch (const std::exception & exc) { 00719 cerr << "error handling expired loss auction: " << exc.what() 00720 << endl; 00721 this->logPAError("checkExpiredAuctions.loss", exc.what()); 00722 } 00723 00724 return Date(); 00725 }; 00726 00727 submitted.expire(onExpiredSubmitted, start); 00728 } 00729 00730 { 00731 cerr << " checking " << finished.size() 00732 << " finished auctions for expiry" << endl; 00733 00734 //RouterProfiler profiler(this, dutyCycleCurrent.nsExpireFinished); 00735 00736 auto onExpiredFinished = [&] (const pair<Id, Id> & key, 00737 const FinishedInfo & info) 00738 { 00739 recordHit("finishedAuctionExpiry"); 00740 00741 // this->debugSpot(key.first, key.second, "EXPIRED FINISHED", {}); 00742 00743 return Date(); 00744 }; 00745 00746 finished.expire(onExpiredFinished); 00747 } 00748 00749 banker->logBidEvents(*this); 00750 } 00751 00752 void 00753 PostAuctionLoop:: 00754 doAuction(const SubmittedAuctionEvent & event) 00755 { 00756 try { 00757 recordHit("processedAuction"); 00758 00759 const Id & auctionId = event.auctionId; 00760 00761 //cerr << "doAuction for " << auctionId << endl; 00762 00763 Date lossTimeout = event.lossTimeout; 00764 00765 // move the auction over to the submitted bid pipeline... 00766 auto key = make_pair(auctionId, event.adSpotId); 00767 00768 SubmissionInfo submission; 00769 vector<std::shared_ptr<PostAuctionEvent> > earlyWinEvents; 00770 if (submitted.count(key)) { 00771 submission = submitted.pop(key); 00772 earlyWinEvents.swap(submission.earlyWinEvents); 00773 recordHit("auctionAlreadySubmitted"); 00774 } 00775 00776 submission.bidRequest = std::move(event.bidRequest); 00777 submission.bidRequestStrFormat = std::move(event.bidRequestStrFormat); 00778 submission.bidRequestStr = std::move(event.bidRequestStr); 00779 submission.augmentations = std::move(event.augmentations); 00780 submission.bid = std::move(event.bidResponse); 00781 00782 submitted.insert(key, submission, lossTimeout); 00783 00784 string transId = makeBidId(auctionId, event.adSpotId, event.bidResponse.agent); 00785 banker->attachBid(event.bidResponse.account, 00786 transId, 00787 event.bidResponse.price.maxPrice); 00788 00789 #if 0 00790 //cerr << "submitted " << auctionId << "; now " << submitted.size() 00791 // << " auctions submitted" << endl; 00792 00793 // Add to awaiting result list 00794 if (!agents.count(submission.bid.agent)) { 00795 logPAError("doSubmitted.unknownAgentWonAuction", 00796 "unknown agent won auction"); 00797 continue; 00798 } 00799 agents[submission.bid.agent].awaitingResult 00800 .insert(make_pair(auctionId, adSpotId)); 00801 #endif 00802 00803 /* Replay any early win/loss events. */ 00804 for (auto it = earlyWinEvents.begin(), 00805 end = earlyWinEvents.end(); 00806 it != end; ++it) { 00807 recordHit("replayedEarlyWinEvent"); 00808 //cerr << "replaying early win message" << endl; 00809 doWinLoss(*it, true /* is_replay */); 00810 } 00811 } catch (const std::exception & exc) { 00812 cerr << "doAuction ignored error handling auction: " 00813 << exc.what() << endl; 00814 } 00815 } 00816 00817 void 00818 PostAuctionLoop:: 00819 doEvent(const std::shared_ptr<PostAuctionEvent> & event) 00820 { 00821 //cerr << "!!!PostAuctionLoop::doEvent:got post auction event " << 00822 //print(event->type) << endl; 00823 00824 try { 00825 switch (event->type) { 00826 case PAE_WIN: 00827 case PAE_LOSS: 00828 doWinLoss(event, false); 00829 break; 00830 case PAE_CAMPAIGN_EVENT: 00831 doCampaignEvent(event); 00832 break; 00833 default: 00834 throw Exception("postAuctionLoop.unknownEventType", 00835 "unknown event type (%d)", event->type); 00836 } 00837 } catch (const std::exception & exc) { 00838 cerr << "doEvent " << print(event->type) << " threw: " 00839 << exc.what() << endl; 00840 } 00841 00842 //cerr << "finished with event " << print(event->type) << endl; 00843 } 00844 00845 void 00846 PostAuctionLoop:: 00847 doWinLoss(const std::shared_ptr<PostAuctionEvent> & event, bool isReplay) 00848 { 00849 lastWinLoss = Date::now(); 00850 00851 #if 0 00852 static Date dbg_ts; 00853 00854 if (!dbg_ts.secondsSinceEpoch()) dbg_ts = lastWinLoss; 00855 00856 if (lastWinLoss > dbg_ts.plusSeconds(0.2)) { 00857 cerr << "WIN_RECEIVED: " << dbg_ts.printClassic() << endl; 00858 dbg_ts = Date::now(); 00859 } 00860 #endif 00861 00862 BidStatus status; 00863 if (event->type == PAE_WIN) { 00864 ML::atomic_inc(numWins); 00865 status = BS_WIN; 00866 recordHit("processedWin"); 00867 } 00868 else { 00869 status = BS_LOSS; 00870 ML::atomic_inc(numLosses); 00871 recordHit("processedLoss"); 00872 } 00873 00874 const char * typeStr = print(event->type); 00875 00876 if (!isReplay) 00877 recordHit("bidResult.%s.messagesReceived", typeStr); 00878 else 00879 recordHit("bidResult.%s.messagesReplayed", typeStr); 00880 00881 //cerr << "doWinLoss 1" << endl; 00882 00883 // cerr << "doWin" << message << endl; 00884 const Id & auctionId = event->auctionId; 00885 const Id & adSpotId = event->adSpotId; 00886 Amount winPrice = event->winPrice; 00887 Date timestamp = event->timestamp; 00888 const JsonHolder & meta = event->metadata; 00889 const UserIds & uids = event->uids; 00890 const AccountKey & account = event->account; 00891 if (account.size() == 0) { 00892 throw ML::Exception("invalid account key"); 00893 } 00894 00895 Date bidTimestamp = event->bidTimestamp; 00896 00897 // debugSpot(auctionId, adSpotId, typeStr); 00898 00899 auto getTimeGapMs = [&] () 00900 { 00901 return 1000.0 * Date::now().secondsSince(bidTimestamp); 00902 }; 00903 00904 /*cerr << "doWinLoss for " << auctionId << "-" << adSpotId 00905 << " " << typeStr 00906 << " submitted.size() = " << submitted.size() 00907 << endl; 00908 */ 00909 //cerr << " key = (" << auctionId << "," << adSpotId << ")" << endl; 00910 auto key = make_pair(auctionId, adSpotId); 00911 /* In this case, the auction is finished which means we've already either: 00912 a) received a WIN message (and this one is a duplicate); 00913 b) received no WIN message, timed out, and inferred a loss 00914 00915 Note that an auction is only removed when the last bidder has bid or 00916 timed out, and so an auction may be both inFlight and submitted or 00917 finished. 00918 */ 00919 if (finished.count(key)) { 00920 00921 //cerr << "doWinLoss in finished" << endl; 00922 00923 FinishedInfo info = finished.get(key); 00924 if (info.hasWin()) { 00925 if (winPrice == info.winPrice 00926 && status == info.reportedStatus) { 00927 recordHit("bidResult.%s.duplicate", typeStr); 00928 return; 00929 } 00930 else { 00931 recordHit("bidResult.%s.duplicateWithDifferentPrice", 00932 typeStr); 00933 return; 00934 } 00935 } 00936 else recordHit("bidResult.%s.auctionAlreadyFinished", 00937 typeStr); 00938 double timeGapMs = getTimeGapMs(); 00939 recordOutcome(timeGapMs, 00940 "bidResult.%s.alreadyFinishedTimeSinceBidSubmittedMs", 00941 typeStr); 00942 cerr << "WIN for already completed auction: " << meta 00943 << " timeGapMs = " << timeGapMs << endl; 00944 00945 cerr << "info win: " << info.winMeta << " time " << info.winTime 00946 << " info.hasWin() = " << info.hasWin() << endl; 00947 00948 if (event->type == PAE_WIN) { 00949 // Late win with auction still around 00950 banker->forceWinBid(account, winPrice, LineItems()); 00951 00952 info.setWin(timestamp, BS_WIN, winPrice, meta.toString()); 00953 00954 finished.update(key, info); 00955 00956 recordHit("bidResult.%s.winAfterLossAssumed", typeStr); 00957 recordOutcome(winPrice.value, 00958 "bidResult.%s.winAfterLossAssumedAmount.%s", 00959 typeStr, winPrice.getCurrencyStr()); 00960 00961 cerr << "got late win with price " << winPrice 00962 << " for account " << account << endl; 00963 } 00964 00965 /* 00966 cerr << "doWinLoss: auction " << key 00967 << " was in submitted auctions and also in finished auctions" 00968 << endl; 00969 */ 00970 return; 00971 } 00972 00973 //cerr << "doWinLoss not in finished" << endl; 00974 00975 double lossTimeout = 15.0; 00976 /* If the auction wasn't finished, then it should be submitted. The only 00977 time this won't happen is: 00978 a) when the WIN message raced and got in before we noticed the auction 00979 timeout. In that case we will find the auction in inFlight and we 00980 can store that message there. 00981 b) when we were more than an hour late, which means that the auction 00982 is completely unknown. 00983 */ 00984 #if 0 00985 cerr << fName << " number of elements in submitted " << submitted.size() << endl; 00986 for (auto it = submitted.begin() ; it != submitted.end() ;++it) 00987 cerr << it->first << endl; 00988 #endif 00989 if (!submitted.count(key)) { 00990 double timeGapMs = getTimeGapMs(); 00991 if (timeGapMs < lossTimeout * 1000) { 00992 recordHit("bidResult.%s.noBidSubmitted", typeStr); 00993 //cerr << "WIN for active auction: " << meta 00994 // << " timeGapMs = " << timeGapMs << endl; 00995 00996 /* We record the win message here and play it back once we submit 00997 the auction. 00998 */ 00999 SubmissionInfo info; 01000 info.earlyWinEvents.push_back(event); 01001 submitted.insert(key, info, Date::now().plusSeconds(lossTimeout)); 01002 01003 return; 01004 } 01005 else { 01006 cerr << "REALLY REALLY LATE WIN event='" << *event 01007 << "' timeGapMs = " << timeGapMs << endl; 01008 cerr << "message = " << meta << endl; 01009 cerr << "bidTimestamp = " << bidTimestamp.print(6) << endl; 01010 cerr << "now = " << Date::now().print(6) << endl; 01011 cerr << "account = " << account << endl; 01012 01013 recordHit("bidResult.%s.notInSubmitted", typeStr); 01014 recordOutcome(timeGapMs, 01015 "bidResult.%s.notInSubmittedTimeSinceBidSubmittedMs", 01016 typeStr); 01017 01018 banker->forceWinBid(account, winPrice, LineItems()); 01019 01020 return; 01021 } 01022 } 01023 SubmissionInfo info = submitted.pop(key); 01024 if (!info.bidRequest) { 01025 //cerr << "doWinLoss doubled bid request" << endl; 01026 01027 // We doubled up on a WIN without having got the auction yet 01028 info.earlyWinEvents.push_back(event); 01029 submitted.insert(key, info, Date::now().plusSeconds(lossTimeout)); 01030 return; 01031 } 01032 01033 recordHit("bidResult.%s.delivered", typeStr); 01034 01035 //cerr << "event.metadata = " << event->metadata << endl; 01036 //cerr << "event.winPrice = " << event->winPrice << endl; 01037 01038 doBidResult(auctionId, adSpotId, info, 01039 winPrice, timestamp, status, 01040 status == BS_WIN ? "guaranteed" : "inferred", 01041 meta.toString(), uids); 01042 std::for_each(info.earlyCampaignEvents.begin(), 01043 info.earlyCampaignEvents.end(), 01044 std::bind(&PostAuctionLoop::doCampaignEvent, this, 01045 std::placeholders::_1)); 01046 01047 //cerr << "doWinLoss done" << endl; 01048 } 01049 01050 template<typename Value> 01051 bool findAuction(PendingList<pair<Id,Id>, Value> & pending, 01052 const Id & auctionId) 01053 { 01054 auto key = make_pair(auctionId, Id()); 01055 auto key2 = pending.completePrefix(key, IsPrefixPair()); 01056 return key2.first == auctionId; 01057 } 01058 01059 template<typename Value> 01060 bool findAuction(PendingList<pair<Id,Id>, Value> & pending, 01061 const Id & auctionId, 01062 Id & adSpotId, Value & val) 01063 { 01064 auto key = make_pair(auctionId, adSpotId); 01065 if (!adSpotId) { 01066 auto key2 = pending.completePrefix(key, IsPrefixPair()); 01067 if (key2.first == auctionId) { 01068 //cerr << "found info for " << make_pair(auctionId, adSpotId) 01069 // << " under " << key << endl; 01070 adSpotId = key2.second; 01071 key = key2; 01072 } 01073 else return false; 01074 } 01075 01076 if (!pending.count(key)) return false; 01077 val = pending.get(key); 01078 01079 return true; 01080 } 01081 01082 void 01083 PostAuctionLoop:: 01084 doCampaignEvent(const std::shared_ptr<PostAuctionEvent> & event) 01085 { 01086 //RouterProfiler profiler(this, dutyCycleCurrent.nsImpression); 01087 //static const char* fName = "PostAuctionLoop::doCampaignEvent:"; 01088 const string & label = event->label; 01089 const Id & auctionId = event->auctionId; 01090 Id adSpotId = event->adSpotId; 01091 Date timestamp = event->timestamp; 01092 const JsonHolder & meta = event->metadata; 01093 const UserIds & uids = event->uids; 01094 01095 SubmissionInfo submissionInfo; 01096 FinishedInfo finishedInfo; 01097 01098 if (event->type != PAE_CAMPAIGN_EVENT) { 01099 throw ML::Exception("event type must be PAE_CAMPAIGN_EVENT: " 01100 + string(print(event->type))); 01101 } 01102 01103 lastCampaignEvent = Date::now(); 01104 01105 recordHit("delivery.EVENT.%s.messagesReceived", label); 01106 01107 //cerr << fName << typeStr << " " << auctionId << "-" << adSpotId << endl; 01108 //cerr <<"The number of elements in submitted " << submitted.size() << endl; 01109 auto recordUnmatched = [&] (const std::string & why) 01110 { 01111 this->logMessage(string("UNMATCHED") + label, why, 01112 auctionId.toString(), adSpotId.toString(), 01113 to_string(timestamp.secondsSinceEpoch()), 01114 meta); 01115 }; 01116 01117 if (findAuction(submitted, auctionId, adSpotId, submissionInfo)) { 01118 // Record the impression or click in the submission info. This will 01119 // then be passed on once the win comes in. 01120 // 01121 // TODO: for now we just ignore the event; we should eventually 01122 // implement what is written above 01123 //cerr << "auction " << auctionId << "-" << adSpotId 01124 // << " in flight but got " << type << endl; 01125 recordHit("delivery.%s.stillInFlight", label); 01126 logPAError(string("doCampaignEvent.auctionNotWon") + label, 01127 "message for auction that's not won"); 01128 recordUnmatched("inFlight"); 01129 01130 submissionInfo.earlyCampaignEvents.push_back(event); 01131 01132 submitted.update(make_pair(auctionId, adSpotId), submissionInfo); 01133 return; 01134 } 01135 else if (findAuction(finished, auctionId, adSpotId, finishedInfo)) { 01136 // Update the info 01137 if (finishedInfo.campaignEvents.hasEvent(label)) { 01138 recordHit("delivery.%s.duplicate", label); 01139 logPAError(string("doCampaignEvent.duplicate") 01140 + label, "message duplicated"); 01141 recordUnmatched("duplicate"); 01142 return; 01143 } 01144 01145 finishedInfo.campaignEvents.setEvent(label, timestamp, meta); 01146 ML::atomic_inc(numCampaignEvents); 01147 01148 recordHit("delivery.%s.account.%s.matched", 01149 label, 01150 finishedInfo.bid.account.toString().c_str()); 01151 01152 pair<Id, Id> key(auctionId, adSpotId); 01153 //cerr << "key = " << key << endl; 01154 if (!key.second) 01155 throw ML::Exception("updating null entry in finished map"); 01156 01157 // Add in the user IDs to the index so we can route any visits 01158 // properly 01159 finishedInfo.addUids(uids); 01160 01161 finished.update(key, finishedInfo); 01162 01163 routePostAuctionEvent(label, finishedInfo, 01164 SegmentList(), false /* filterChannels */); 01165 } 01166 else { 01167 /* We get here if we got an IMPRESSION or a CLICK before we got 01168 notification that an auction had been submitted. 01169 01170 Normally this should happen rarely. However, in some cases 01171 (for example a transient failure in the router to post auction 01172 loop link which is rectified and allows buffered messages to 01173 be replayed) we may still want to match things up. 01174 01175 What we should do here is to keep these messages around in a 01176 buffer (like the early win messages) and replay them when the 01177 auction event comes in. 01178 */ 01179 01180 recordHit("delivery.%s.auctionNotFound", label); 01181 //cerr << "delivery " << typeStr << ": auction " 01182 // << auctionId << "-" << adSpotId << " not found" 01183 // << endl; 01184 logPAError(string("doCampaignEvent.auctionNotFound") + label, 01185 "auction not found for delivery message"); 01186 recordUnmatched("auctionNotFound"); 01187 } 01188 } 01189 01190 bool 01191 PostAuctionLoop:: 01192 routePostAuctionEvent(const string & label, 01193 const FinishedInfo & finishedInfo, 01194 const SegmentList & channels, 01195 bool filterChannels) 01196 { 01197 // For the moment, send the message to all of the agents that are 01198 // bidding on this account 01199 const AccountKey & account = finishedInfo.bid.account; 01200 01201 bool sent = false; 01202 auto onMatchingAgent = [&] (const AgentConfigEntry & entry) 01203 { 01204 if (!entry.config) return; 01205 if (filterChannels) { 01206 if (!entry.config->visitChannels.match(channels)) 01207 return; 01208 } 01209 01210 sent = true; 01211 01212 this->sendAgentMessage(entry.name, 01213 label, 01214 Date::now(), 01215 finishedInfo.auctionId, 01216 finishedInfo.adSpotId, 01217 to_string(finishedInfo.spotIndex), 01218 finishedInfo.bidRequestStrFormat, 01219 finishedInfo.bidRequestStr, 01220 finishedInfo.augmentations, 01221 finishedInfo.bidToJson(), 01222 finishedInfo.winToJson(), 01223 finishedInfo.campaignEvents.toJson(), 01224 finishedInfo.visitsToJson()); 01225 }; 01226 01227 configListener.forEachAccountAgent(account, onMatchingAgent); 01228 01229 if (!sent) { 01230 recordHit("delivery.%s.orphaned", label); 01231 logPAError(string("doCampaignEvent.noListeners") + label, 01232 "nothing listening for account " + account.toString()); 01233 } 01234 else recordHit("delivery.%s.delivered", label); 01235 01236 // TODO: full account 01237 this->logMessage 01238 (string("MATCHED") + label, 01239 finishedInfo.auctionId, 01240 finishedInfo.adSpotId, 01241 finishedInfo.bidRequestStr, 01242 finishedInfo.bidToJson(), 01243 finishedInfo.winToJson(), 01244 finishedInfo.campaignEvents.toJson(), 01245 finishedInfo.visitsToJson(), 01246 finishedInfo.bid.account[0], 01247 finishedInfo.bid.account[1], 01248 finishedInfo.bid.account.toString(), 01249 finishedInfo.bidRequestStrFormat); 01250 01251 return sent; 01252 } 01253 01254 void 01255 PostAuctionLoop:: 01256 doBidResult(const Id & auctionId, 01257 const Id & adSpotId, 01258 const SubmissionInfo & submission, 01259 Amount winPrice, 01260 Date timestamp, 01261 BidStatus status, 01262 const std::string & confidence, 01263 const std::string & winLossMeta, 01264 const UserIds & uids) 01265 { 01266 //RouterProfiler profiler(this, dutyCycleCurrent.nsBidResult); 01267 //static const char *fName = "PostAuctionLoop::doBidResult:"; 01268 string msg; 01269 01270 if (status == BS_WIN) msg = "WIN"; 01271 else if (status == BS_LOSS) msg = "LOSS"; 01272 else throwException("doBidResult.nonWinLoss", "submitted non win/loss"); 01273 01274 #if 0 01275 cerr << "doBidResult: " << msg 01276 << " id " << auctionId << " spot " << adSpotId 01277 << " submission " << submission.bid.agent << " " 01278 << submission.bid.price.maxPrice 01279 << " winPrice " << winPrice << "bid request string format<" 01280 << submission.bidRequestStrFormat << ">" << endl; 01281 #endif 01282 01283 // debugSpot(auctionId, adSpotId, msg); 01284 01285 if (!adSpotId) 01286 throw ML::Exception("inserting null entry in finished map"); 01287 01288 string agent = submission.bid.agent; 01289 01290 // Find the adspot ID 01291 int adspot_num = submission.bidRequest->findAdSpotIndex(adSpotId); 01292 01293 if (adspot_num == -1) { 01294 logPAError("doBidResult.adSpotIdNotFound", 01295 "adspot ID ", adSpotId, " not found in auction ", 01296 submission.bidRequestStr); 01297 } 01298 01299 const Auction::Response & response = submission.bid; 01300 01301 const AccountKey & account = response.account; 01302 if (account.size() == 0) { 01303 throw ML::Exception("invalid account key"); 01304 } 01305 01306 #if 0 01307 if (doDebug) 01308 debugSpot(auctionId, adSpotId, "ACCOUNT " + account.toString()); 01309 #endif 01310 01311 Amount bidPrice = response.price.maxPrice; 01312 01313 #if 0 01314 cerr << fName << "bidPriceMicros = " << bidPriceMicros 01315 << " winPriceMicros = " << winPriceMicros 01316 << endl; 01317 #endif 01318 01319 //cerr << "account = " << account << endl; 01320 01321 if (winPrice > bidPrice) { 01322 //cerr << submission.bidRequestStr << endl; 01323 logPAError("doBidResult.winPriceExceedsBidPrice", 01324 ML::format("win price %s exceeds bid price %s", 01325 winPrice.toString(), 01326 bidPrice.toString())); 01327 } 01328 01329 // Make sure we account for the bid no matter what 01330 ML::Call_Guard guard 01331 ([&] () 01332 { 01333 banker->cancelBid(account, makeBidId(auctionId, adSpotId, agent)); 01334 }); 01335 01336 // No bid 01337 if (bidPrice == Amount() && response.price.priority == 0) { 01338 throwException("doBidResult.responseadNoBidPrice", 01339 "bid response had no bid price"); 01340 } 01341 01342 if (status == BS_WIN) { 01343 //info.stats->totalSpent += winPrice; 01344 //info.stats->totalBidOnWins += bidPrice; 01345 01346 // This is a real win 01347 guard.clear(); 01348 banker->winBid(account, makeBidId(auctionId, adSpotId, agent), winPrice, 01349 LineItems()); 01350 01351 //++info.stats->wins; 01352 // local win; send it back 01353 01354 //cerr << "MATCHEDWIN " << auctionId << "-" << adSpotId << endl; 01355 } 01356 else if (status == BS_LOSS) { 01357 //++info.stats->losses; 01358 // local loss; send it back 01359 } 01360 else throwException("doBidResult.nonWinLoss", "submitted non win/loss"); 01361 logMessage("MATCHED" + msg, 01362 auctionId, 01363 to_string(adspot_num), 01364 response.agent, 01365 account[1], 01366 winPrice.toString(), 01367 response.price.maxPrice.toString(), 01368 to_string(response.price.priority), 01369 submission.bidRequestStr, 01370 response.bidData, 01371 response.meta, 01372 to_string(response.creativeId), 01373 response.creativeName, 01374 account[0], 01375 uids.toJsonStr(), 01376 winLossMeta, 01377 account[0], 01378 adSpotId, 01379 account.toString(), 01380 submission.bidRequestStrFormat); 01381 01382 sendAgentMessage(response.agent, msg, timestamp, 01383 confidence, auctionId, 01384 to_string(adspot_num), 01385 winPrice.toString(), 01386 submission.bidRequestStrFormat, 01387 submission.bidRequestStr, 01388 response.bidData, 01389 response.meta, 01390 submission.augmentations); 01391 01392 // Finally, place it in the finished queue 01393 FinishedInfo i; 01394 i.auctionId = auctionId; 01395 i.adSpotId = adSpotId; 01396 i.spotIndex = adspot_num; 01397 i.bidRequest = submission.bidRequest; 01398 i.bidRequestStr = submission.bidRequestStr; 01399 i.bidRequestStrFormat = submission.bidRequestStrFormat ; 01400 i.bid = response; 01401 i.reportedStatus = status; 01402 //i.auctionTime = auction.start; 01403 i.setWin(timestamp, status, winPrice, winLossMeta); 01404 01405 // Copy the configuration into the finished info so that we can 01406 // know which visits to route back 01407 i.visitChannels = response.visitChannels; 01408 01409 i.addUids(uids); 01410 01411 double expiryInterval = 3600; 01412 if (status == BS_LOSS) 01413 expiryInterval = 900; 01414 01415 Date expiryTime = Date::now().plusSeconds(expiryInterval); 01416 01417 finished.insert(make_pair(auctionId, adSpotId), i, expiryTime); 01418 } 01419 01420 void 01421 PostAuctionLoop:: 01422 injectSubmittedAuction(const Id & auctionId, 01423 const Id & adSpotId, 01424 std::shared_ptr<BidRequest> bidRequest, 01425 const std::string & bidRequestStr, 01426 const std::string & bidRequestStrFormat, 01427 const JsonHolder & augmentations, 01428 const Auction::Response & bidResponse, 01429 Date lossTimeout) 01430 { 01431 if (bidRequestStr.size() == 0) { 01432 throw ML::Exception("invalid bidRequestStr"); 01433 } 01434 if (bidRequestStrFormat.size() == 0) { 01435 throw ML::Exception("invalid bidRequestStrFormat"); 01436 } 01437 01438 SubmittedAuctionEvent event; 01439 event.auctionId = auctionId; 01440 event.adSpotId = adSpotId; 01441 event.bidRequest = bidRequest; 01442 event.bidRequestStr = bidRequestStr; 01443 event.bidRequestStrFormat = bidRequestStrFormat; 01444 event.augmentations = augmentations; 01445 event.bidResponse = bidResponse; 01446 event.lossTimeout = lossTimeout; 01447 01448 auctions.push(event); 01449 } 01450 01451 void 01452 PostAuctionLoop:: 01453 notifyFinishedSpot(const Id & auctionId, const Id & adSpotId) 01454 { 01455 throw ML::Exception("notifyFinishedSpot(): not implemented"); 01456 } 01457 01458 std::string 01459 PostAuctionLoop:: 01460 makeBidId(Id auctionId, Id spotId, const std::string & agent) 01461 { 01462 return auctionId.toString() + "-" + spotId.toString() + "-" + agent; 01463 } 01464 01465 std::string 01466 PostAuctionLoop:: 01467 getProviderName() 01468 const 01469 { 01470 return serviceName(); 01471 } 01472 01473 Json::Value 01474 PostAuctionLoop:: 01475 getProviderIndicators() 01476 const 01477 { 01478 Json::Value value; 01479 01480 /* PA health check: 01481 - last campaign event in the last 10 seconds */ 01482 Date now = Date::now(); 01483 bool status(now < lastWinLoss.plusSeconds(10) 01484 && now < lastCampaignEvent.plusSeconds(10)); 01485 01486 #if 0 01487 if (!status) { 01488 cerr << "--- WRONGNESS DETECTED:" 01489 << " last event: " << (now - lastCampaignEvent) 01490 << endl; 01491 } 01492 #endif 01493 01494 value["status"] = status ? "ok" : "failure"; 01495 01496 return value; 01497 } 01498 01499 } // namespace RTBKIT