RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/post_auction/post_auction_loop.cc
00001 /* post_auction_loop.cc                                            -*- C++ -*-
00002    Jeremy Barnes, 31 May 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    Loop for post-auction processing.
00006 */
00007 
00008 #include <string>
00009 #include <sstream>
00010 #include <iostream>
00011 #include "rtbkit/core/agent_configuration/agent_config.h"
00012 #include "jml/utils/pair_utils.h"
00013 #include "jml/arch/futex.h"
00014 #include "jml/db/persistent.h"
00015 #include "rtbkit/core/banker/banker.h"
00016 #include "rtbkit/common/messages.h"
00017 #include "rtbkit/common/auction_events.h"
00018 
00019 #include "post_auction_loop.h"
00020 
00021 using namespace std;
00022 using namespace ML;
00023 
00024 
00025 namespace RTBKIT {
00026 
00027 
00028 /*****************************************************************************/
00029 /* SUBMISSION INFO                                                           */
00030 /*****************************************************************************/
00031 
00032 std::string
00033 SubmissionInfo::
00034 serializeToString() const
00035 {
00036     ostringstream stream;
00037     ML::DB::Store_Writer writer(stream);
00038     int version = 5;
00039     writer << version
00040            << bidRequestStr
00041            << bidRequestStrFormat
00042            << augmentations.toString()
00043            << earlyWinEvents
00044            << earlyCampaignEvents;
00045     bid.serialize(writer);
00046     return stream.str();
00047 }
00048 
00049 void
00050 SubmissionInfo::
00051 reconstituteFromString(const std::string & str)
00052 {
00053     istringstream stream(str);
00054     ML::DB::Store_Reader store(stream);
00055     int version;
00056     store >> version;
00057     if (version < 1 || version > 5)
00058         throw ML::Exception("bad version %d", version);
00059     store >> bidRequestStr;
00060     if (version == 5)
00061     {
00062         store >> bidRequestStrFormat ;
00063     }
00064     if (version > 1) {
00065         string s;
00066         store >> s;
00067         augmentations = s;
00068     }
00069     else augmentations.clear();
00070     if (version == 3) {
00071         vector<vector<string> > msg1, msg2;
00072         store >> msg1 >> msg2;
00073         if (!msg1.empty() || !msg2.empty())
00074             cerr << "warning: discarding early events from old format"
00075                  << endl;
00076         earlyWinEvents.clear();
00077         earlyCampaignEvents.clear();
00078     }
00079     else if (version > 3) {
00080         store >> earlyWinEvents >> earlyCampaignEvents;
00081     }
00082     else {
00083         earlyWinEvents.clear();
00084         earlyCampaignEvents.clear();
00085     }
00086     bid.reconstitute(store);
00087 
00088     if (bidRequestStr != "")
00089         bidRequest.reset(BidRequest::parse(bidRequestStrFormat, bidRequestStr));
00090     else bidRequest.reset();
00091 }
00092 
00093 
00094 /*****************************************************************************/
00095 /* FINISHED INFO                                                             */
00096 /*****************************************************************************/
00097 
00098 Json::Value
00099 FinishedInfo::
00100 bidToJson() const
00101 {
00102     Json::Value result = bid.toJson();
00103     result["timestamp"] = bidTime.secondsSinceEpoch();
00104     return result;
00105 }
00106 
00107 Json::Value
00108 FinishedInfo::
00109 winToJson() const
00110 {
00111     Json::Value result;
00112     if (!hasWin()) return result;
00113 
00114     result["timestamp"] = winTime.secondsSinceEpoch();
00115     result["reportedStatus"] = (reportedStatus == BS_WIN ? "WIN" : "LOSS");
00116     result["winPrice"] = winPrice.toJson();
00117     result["meta"] = winMeta;
00118 
00119     return result;
00120 }
00121 
00122 void
00123 FinishedInfo::
00124 addVisit(Date visitTime,
00125          const std::string & visitMeta,
00126          const SegmentList & channels)
00127 {
00128     Visit visit;
00129     visit.visitTime = visitTime;
00130     visit.channels = channels;
00131     visit.meta = visitMeta;
00132     visits.push_back(visit);
00133 }
00134 
00135 Json::Value
00136 FinishedInfo::
00137 visitsToJson() const
00138 {
00139     Json::Value result;
00140     for (unsigned i = 0;  i < visits.size();  ++i) {
00141         Json::Value & v = result[i];
00142         const Visit & visit = visits[i];
00143         v["timestamp"] = visit.visitTime.secondsSinceEpoch();
00144         v["meta"] = visit.meta;
00145         v["channels"] = visit.channels.toJson();
00146     }
00147     return result;
00148 }
00149 
00150 Json::Value
00151 FinishedInfo::
00152 toJson() const
00153 {
00154     throw ML::Exception("FinishedInfo::toJson()");
00155     Json::Value result;
00156     return result;
00157 }
00158 
00159 void
00160 FinishedInfo::Visit::
00161 serialize(DB::Store_Writer & store) const
00162 {
00163     unsigned char version = 1;
00164     store << version << visitTime << channels << meta;
00165 }
00166 
00167 void
00168 FinishedInfo::Visit::
00169 reconstitute(DB::Store_Reader & store)
00170 {
00171     unsigned char version;
00172     store >> version;
00173     if (version != 1)
00174         throw ML::Exception("invalid version");
00175     store >> visitTime >> channels >> meta;
00176 }
00177 
00178 IMPL_SERIALIZE_RECONSTITUTE(FinishedInfo::Visit);
00179 
00180 std::string
00181 FinishedInfo::
00182 serializeToString() const
00183 {
00184     ostringstream stream;
00185     ML::DB::Store_Writer writer(stream);
00186     int version = 6;
00187     writer << version
00188            << auctionTime << auctionId << adSpotId
00189            << bidRequestStr << bidTime <<bidRequestStrFormat;
00190     bid.serialize(writer);
00191     writer << winTime
00192            << reportedStatus << winPrice << winMeta;
00193     writer << campaignEvents;
00194     writer << fromOldRouter
00195            << augmentations.toString();
00196     writer << visitChannels << uids << visits;
00197 
00198     return stream.str();
00199 }
00200 
00201 void
00202 FinishedInfo::
00203 reconstituteFromString(const std::string & str)
00204 {
00205     istringstream stream(str);
00206     ML::DB::Store_Reader store(stream);
00207     int version, istatus;
00208     store >> version;
00209     if (version > 6)
00210         throw ML::Exception("bad version %d", version);
00211     if (version < 6)
00212         throw ML::Exception("version %d no longer supported", version);
00213 
00214     string auctionIdStr, adSpotIdStr;
00215 
00216     store >> auctionTime >> auctionId >> adSpotId
00217           >> bidRequestStr >> bidTime;
00218     bid.reconstitute(store);
00219 
00220     store >> winTime >> istatus >> winPrice >> winMeta;
00221     store >> campaignEvents;
00222     store >> fromOldRouter;
00223 
00224     if (version > 1) {
00225         string s;
00226         store >> s;
00227         augmentations = s;
00228     }
00229     else augmentations.clear();
00230 
00231     if (version > 2) {
00232         store >> visitChannels >> uids >> visits;
00233     }
00234 
00235     reportedStatus = (BidStatus)istatus;
00236 
00237     bidRequest.reset(BidRequest::parse(bidRequestStrFormat, bidRequestStr));
00238 }
00239 
00240 
00241 /*****************************************************************************/
00242 /* POST AUCTION LOOP                                                         */
00243 /*****************************************************************************/
00244 
00245 PostAuctionLoop::
00246 PostAuctionLoop(std::shared_ptr<ServiceProxies> proxies,
00247                 const std::string & serviceName)
00248     : ServiceBase(serviceName, proxies),
00249       logger(getZmqContext()),
00250       monitorProviderClient(getZmqContext(), *this),
00251       auctions(65536),
00252       events(65536),
00253       endpoint(getZmqContext()),
00254       router(!!getZmqContext()),
00255       toAgents(getZmqContext()),
00256       configListener(getZmqContext())
00257 {
00258 }
00259 
00260 PostAuctionLoop::
00261 PostAuctionLoop(ServiceBase & parent,
00262                 const std::string & serviceName)
00263     : ServiceBase(serviceName, parent),
00264       logger(getZmqContext()),
00265       monitorProviderClient(getZmqContext(), *this),
00266       auctions(65536),
00267       events(65536),
00268       endpoint(getZmqContext()),
00269       router(!!getZmqContext()),
00270       toAgents(getZmqContext()),
00271       configListener(getZmqContext())
00272 {
00273 }
00274 
00275 void
00276 PostAuctionLoop::
00277 init()
00278 {
00279     initConnections();
00280     monitorProviderClient.init(getServices()->config);
00281 }
00282 
00283 void
00284 PostAuctionLoop::
00285 initConnections()
00286 {
00287     registerServiceProvider(serviceName(), { "rtbPostAuctionService" });
00288 
00289     cerr << "post auction logger on " << serviceName() + "/logger" << endl;
00290     logger.init(getServices()->config, serviceName() + "/logger");
00291 
00292     auctions.onEvent = std::bind<void>(&PostAuctionLoop::doAuction, this,
00293                                        std::placeholders::_1);
00294     events.onEvent   = std::bind<void>(&PostAuctionLoop::doEvent, this,
00295                                        std::placeholders::_1);
00296     toAgents.clientMessageHandler = [&] (const std::vector<std::string> & msg)
00297         {
00298             // Clients should never send the post auction service anything,
00299             // but we catch it here just in case
00300             cerr << "PostAuctionLoop got agent message " << msg << endl;
00301         };
00302 
00303     router.bind("AUCTION",
00304                 std::bind(&PostAuctionLoop::doAuctionMessage, this,
00305                           std::placeholders::_1));
00306     router.bind("WIN",
00307                 std::bind(&PostAuctionLoop::doWinMessage, this,
00308                           std::placeholders::_1));
00309     router.bind("LOSS",
00310                 std::bind(&PostAuctionLoop::doLossMessage, this,
00311                           std::placeholders::_1));
00312     router.bind("EVENT",
00313                 std::bind(&PostAuctionLoop::doCampaignEventMessage, this,
00314                           std::placeholders::_1));
00315 
00316     // Every second we check for expired auctions
00317     loop.addPeriodic("PostAuctionLoop::checkExpiredAuctions", 1.0,
00318                      std::bind<void>(&PostAuctionLoop::checkExpiredAuctions,
00319                                      this));
00320 
00321     // Initialize zeromq endpoints
00322     endpoint.init(getServices()->config, ZMQ_XREP, serviceName() + "/events");
00323     toAgents.init(getServices()->config, serviceName() + "/agents");
00324     configListener.init(getServices()->config);
00325     endpoint.messageHandler
00326         = std::bind(&ZmqMessageRouter::handleMessage,
00327                     &router,
00328                     std::placeholders::_1);
00329 
00330     loop.addSource("PostAuctionLoop::auctions", auctions);
00331     loop.addSource("PostAuctionLoop::events", events);
00332 
00333     loop.addSource("PostAuctionLoop::endpoint", endpoint);
00334 
00335     loop.addSource("PostAuctionLoop::toAgents", toAgents);
00336     loop.addSource("PostAuctionLoop::configListener", configListener);
00337     loop.addSource("PostAuctionLoop::logger", logger);
00338 }
00339 
00340 void
00341 PostAuctionLoop::
00342 bindTcp()
00343 {
00344     logger.bindTcp(getServices()->ports->getRange("logs"));
00345     endpoint.bindTcp(getServices()->ports->getRange("postAuctionLoop"));
00346     toAgents.bindTcp(getServices()->ports->getRange("postAuctionLoopAgents"));
00347 }
00348 
00349 void
00350 PostAuctionLoop::
00351 start(std::function<void ()> onStop)
00352 {
00353     loop.start(onStop);
00354     monitorProviderClient.start();
00355 }
00356 
00357 void
00358 PostAuctionLoop::
00359 shutdown()
00360 {
00361     loop.shutdown();
00362     logger.shutdown();
00363     toAgents.shutdown();
00364     endpoint.shutdown();
00365     configListener.shutdown();
00366     monitorProviderClient.shutdown();
00367 }
00368 
00369 Json::Value
00370 PostAuctionLoop::
00371 getServiceStatus() const
00372 {
00373     return Json::Value();
00374 }
00375 
00376 void
00377 PostAuctionLoop::
00378 throwException(const std::string & key, const std::string & fmt, ...)
00379 {
00380     recordHit("error.exception");
00381     recordHit("error.exception.%s", key);
00382 
00383     string message;
00384     va_list ap;
00385     va_start(ap, fmt);
00386     try {
00387         message = vformat(fmt.c_str(), ap);
00388         va_end(ap);
00389     }
00390     catch (...) {
00391         va_end(ap);
00392         throw;
00393     }
00394 
00395     logPAError("exception", key, message);
00396     throw ML::Exception("Router Exception: " + key + ": " + message);
00397 }
00398 
00399 void
00400 PostAuctionLoop::
00401 injectWin(const Id & auctionId,
00402           const Id & adSpotId,
00403           Amount winPrice,
00404           Date timestamp,
00405           const JsonHolder & winMeta,
00406           const UserIds & uids,
00407           const AccountKey & account,
00408           Date bidTimestamp)
00409 {
00410     auto event = std::make_shared<PostAuctionEvent>();
00411     event->type = PAE_WIN;
00412     event->auctionId = auctionId;
00413     event->adSpotId = adSpotId;
00414     event->timestamp = timestamp;
00415     event->winPrice = winPrice;
00416     event->metadata = winMeta;
00417     event->uids = uids;
00418     event->account = account;
00419     event->bidTimestamp = bidTimestamp;
00420 
00421     events.push(event);
00422 }
00423 
00424 void
00425 PostAuctionLoop::
00426 injectLoss(const Id & auctionId,
00427            const Id & adSpotId,
00428            Date timestamp,
00429            const JsonHolder & json,
00430            const AccountKey & account,
00431            Date bidTimestamp)
00432 {
00433     //cerr << "injecting loss for " << auctionId << endl;
00434 
00435     if (timestamp == Date())
00436         timestamp = Date::now();
00437 
00438     auto event = std::make_shared<PostAuctionEvent>();
00439     event->type = PAE_LOSS;
00440     event->auctionId = auctionId;
00441     event->adSpotId = adSpotId;
00442     event->timestamp = timestamp;
00443     event->winPrice = Amount();
00444     event->account = account;
00445     event->bidTimestamp = bidTimestamp;
00446 
00447     events.push(event);
00448 }
00449 
00450 void
00451 PostAuctionLoop::
00452 injectCampaignEvent(const string & label,
00453                     const Id & auctionId,
00454                     const Id & adSpotId,
00455                     Date timestamp,
00456                     const JsonHolder & impressionMeta,
00457                     const UserIds & uids)
00458 {
00459     auto event = std::make_shared<PostAuctionEvent>();
00460     event->type = PAE_CAMPAIGN_EVENT;
00461     event->label = label;
00462     event->auctionId = auctionId;
00463     event->adSpotId = adSpotId;
00464     event->timestamp = timestamp;
00465     event->metadata = impressionMeta;
00466     event->uids = uids;
00467 
00468     events.push(event);
00469 }
00470 
00471 
00472 void
00473 PostAuctionLoop::
00474 doAuctionMessage(const std::vector<std::string> & message)
00475 {
00476     recordHit("messages.AUCTION");
00477     //cerr << "doAuctionMessage " << message << endl;
00478 
00479     SubmittedAuctionEvent event
00480         = ML::DB::reconstituteFromString<SubmittedAuctionEvent>(message.at(2));
00481     doAuction(event);
00482 }
00483 
00484 void
00485 PostAuctionLoop::
00486 doWinMessage(const std::vector<std::string> & message)
00487 {
00488     recordHit("messages.WIN");
00489     auto event = std::make_shared<PostAuctionEvent>
00490         (ML::DB::reconstituteFromString<PostAuctionEvent>(message.at(2)));
00491     doWinLoss(event, false /* replay */);
00492 }
00493 
00494 void
00495 PostAuctionLoop::
00496 doLossMessage(const std::vector<std::string> & message)
00497 {
00498     recordHit("messages.LOSS");
00499     auto event = std::make_shared<PostAuctionEvent>
00500         (ML::DB::reconstituteFromString<PostAuctionEvent>(message.at(2)));
00501     doWinLoss(event, false /* replay */);
00502 }
00503 
00504 void
00505 PostAuctionLoop::
00506 doCampaignEventMessage(const std::vector<std::string> & message)
00507 {
00508     auto event = std::make_shared<PostAuctionEvent>
00509         (ML::DB::reconstituteFromString<PostAuctionEvent>(message.at(2)));
00510     recordHit("messages.EVENT." + event->label);
00511     doCampaignEvent(event);
00512 }
00513 
00514 namespace {
00515 
00516 std::pair<Id, Id>
00517 unstringifyPair(const std::string & str)
00518 {
00519     istringstream stream(str);
00520     DB::Store_Reader store(stream);
00521     pair<Id, Id> result;
00522     store >> result.first >> result.second;
00523     return result;
00524 }
00525 
00526 std::string stringifyPair(const std::pair<Id, Id> & vals)
00527 {
00528     if (!vals.second || vals.second.type == Id::NULLID)
00529         throw ML::Exception("attempt to store null ID");
00530 
00531     ostringstream stream;
00532     {
00533         DB::Store_Writer store(stream);
00534         store << vals.first << vals.second;
00535     }
00536 
00537     return stream.str();
00538 }
00539 
00540 } // file scope
00541 
00542 void
00543 PostAuctionLoop::
00544 initStatePersistence(const std::string & path)
00545 {
00546     typedef PendingPersistenceT<pair<Id, Id>, SubmissionInfo>
00547         SubmittedPending;
00548 
00549     auto submittedDb = std::make_shared<LeveldbPendingPersistence>();
00550     submittedDb->open(path + "/submitted");
00551 
00552     auto submittedPersistence
00553         = std::make_shared<SubmittedPending>();
00554     submittedPersistence->store = submittedDb;
00555 
00556     auto stringifySubmissionInfo = [] (const SubmissionInfo & info)
00557         {
00558             return info.serializeToString();
00559         };
00560 
00561     auto unstringifySubmissionInfo = [] (const std::string & str)
00562         {
00563             SubmissionInfo info;
00564             info.reconstituteFromString(str);
00565             return info;
00566         };
00567 
00568     submittedPersistence->stringifyKey = stringifyPair;
00569     submittedPersistence->unstringifyKey = unstringifyPair;
00570     submittedPersistence->stringifyValue = stringifySubmissionInfo;
00571     submittedPersistence->unstringifyValue = unstringifySubmissionInfo;
00572 
00573     Date newTimeout = Date::now().plusSeconds(15);
00574 
00575     auto acceptSubmitted = [&] (pair<Id, Id> & key,
00576                                 SubmissionInfo & info,
00577                                 Date & timeout) -> bool
00578         {
00579             info.fromOldRouter = true;
00580             newTimeout.addSeconds(0.001);
00581             timeout = newTimeout;
00582             // this->debugSpot(key.first, key.second, "RECONST SUBMITTED");
00583             return true;
00584         };
00585 
00586     submitted.initFromStore(submittedPersistence,
00587                             acceptSubmitted,
00588                             Date::now().plusSeconds(15));
00589 
00590     typedef PendingPersistenceT<pair<Id, Id>, FinishedInfo>
00591         FinishedPending;
00592 
00593     auto finishedDb = std::make_shared<LeveldbPendingPersistence>();
00594     finishedDb->open(path + "/finished");
00595 
00596     auto finishedPersistence
00597         = std::make_shared<FinishedPending>();
00598     finishedPersistence->store = finishedDb;
00599 
00600     auto stringifyFinishedInfo = [] (const FinishedInfo & info)
00601         {
00602             return info.serializeToString();
00603         };
00604 
00605     auto unstringifyFinishedInfo = [] (const std::string & str)
00606         {
00607             FinishedInfo info;
00608             info.reconstituteFromString(str);
00609             return info;
00610         };
00611 
00612     finishedPersistence->stringifyKey = stringifyPair;
00613     finishedPersistence->unstringifyKey = unstringifyPair;
00614     finishedPersistence->stringifyValue = stringifyFinishedInfo;
00615     finishedPersistence->unstringifyValue = unstringifyFinishedInfo;
00616 
00617     newTimeout = Date::now().plusSeconds(900);
00618 
00619     auto acceptFinished = [&] (pair<Id, Id> & key,
00620                                FinishedInfo & info,
00621                                Date & timeout) -> bool
00622         {
00623             info.fromOldRouter = true;
00624             newTimeout.addSeconds(0.001);
00625             timeout = newTimeout;
00626             // this->debugSpot(key.first, key.second, "RECONST FINISHED");
00627 
00628             return true;
00629         };
00630 
00631     finished.initFromStore(finishedPersistence,
00632                            acceptFinished,
00633                            Date::now().plusSeconds(900));
00634 
00635     auto backgroundWork = [=] (volatile int & shutdown, int64_t threadId)
00636         {
00637             while (!shutdown) {
00638                 futex_wait(const_cast<int &>(shutdown), 0, 600.0);
00639                 if (shutdown) break;
00640                 //continue;
00641 
00642                 {
00643                     Date start = Date::now();
00644                     submittedDb->compact();
00645                     Date end = Date::now();
00646                     this->recordEvent("persistentData.submitted.compactTimeMs",
00647                                   ET_OUTCOME,
00648                                   1000.0 * (end.secondsSince(start)));
00649                     uint64_t size = submittedDb->getDbSize();
00650                     //cerr << "submitted db is " << size / 1024.0 / 1024.0
00651                     //     << "MB" << endl;
00652                     this->recordEvent("persistentData.submitted.dbSizeMb",
00653                                   ET_LEVEL, size / 1024.0 / 1024.0);
00654                 }
00655 
00656                 {
00657                     Date start = Date::now();
00658                     finishedDb->compact();
00659                     Date end = Date::now();
00660                     this->recordEvent("persistentData.finished.compactTimeMs",
00661                                   ET_OUTCOME,
00662                                   1000.0 * (end.secondsSince(start)));
00663                     uint64_t size = finishedDb->getDbSize();
00664                     //cerr << "finished db is " << size / 1024.0 / 1024.0
00665                     //     << "MB" << endl;
00666                     this->recordEvent("persistentData.finished.dbSizeMb",
00667                                   ET_LEVEL, size / 1024.0 / 1024.0);
00668                 }
00669             }
00670 
00671             cerr << "exiting background work thread" << endl;
00672         };
00673 
00674     loop.startSubordinateThread(backgroundWork);
00675 }
00676 
00677 
00678 void
00679 PostAuctionLoop::
00680 checkExpiredAuctions()
00681 {
00682     Date start = Date::now();
00683 
00684     {
00685         cerr << " checking " << submitted.size()
00686              << " submitted auctions for inferred loss" << endl;
00687 
00688 
00689         //RouterProfiler profiler(this, dutyCycleCurrent.nsExpireSubmitted);
00690 
00691 
00692         auto onExpiredSubmitted = [&] (const pair<Id, Id> & key,
00693                                        const SubmissionInfo & info)
00694             {
00695                 //RouterProfiler profiler(this, dutyCycleCurrent.nsOnExpireSubmitted);
00696 
00697                 const Id & auctionId = key.first;
00698                 const Id & adSpotId = key.second;
00699 
00700                 recordHit("submittedAuctionExpiry");
00701 
00702                 if (!info.bidRequest) {
00703                     recordHit("submittedAuctionExpiryWithoutBid");
00704                     //cerr << "expired with no bid request" << endl;
00705                     // this->debugSpot(auctionId, adSpotId, "EXPIRED SPOT NO BR", {});
00706 
00707                     // this->dumpSpot(auctionId, adSpotId);
00708                     return Date();
00709                 }
00710 
00711                 // this->debugSpot(auctionId, adSpotId, "EXPIRED SPOT", {});
00712 
00713                 //cerr << "onExpiredSubmitted " << key << endl;
00714                 try {
00715                     this->doBidResult(auctionId, adSpotId, info, Amount() /* price */,
00716                                       start /* date */, BS_LOSS, "inferred",
00717                                       "null", UserIds());
00718                 } catch (const std::exception & exc) {
00719                     cerr << "error handling expired loss auction: " << exc.what()
00720                     << endl;
00721                     this->logPAError("checkExpiredAuctions.loss", exc.what());
00722                 }
00723 
00724                 return Date();
00725             };
00726 
00727         submitted.expire(onExpiredSubmitted, start);
00728     }
00729 
00730     {
00731         cerr << " checking " << finished.size()
00732              << " finished auctions for expiry" << endl;
00733 
00734         //RouterProfiler profiler(this, dutyCycleCurrent.nsExpireFinished);
00735 
00736         auto onExpiredFinished = [&] (const pair<Id, Id> & key,
00737                                       const FinishedInfo & info)
00738             {
00739                 recordHit("finishedAuctionExpiry");
00740 
00741                 // this->debugSpot(key.first, key.second, "EXPIRED FINISHED", {});
00742 
00743                 return Date();
00744             };
00745 
00746         finished.expire(onExpiredFinished);
00747     }
00748 
00749     banker->logBidEvents(*this);
00750 }
00751 
00752 void
00753 PostAuctionLoop::
00754 doAuction(const SubmittedAuctionEvent & event)
00755 {
00756     try {
00757         recordHit("processedAuction");
00758 
00759         const Id & auctionId = event.auctionId;
00760 
00761         //cerr << "doAuction for " << auctionId << endl;
00762 
00763         Date lossTimeout = event.lossTimeout;
00764 
00765         // move the auction over to the submitted bid pipeline...
00766         auto key = make_pair(auctionId, event.adSpotId);
00767 
00768         SubmissionInfo submission;
00769         vector<std::shared_ptr<PostAuctionEvent> > earlyWinEvents;
00770         if (submitted.count(key)) {
00771             submission = submitted.pop(key);
00772             earlyWinEvents.swap(submission.earlyWinEvents);
00773             recordHit("auctionAlreadySubmitted");
00774         }
00775 
00776         submission.bidRequest = std::move(event.bidRequest);
00777         submission.bidRequestStrFormat = std::move(event.bidRequestStrFormat);
00778         submission.bidRequestStr = std::move(event.bidRequestStr);
00779         submission.augmentations = std::move(event.augmentations);
00780         submission.bid = std::move(event.bidResponse);
00781 
00782         submitted.insert(key, submission, lossTimeout);
00783 
00784         string transId = makeBidId(auctionId, event.adSpotId, event.bidResponse.agent);
00785         banker->attachBid(event.bidResponse.account,
00786                           transId,
00787                           event.bidResponse.price.maxPrice);
00788 
00789 #if 0
00790         //cerr << "submitted " << auctionId << "; now " << submitted.size()
00791         //     << " auctions submitted" << endl;
00792 
00793         // Add to awaiting result list
00794         if (!agents.count(submission.bid.agent)) {
00795             logPAError("doSubmitted.unknownAgentWonAuction",
00796                        "unknown agent won auction");
00797             continue;
00798         }
00799         agents[submission.bid.agent].awaitingResult
00800             .insert(make_pair(auctionId, adSpotId));
00801 #endif
00802 
00803         /* Replay any early win/loss events. */
00804         for (auto it = earlyWinEvents.begin(),
00805                  end = earlyWinEvents.end();
00806              it != end;  ++it) {
00807             recordHit("replayedEarlyWinEvent");
00808             //cerr << "replaying early win message" << endl;
00809             doWinLoss(*it, true /* is_replay */);
00810         }
00811     } catch (const std::exception & exc) {
00812         cerr << "doAuction ignored error handling auction: "
00813              << exc.what() << endl;
00814     }
00815 }
00816 
00817 void
00818 PostAuctionLoop::
00819 doEvent(const std::shared_ptr<PostAuctionEvent> & event)
00820 {
00821     //cerr << "!!!PostAuctionLoop::doEvent:got post auction event " <<
00822     //print(event->type) << endl;
00823 
00824     try {
00825         switch (event->type) {
00826         case PAE_WIN:
00827         case PAE_LOSS:
00828             doWinLoss(event, false);
00829             break;
00830         case PAE_CAMPAIGN_EVENT:
00831             doCampaignEvent(event);
00832             break;
00833         default:
00834             throw Exception("postAuctionLoop.unknownEventType",
00835                             "unknown event type (%d)", event->type);
00836         }
00837     } catch (const std::exception & exc) {
00838         cerr << "doEvent " << print(event->type) << " threw: "
00839              << exc.what() << endl;
00840     }
00841 
00842     //cerr << "finished with event " << print(event->type) << endl;
00843 }
00844 
00845 void
00846 PostAuctionLoop::
00847 doWinLoss(const std::shared_ptr<PostAuctionEvent> & event, bool isReplay)
00848 {
00849     lastWinLoss = Date::now();
00850 
00851 #if 0
00852     static Date dbg_ts;
00853 
00854     if (!dbg_ts.secondsSinceEpoch()) dbg_ts = lastWinLoss;
00855 
00856     if (lastWinLoss > dbg_ts.plusSeconds(0.2)) {
00857       cerr << "WIN_RECEIVED: " << dbg_ts.printClassic() << endl;
00858       dbg_ts = Date::now();
00859     }
00860 #endif
00861 
00862     BidStatus status;
00863     if (event->type == PAE_WIN) {
00864         ML::atomic_inc(numWins);
00865         status = BS_WIN;
00866         recordHit("processedWin");
00867     }
00868     else {
00869         status = BS_LOSS;
00870         ML::atomic_inc(numLosses);
00871         recordHit("processedLoss");
00872     }
00873 
00874     const char * typeStr = print(event->type);
00875 
00876     if (!isReplay)
00877         recordHit("bidResult.%s.messagesReceived", typeStr);
00878     else
00879         recordHit("bidResult.%s.messagesReplayed", typeStr);
00880 
00881     //cerr << "doWinLoss 1" << endl;
00882 
00883     // cerr << "doWin" << message << endl;
00884     const Id & auctionId = event->auctionId;
00885     const Id & adSpotId = event->adSpotId;
00886     Amount winPrice = event->winPrice;
00887     Date timestamp = event->timestamp;
00888     const JsonHolder & meta = event->metadata;
00889     const UserIds & uids = event->uids;
00890     const AccountKey & account = event->account;
00891     if (account.size() == 0) {
00892         throw ML::Exception("invalid account key");
00893     }
00894 
00895     Date bidTimestamp = event->bidTimestamp;
00896 
00897     // debugSpot(auctionId, adSpotId, typeStr);
00898 
00899     auto getTimeGapMs = [&] ()
00900         {
00901             return 1000.0 * Date::now().secondsSince(bidTimestamp);
00902         };
00903 
00904     /*cerr << "doWinLoss for " << auctionId << "-" << adSpotId
00905          << " " << typeStr
00906          << " submitted.size() = " << submitted.size()
00907          << endl;
00908          */
00909     //cerr << "  key = (" << auctionId << "," << adSpotId << ")" << endl;
00910     auto key = make_pair(auctionId, adSpotId);
00911     /* In this case, the auction is finished which means we've already either:
00912        a) received a WIN message (and this one is a duplicate);
00913        b) received no WIN message, timed out, and inferred a loss
00914 
00915        Note that an auction is only removed when the last bidder has bid or
00916        timed out, and so an auction may be both inFlight and submitted or
00917        finished.
00918     */
00919     if (finished.count(key)) {
00920 
00921         //cerr << "doWinLoss in finished" << endl;
00922 
00923         FinishedInfo info = finished.get(key);
00924         if (info.hasWin()) {
00925             if (winPrice == info.winPrice
00926                 && status == info.reportedStatus) {
00927                 recordHit("bidResult.%s.duplicate", typeStr);
00928                 return;
00929             }
00930             else {
00931                 recordHit("bidResult.%s.duplicateWithDifferentPrice",
00932                           typeStr);
00933                 return;
00934             }
00935         }
00936         else recordHit("bidResult.%s.auctionAlreadyFinished",
00937                        typeStr);
00938         double timeGapMs = getTimeGapMs();
00939         recordOutcome(timeGapMs,
00940                       "bidResult.%s.alreadyFinishedTimeSinceBidSubmittedMs",
00941                       typeStr);
00942         cerr << "WIN for already completed auction: " << meta
00943              << " timeGapMs = " << timeGapMs << endl;
00944 
00945         cerr << "info win: " << info.winMeta << " time " << info.winTime
00946              << " info.hasWin() = " << info.hasWin() << endl;
00947 
00948         if (event->type == PAE_WIN) {
00949             // Late win with auction still around
00950             banker->forceWinBid(account, winPrice, LineItems());
00951 
00952             info.setWin(timestamp, BS_WIN, winPrice, meta.toString());
00953 
00954             finished.update(key, info);
00955 
00956             recordHit("bidResult.%s.winAfterLossAssumed", typeStr);
00957             recordOutcome(winPrice.value,
00958                           "bidResult.%s.winAfterLossAssumedAmount.%s",
00959                           typeStr, winPrice.getCurrencyStr());
00960 
00961             cerr << "got late win with price " << winPrice
00962                  << " for account " << account << endl;
00963         }
00964 
00965         /*
00966           cerr << "doWinLoss: auction " << key
00967           << " was in submitted auctions and also in finished auctions"
00968           << endl;
00969         */
00970         return;
00971     }
00972 
00973     //cerr << "doWinLoss not in finished" << endl;
00974 
00975     double lossTimeout = 15.0;
00976     /* If the auction wasn't finished, then it should be submitted.  The only
00977        time this won't happen is:
00978        a) when the WIN message raced and got in before we noticed the auction
00979           timeout.  In that case we will find the auction in inFlight and we
00980           can store that message there.
00981        b) when we were more than an hour late, which means that the auction
00982           is completely unknown.
00983     */
00984 #if 0
00985     cerr << fName << " number of elements in submitted " << submitted.size() << endl;
00986     for (auto it = submitted.begin() ; it != submitted.end() ;++it)
00987         cerr << it->first << endl;
00988 #endif
00989     if (!submitted.count(key)) {
00990         double timeGapMs = getTimeGapMs();
00991         if (timeGapMs < lossTimeout * 1000) {
00992             recordHit("bidResult.%s.noBidSubmitted", typeStr);
00993             //cerr << "WIN for active auction: " << meta
00994             //     << " timeGapMs = " << timeGapMs << endl;
00995 
00996             /* We record the win message here and play it back once we submit
00997                the auction.
00998             */
00999             SubmissionInfo info;
01000             info.earlyWinEvents.push_back(event);
01001             submitted.insert(key, info, Date::now().plusSeconds(lossTimeout));
01002 
01003             return;
01004         }
01005         else {
01006             cerr << "REALLY REALLY LATE WIN event='" << *event
01007                  << "' timeGapMs = " << timeGapMs << endl;
01008             cerr << "message = " << meta << endl;
01009             cerr << "bidTimestamp = " << bidTimestamp.print(6) << endl;
01010             cerr << "now = " << Date::now().print(6) << endl;
01011             cerr << "account = " << account << endl;
01012 
01013             recordHit("bidResult.%s.notInSubmitted", typeStr);
01014             recordOutcome(timeGapMs,
01015                           "bidResult.%s.notInSubmittedTimeSinceBidSubmittedMs",
01016                           typeStr);
01017 
01018             banker->forceWinBid(account, winPrice, LineItems());
01019 
01020             return;
01021         }
01022     }
01023     SubmissionInfo info = submitted.pop(key);
01024     if (!info.bidRequest) {
01025         //cerr << "doWinLoss doubled bid request" << endl;
01026 
01027         // We doubled up on a WIN without having got the auction yet
01028         info.earlyWinEvents.push_back(event);
01029         submitted.insert(key, info, Date::now().plusSeconds(lossTimeout));
01030         return;
01031     }
01032 
01033     recordHit("bidResult.%s.delivered", typeStr);
01034 
01035     //cerr << "event.metadata = " << event->metadata << endl;
01036     //cerr << "event.winPrice = " << event->winPrice << endl;
01037 
01038     doBidResult(auctionId, adSpotId, info,
01039                 winPrice, timestamp, status,
01040                 status == BS_WIN ? "guaranteed" : "inferred",
01041                 meta.toString(), uids);
01042     std::for_each(info.earlyCampaignEvents.begin(),
01043                   info.earlyCampaignEvents.end(),
01044                   std::bind(&PostAuctionLoop::doCampaignEvent, this,
01045                             std::placeholders::_1));
01046 
01047     //cerr << "doWinLoss done" << endl;
01048 }
01049 
01050 template<typename Value>
01051 bool findAuction(PendingList<pair<Id,Id>, Value> & pending,
01052                  const Id & auctionId)
01053 {
01054     auto key = make_pair(auctionId, Id());
01055     auto key2 = pending.completePrefix(key, IsPrefixPair());
01056     return key2.first == auctionId;
01057 }
01058 
01059 template<typename Value>
01060 bool findAuction(PendingList<pair<Id,Id>, Value> & pending,
01061                  const Id & auctionId,
01062                  Id & adSpotId, Value & val)
01063 {
01064     auto key = make_pair(auctionId, adSpotId);
01065     if (!adSpotId) {
01066         auto key2 = pending.completePrefix(key, IsPrefixPair());
01067         if (key2.first == auctionId) {
01068             //cerr << "found info for " << make_pair(auctionId, adSpotId)
01069             //     << " under " << key << endl;
01070             adSpotId = key2.second;
01071             key = key2;
01072         }
01073         else return false;
01074     }
01075 
01076     if (!pending.count(key)) return false;
01077     val = pending.get(key);
01078 
01079     return true;
01080 }
01081 
01082 void
01083 PostAuctionLoop::
01084 doCampaignEvent(const std::shared_ptr<PostAuctionEvent> & event)
01085 {
01086     //RouterProfiler profiler(this, dutyCycleCurrent.nsImpression);
01087     //static const char* fName = "PostAuctionLoop::doCampaignEvent:";
01088     const string & label = event->label;
01089     const Id & auctionId = event->auctionId;
01090     Id adSpotId = event->adSpotId;
01091     Date timestamp = event->timestamp;
01092     const JsonHolder & meta = event->metadata;
01093     const UserIds & uids = event->uids;
01094 
01095     SubmissionInfo submissionInfo;
01096     FinishedInfo finishedInfo;
01097 
01098     if (event->type != PAE_CAMPAIGN_EVENT) {
01099         throw ML::Exception("event type must be PAE_CAMPAIGN_EVENT: "
01100                             + string(print(event->type)));
01101     }
01102 
01103     lastCampaignEvent = Date::now();
01104 
01105     recordHit("delivery.EVENT.%s.messagesReceived", label);
01106 
01107     //cerr << fName << typeStr << " " << auctionId << "-" << adSpotId << endl;
01108     //cerr <<"The number of elements in submitted " << submitted.size() << endl;
01109     auto recordUnmatched = [&] (const std::string & why)
01110         {
01111             this->logMessage(string("UNMATCHED") + label, why,
01112                              auctionId.toString(), adSpotId.toString(),
01113                              to_string(timestamp.secondsSinceEpoch()),
01114                              meta);
01115         };
01116 
01117     if (findAuction(submitted, auctionId, adSpotId, submissionInfo)) {
01118         // Record the impression or click in the submission info.  This will
01119         // then be passed on once the win comes in.
01120         //
01121         // TODO: for now we just ignore the event; we should eventually
01122         // implement what is written above
01123         //cerr << "auction " << auctionId << "-" << adSpotId
01124         //     << " in flight but got " << type << endl;
01125         recordHit("delivery.%s.stillInFlight", label);
01126         logPAError(string("doCampaignEvent.auctionNotWon") + label,
01127                    "message for auction that's not won");
01128         recordUnmatched("inFlight");
01129 
01130         submissionInfo.earlyCampaignEvents.push_back(event);
01131 
01132         submitted.update(make_pair(auctionId, adSpotId), submissionInfo);
01133         return;
01134     }
01135     else if (findAuction(finished, auctionId, adSpotId, finishedInfo)) {
01136         // Update the info
01137         if (finishedInfo.campaignEvents.hasEvent(label)) {
01138             recordHit("delivery.%s.duplicate", label);
01139             logPAError(string("doCampaignEvent.duplicate")
01140                        + label, "message duplicated");
01141             recordUnmatched("duplicate");
01142             return;
01143         }
01144 
01145         finishedInfo.campaignEvents.setEvent(label, timestamp, meta);
01146         ML::atomic_inc(numCampaignEvents);
01147 
01148         recordHit("delivery.%s.account.%s.matched",
01149                   label,
01150                   finishedInfo.bid.account.toString().c_str());
01151 
01152         pair<Id, Id> key(auctionId, adSpotId);
01153         //cerr << "key = " << key << endl;
01154         if (!key.second)
01155             throw ML::Exception("updating null entry in finished map");
01156 
01157         // Add in the user IDs to the index so we can route any visits
01158         // properly
01159         finishedInfo.addUids(uids);
01160 
01161         finished.update(key, finishedInfo);
01162 
01163         routePostAuctionEvent(label, finishedInfo,
01164                               SegmentList(), false /* filterChannels */);
01165     }
01166     else {
01167         /* We get here if we got an IMPRESSION or a CLICK before we got
01168            notification that an auction had been submitted.
01169 
01170            Normally this should happen rarely.  However, in some cases
01171            (for example a transient failure in the router to post auction
01172            loop link which is rectified and allows buffered messages to
01173            be replayed) we may still want to match things up.
01174 
01175            What we should do here is to keep these messages around in a
01176            buffer (like the early win messages) and replay them when the
01177            auction event comes in.
01178         */
01179 
01180         recordHit("delivery.%s.auctionNotFound", label);
01181         //cerr << "delivery " << typeStr << ": auction "
01182         //     << auctionId << "-" << adSpotId << " not found"
01183         //     << endl;
01184         logPAError(string("doCampaignEvent.auctionNotFound") + label,
01185                    "auction not found for delivery message");
01186         recordUnmatched("auctionNotFound");
01187     }
01188 }
01189 
01190 bool
01191 PostAuctionLoop::
01192 routePostAuctionEvent(const string & label,
01193                       const FinishedInfo & finishedInfo,
01194                       const SegmentList & channels,
01195                       bool filterChannels)
01196 {
01197     // For the moment, send the message to all of the agents that are
01198     // bidding on this account
01199     const AccountKey & account = finishedInfo.bid.account;
01200 
01201     bool sent = false;
01202     auto onMatchingAgent = [&] (const AgentConfigEntry & entry)
01203         {
01204             if (!entry.config) return;
01205             if (filterChannels) {
01206                 if (!entry.config->visitChannels.match(channels))
01207                     return;
01208             }
01209 
01210             sent = true;
01211 
01212             this->sendAgentMessage(entry.name,
01213                                    label,
01214                                    Date::now(),
01215                                    finishedInfo.auctionId,
01216                                    finishedInfo.adSpotId,
01217                                    to_string(finishedInfo.spotIndex),
01218                                    finishedInfo.bidRequestStrFormat,
01219                                    finishedInfo.bidRequestStr,
01220                                    finishedInfo.augmentations,
01221                                    finishedInfo.bidToJson(),
01222                                    finishedInfo.winToJson(),
01223                                    finishedInfo.campaignEvents.toJson(),
01224                                    finishedInfo.visitsToJson());
01225         };
01226 
01227     configListener.forEachAccountAgent(account, onMatchingAgent);
01228 
01229     if (!sent) {
01230         recordHit("delivery.%s.orphaned", label);
01231         logPAError(string("doCampaignEvent.noListeners") + label,
01232                    "nothing listening for account " + account.toString());
01233     }
01234     else recordHit("delivery.%s.delivered", label);
01235 
01236     // TODO: full account
01237     this->logMessage
01238         (string("MATCHED") + label,
01239          finishedInfo.auctionId,
01240          finishedInfo.adSpotId,
01241          finishedInfo.bidRequestStr,
01242          finishedInfo.bidToJson(),
01243          finishedInfo.winToJson(),
01244          finishedInfo.campaignEvents.toJson(),
01245          finishedInfo.visitsToJson(),
01246          finishedInfo.bid.account[0],
01247          finishedInfo.bid.account[1],
01248          finishedInfo.bid.account.toString(),
01249          finishedInfo.bidRequestStrFormat);
01250 
01251     return sent;
01252 }
01253 
01254 void
01255 PostAuctionLoop::
01256 doBidResult(const Id & auctionId,
01257             const Id & adSpotId,
01258             const SubmissionInfo & submission,
01259             Amount winPrice,
01260             Date timestamp,
01261             BidStatus status,
01262             const std::string & confidence,
01263             const std::string & winLossMeta,
01264             const UserIds & uids)
01265 {
01266     //RouterProfiler profiler(this, dutyCycleCurrent.nsBidResult);
01267     //static const char *fName = "PostAuctionLoop::doBidResult:";
01268     string msg;
01269 
01270     if (status == BS_WIN) msg = "WIN";
01271     else if (status == BS_LOSS) msg = "LOSS";
01272     else throwException("doBidResult.nonWinLoss", "submitted non win/loss");
01273 
01274 #if 0
01275     cerr << "doBidResult: " << msg
01276          << " id " << auctionId << " spot " << adSpotId
01277          << " submission " << submission.bid.agent << " "
01278          << submission.bid.price.maxPrice
01279          << " winPrice " << winPrice << "bid request string format<"
01280          << submission.bidRequestStrFormat << ">" <<  endl;
01281 #endif
01282 
01283     // debugSpot(auctionId, adSpotId, msg);
01284 
01285     if (!adSpotId)
01286         throw ML::Exception("inserting null entry in finished map");
01287 
01288     string agent = submission.bid.agent;
01289 
01290     // Find the adspot ID
01291     int adspot_num = submission.bidRequest->findAdSpotIndex(adSpotId);
01292 
01293     if (adspot_num == -1) {
01294         logPAError("doBidResult.adSpotIdNotFound",
01295                    "adspot ID ", adSpotId, " not found in auction ",
01296                    submission.bidRequestStr);
01297     }
01298 
01299     const Auction::Response & response = submission.bid;
01300 
01301     const AccountKey & account = response.account;
01302     if (account.size() == 0) {
01303         throw ML::Exception("invalid account key");
01304     }
01305 
01306 #if 0
01307     if (doDebug)
01308         debugSpot(auctionId, adSpotId, "ACCOUNT " + account.toString());
01309 #endif
01310 
01311     Amount bidPrice = response.price.maxPrice;
01312 
01313 #if 0
01314     cerr << fName << "bidPriceMicros = " << bidPriceMicros
01315          << " winPriceMicros = " << winPriceMicros
01316          << endl;
01317 #endif
01318 
01319     //cerr << "account = " << account << endl;
01320 
01321     if (winPrice > bidPrice) {
01322         //cerr << submission.bidRequestStr << endl;
01323         logPAError("doBidResult.winPriceExceedsBidPrice",
01324                    ML::format("win price %s exceeds bid price %s",
01325                               winPrice.toString(),
01326                                   bidPrice.toString()));
01327     }
01328 
01329     // Make sure we account for the bid no matter what
01330     ML::Call_Guard guard
01331         ([&] ()
01332          {
01333              banker->cancelBid(account, makeBidId(auctionId, adSpotId, agent));
01334          });
01335 
01336     // No bid
01337     if (bidPrice == Amount() && response.price.priority == 0) {
01338         throwException("doBidResult.responseadNoBidPrice",
01339                        "bid response had no bid price");
01340     }
01341 
01342     if (status == BS_WIN) {
01343         //info.stats->totalSpent += winPrice;
01344         //info.stats->totalBidOnWins += bidPrice;
01345 
01346         // This is a real win
01347         guard.clear();
01348         banker->winBid(account, makeBidId(auctionId, adSpotId, agent), winPrice,
01349                        LineItems());
01350 
01351         //++info.stats->wins;
01352         // local win; send it back
01353 
01354         //cerr << "MATCHEDWIN " << auctionId << "-" << adSpotId << endl;
01355     }
01356     else if (status == BS_LOSS) {
01357         //++info.stats->losses;
01358         // local loss; send it back
01359     }
01360     else throwException("doBidResult.nonWinLoss", "submitted non win/loss");
01361     logMessage("MATCHED" + msg,
01362                auctionId,
01363                to_string(adspot_num),
01364                response.agent,
01365                account[1],
01366                winPrice.toString(),
01367                response.price.maxPrice.toString(),
01368                to_string(response.price.priority),
01369                submission.bidRequestStr,
01370                response.bidData,
01371                response.meta,
01372                to_string(response.creativeId),
01373                response.creativeName,
01374                account[0],
01375                uids.toJsonStr(),
01376                winLossMeta,
01377                account[0],
01378                adSpotId,
01379                account.toString(),
01380                submission.bidRequestStrFormat);
01381 
01382     sendAgentMessage(response.agent, msg, timestamp,
01383                      confidence, auctionId,
01384                      to_string(adspot_num),
01385                      winPrice.toString(),
01386                      submission.bidRequestStrFormat,
01387                      submission.bidRequestStr,
01388                      response.bidData,
01389                      response.meta,
01390                      submission.augmentations);
01391 
01392     // Finally, place it in the finished queue
01393     FinishedInfo i;
01394     i.auctionId = auctionId;
01395     i.adSpotId = adSpotId;
01396     i.spotIndex = adspot_num;
01397     i.bidRequest = submission.bidRequest;
01398     i.bidRequestStr = submission.bidRequestStr;
01399     i.bidRequestStrFormat = submission.bidRequestStrFormat ; 
01400     i.bid = response;
01401     i.reportedStatus = status;
01402     //i.auctionTime = auction.start;
01403     i.setWin(timestamp, status, winPrice, winLossMeta);
01404 
01405     // Copy the configuration into the finished info so that we can
01406     // know which visits to route back
01407     i.visitChannels = response.visitChannels;
01408 
01409     i.addUids(uids);
01410 
01411     double expiryInterval = 3600;
01412     if (status == BS_LOSS)
01413         expiryInterval = 900;
01414 
01415     Date expiryTime = Date::now().plusSeconds(expiryInterval);
01416 
01417     finished.insert(make_pair(auctionId, adSpotId), i, expiryTime);
01418 }
01419 
01420 void
01421 PostAuctionLoop::
01422 injectSubmittedAuction(const Id & auctionId,
01423                        const Id & adSpotId,
01424                        std::shared_ptr<BidRequest> bidRequest,
01425                        const std::string & bidRequestStr,
01426                        const std::string & bidRequestStrFormat,
01427                        const JsonHolder & augmentations,
01428                        const Auction::Response & bidResponse,
01429                        Date lossTimeout)
01430 {
01431     if (bidRequestStr.size() == 0) {
01432         throw ML::Exception("invalid bidRequestStr");
01433     }
01434     if (bidRequestStrFormat.size() == 0) {
01435         throw ML::Exception("invalid bidRequestStrFormat");
01436     }
01437 
01438     SubmittedAuctionEvent event;
01439     event.auctionId = auctionId;
01440     event.adSpotId = adSpotId;
01441     event.bidRequest = bidRequest;
01442     event.bidRequestStr = bidRequestStr;
01443     event.bidRequestStrFormat = bidRequestStrFormat;
01444     event.augmentations = augmentations;
01445     event.bidResponse = bidResponse;
01446     event.lossTimeout = lossTimeout;
01447 
01448     auctions.push(event);
01449 }
01450 
01451 void
01452 PostAuctionLoop::
01453 notifyFinishedSpot(const Id & auctionId, const Id & adSpotId)
01454 {
01455     throw ML::Exception("notifyFinishedSpot(): not implemented");
01456 }
01457 
01458 std::string
01459 PostAuctionLoop::
01460 makeBidId(Id auctionId, Id spotId, const std::string & agent)
01461 {
01462     return auctionId.toString() + "-" + spotId.toString() + "-" + agent;
01463 }
01464 
01465 std::string
01466 PostAuctionLoop::
01467 getProviderName()
01468     const
01469 {
01470     return serviceName();
01471 }
01472 
01473 Json::Value
01474 PostAuctionLoop::
01475 getProviderIndicators()
01476     const
01477 {
01478     Json::Value value;
01479 
01480     /* PA health check:
01481        - last campaign event in the last 10 seconds */
01482     Date now = Date::now();
01483     bool status(now < lastWinLoss.plusSeconds(10)
01484                 && now < lastCampaignEvent.plusSeconds(10));
01485 
01486 #if 0
01487     if (!status)  {
01488       cerr << "--- WRONGNESS DETECTED:" 
01489        << " last event: " << (now - lastCampaignEvent)
01490        << endl;
01491     }
01492 #endif
01493 
01494     value["status"] = status ? "ok" : "failure";
01495 
01496     return value;
01497 }
01498 
01499 } // namespace RTBKIT
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator