RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/router/router.cc
00001 /* rtb_router.cc
00002    Jeremy Barnes, 24 March 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    RTB router code.
00006 */
00007 
00008 #include "router.h"
00009 #include "soa/service/zmq_utils.h"
00010 #include "jml/arch/backtrace.h"
00011 #include "jml/arch/futex.h"
00012 #include "jml/arch/exception_handler.h"
00013 #include "soa/jsoncpp/writer.h"
00014 #include <boost/foreach.hpp>
00015 #include "jml/arch/atomic_ops.h"
00016 #include "jml/utils/set_utils.h"
00017 #include "jml/utils/environment.h"
00018 #include "jml/arch/info.h"
00019 #include "jml/utils/lightweight_hash.h"
00020 #include "jml/math/xdiv.h"
00021 #include <boost/tuple/tuple.hpp>
00022 #include "jml/utils/pair_utils.h"
00023 #include "jml/utils/exc_assert.h"
00024 #include "jml/db/persistent.h"
00025 #include "jml/utils/json_parsing.h"
00026 #include <boost/make_shared.hpp>
00027 #include "profiler.h"
00028 #include "rtbkit/core/banker/banker.h"
00029 #include "rtbkit/core/banker/null_banker.h"
00030 #include <boost/algorithm/string.hpp>
00031 #include "rtbkit/common/bids.h"
00032 #include "rtbkit/common/messages.h"
00033 #include "rtbkit/common/auction_events.h"
00034 
00035 
00036 using namespace std;
00037 using namespace ML;
00038 
00039 
00040 namespace RTBKIT {
00041 
00042 
00043 
00044 /*****************************************************************************/
00045 /* AGENT INFO                                                                */
00046 /*****************************************************************************/
00047 
00048 Json::Value
00049 AgentInfoEntry::
00050 toJson() const
00051 {
00052     Json::Value result;
00053     if (!valid()) return result;
00054     result["config"] = config->toJson();
00055     result["stats"] = stats->toJson();
00056     return result;
00057 }
00058 
00059 /*****************************************************************************/
00060 /* AUCTION DEBUG INFO                                                        */
00061 /*****************************************************************************/
00062 
00063 void
00064 AuctionDebugInfo::
00065 addAuctionEvent(Date timestamp, std::string type,
00066                 const std::vector<std::string> & args)
00067 {
00068     Message message;
00069     message.timestamp = timestamp;
00070     message.type = type;
00071     //message.args = args;
00072     messages.push_back(message);
00073 }
00074 
00075 void
00076 AuctionDebugInfo::
00077 addSpotEvent(const Id & spot, Date timestamp, std::string type,
00078              const std::vector<std::string> & args)
00079 {
00080     Message message;
00081     message.spot = spot;
00082     message.timestamp = timestamp;
00083     message.type = type;
00084     //message.args = args;
00085     messages.push_back(message);
00086 }
00087 
00088 void
00089 AuctionDebugInfo::
00090 dumpAuction() const
00091 {
00092     for (unsigned i = 0;  i < messages.size();  ++i) {
00093         auto & m = messages[i];
00094         cerr << m.timestamp.print(6) << " " << m.spot << " " << m.type << endl;
00095     }
00096 }
00097 
00098 void
00099 AuctionDebugInfo::
00100 dumpSpot(Id spot) const
00101 {
00102     dumpAuction();  // TODO
00103 }
00104 
00105 
00106 /*****************************************************************************/
00107 /* ROUTER                                                                    */
00108 /*****************************************************************************/
00109 
00110 Router::
00111 Router(ServiceBase & parent,
00112        const std::string & serviceName,
00113        double secondsUntilLossAssumed,
00114        bool connectPostAuctionLoop)
00115     : ServiceBase(serviceName, parent),
00116       shutdown_(false),
00117       agentEndpoint(getZmqContext()),
00118       configBuffer(1024),
00119       startBiddingBuffer(65536),
00120       submittedBuffer(65536),
00121       auctionGraveyard(65536),
00122       augmentationLoop(*this),
00123       secondsUntilLossAssumed_(secondsUntilLossAssumed),
00124       globalBidProbability(1.0),
00125       bidsErrorRate(0.0),
00126       budgetErrorRate(0.0),
00127       connectPostAuctionLoop(connectPostAuctionLoop),
00128       allAgents(new AllAgentInfo()),
00129       configListener(getZmqContext()),
00130       initialized(false),
00131       logger(getZmqContext()),
00132       doDebug(false),
00133       numAuctions(0), numBids(0), numNonEmptyBids(0),
00134       numAuctionsWithBid(0), numNoPotentialBidders(0),
00135       numNoBidders(0),
00136       monitorClient(getZmqContext()),
00137       slowModeCount(0),
00138       monitorProviderClient(getZmqContext(), *this)
00139 {
00140 }
00141 
00142 Router::
00143 Router(std::shared_ptr<ServiceProxies> services,
00144        const std::string & serviceName,
00145        double secondsUntilLossAssumed,
00146        bool connectPostAuctionLoop)
00147     : ServiceBase(serviceName, services),
00148       shutdown_(false),
00149       agentEndpoint(getZmqContext()),
00150       postAuctionEndpoint(getZmqContext()),
00151       configBuffer(1024),
00152       startBiddingBuffer(65536),
00153       submittedBuffer(65536),
00154       auctionGraveyard(65536),
00155       augmentationLoop(*this),
00156       secondsUntilLossAssumed_(secondsUntilLossAssumed),
00157       globalBidProbability(1.0),
00158       bidsErrorRate(0.0),
00159       budgetErrorRate(0.0),
00160       connectPostAuctionLoop(connectPostAuctionLoop),
00161       allAgents(new AllAgentInfo()),
00162       configListener(getZmqContext()),
00163       initialized(false),
00164       logger(getZmqContext()),
00165       doDebug(false),
00166       numAuctions(0), numBids(0), numNonEmptyBids(0),
00167       numAuctionsWithBid(0), numNoPotentialBidders(0),
00168       numNoBidders(0),
00169       monitorClient(getZmqContext()),
00170       slowModeCount(0),
00171       monitorProviderClient(getZmqContext(), *this)
00172 {
00173 }
00174 
00175 void
00176 Router::
00177 init()
00178 {
00179     ExcAssert(!initialized);
00180 
00181     registerServiceProvider(serviceName(), { "rtbRequestRouter" });
00182 
00183     banker.reset(new NullBanker());
00184 
00185     augmentationLoop.init();
00186 
00187     logger.init(getServices()->config, serviceName() + "/logger");
00188 
00189 
00190     agentEndpoint.init(getServices()->config, serviceName() + "/agents");
00191     agentEndpoint.clientMessageHandler
00192         = std::bind(&Router::handleAgentMessage, this, std::placeholders::_1);
00193     agentEndpoint.onConnection = [=] (const std::string & agent)
00194         {
00195             cerr << "agent " << agent << " connected to router" << endl;
00196         };
00197 
00198     agentEndpoint.onDisconnection = [=] (const std::string & agent)
00199         {
00200             cerr << "agent " << agent << " disconnected from router" << endl;
00201         };
00202 
00203     postAuctionEndpoint.init(getServices()->config, ZMQ_XREQ);
00204 
00205     configListener.onConfigChange = [=] (const std::string & agent,
00206                                          std::shared_ptr<const AgentConfig> config)
00207         {
00208             cerr << endl << endl << "agent " << agent << " got new configuration" << endl;
00209             configBuffer.push(make_pair(agent, config));
00210         };
00211 
00212     onSubmittedAuction = [=] (std::shared_ptr<Auction> auction,
00213                               Id adSpotId,
00214                               Auction::Response response)
00215         {
00216             submitToPostAuctionService(auction, adSpotId, response);
00217         };
00218 
00219     monitorClient.init(getServices()->config);
00220     monitorProviderClient.init(getServices()->config);
00221 
00222     initialized = true;
00223 }
00224 
00225 Router::
00226 ~Router()
00227 {
00228     shutdown();
00229 }
00230 
00231 std::shared_ptr<Banker>
00232 Router::
00233 getBanker() const
00234 {
00235     return banker;
00236 }
00237 
00238 void
00239 Router::
00240 setBanker(const std::shared_ptr<Banker> & newBanker)
00241 {
00242     banker = newBanker;
00243 }
00244 
00245 void
00246 Router::
00247 bindTcp()
00248 {
00249     logger.bindTcp(getServices()->ports->getRange("logs"));
00250     agentEndpoint.bindTcp(getServices()->ports->getRange("router"));
00251 }
00252 
00253 void
00254 Router::
00255 bindAgents(std::string agentUri)
00256 {
00257     try {
00258         agentEndpoint.bind(agentUri.c_str());
00259     } catch (const std::exception & exc) {
00260         throw Exception("error while binding agent URI %s: %s",
00261                             agentUri.c_str(), exc.what());
00262     }
00263 }
00264 
00265 void
00266 Router::
00267 bindAugmentors(const std::string & uri)
00268 {
00269     try {
00270         augmentationLoop.bindAugmentors(uri);
00271     } catch (const std::exception & exc) {
00272         throw Exception("error while binding augmentation URI %s: %s",
00273                         uri.c_str(), exc.what());
00274     }
00275 }
00276 
00277 void
00278 Router::
00279 unsafeDisableMonitor()
00280 {
00281     // TODO: we shouldn't be reaching inside these structures...
00282     monitorClient.testMode = true;
00283     monitorClient.testResponse = true;
00284     monitorProviderClient.inhibit_ = true;
00285 }
00286 
00287 void
00288 Router::
00289 start(boost::function<void ()> onStop)
00290 {
00291     ExcAssert(initialized);
00292 
00293     static Lock lock;
00294     Guard guard(lock);
00295 
00296     if (runThread)
00297         throw Exception("router is already running");
00298 
00299     auto runfn = [=] ()
00300         {
00301             this->run();
00302             if (onStop) onStop();
00303         };
00304 
00305     logger.start();
00306     augmentationLoop.start();
00307     runThread.reset(new boost::thread(runfn));
00308 
00309     if (connectPostAuctionLoop) {
00310         postAuctionEndpoint.connectToServiceClass("rtbPostAuctionService", "events");
00311     }
00312 
00313     configListener.init(getServices()->config);
00314     configListener.start();
00315 
00316     /* This is an extra thread which sits there deleting auctions
00317        to take this out of the hands of the main loop (it can easily use
00318        up nearly 20% of the capacity of the main loop).
00319     */
00320     auto auctionDeleter = [=] ()
00321         {
00322             while (!this->shutdown_) {
00323                 std::shared_ptr<Auction> toDelete;
00324                 auctionGraveyard.tryPop(toDelete, 0.05);
00325                 int numDeleted = 1;
00326                 while (this->auctionGraveyard.tryPop(toDelete))
00327                     ++numDeleted;
00328                 //cerr << "deleted " << numDeleted << " auctions"
00329                 //     << endl;
00330                 ML::sleep(0.001);
00331             }
00332         };
00333 
00334     cleanupThread.reset(new boost::thread(auctionDeleter));
00335 
00336     monitorClient.start();
00337     monitorProviderClient.start();
00338 }
00339 
00340 size_t
00341 Router::
00342 numNonIdle() const
00343 {
00344     size_t numInFlight, numAwaitingAugmentation;
00345     {
00346         Guard guard(lock);
00347         numInFlight = inFlight.size();
00348         numAwaitingAugmentation = augmentationLoop.numAugmenting();
00349     }
00350 
00351     cerr << "numInFlight = " << numInFlight << endl;
00352     cerr << "numAwaitingAugmentation = " << numAwaitingAugmentation << endl;
00353 
00354     return numInFlight + numAwaitingAugmentation;
00355 }
00356 
00357 void
00358 Router::
00359 sleepUntilIdle()
00360 {
00361     for (int iter = 0;;++iter) {
00362         augmentationLoop.sleepUntilIdle();
00363         size_t nonIdle = numNonIdle();
00364         if (nonIdle == 0) break;
00365         //cerr << "there are " << nonIdle << " non-idle" << endl;
00366         ML::sleep(0.001);
00367     }
00368 }
00369 
00370 void
00371 Router::
00372 issueTimestamp()
00373 {
00374     Date now = Date::now();
00375 
00376     cerr << "timestamp: "
00377          << ML::format("%.6f", now.secondsSinceEpoch())
00378          << " - " << now.printClassic()
00379          << endl;
00380 }
00381 
00382 void
00383 Router::
00384 run()
00385 {
00386     using namespace std;
00387 
00388     zmq_pollitem_t items [] = {
00389         { agentEndpoint.getSocketUnsafe(), 0, ZMQ_POLLIN, 0 },
00390         { 0, wakeupMainLoop.fd(), ZMQ_POLLIN, 0 }
00391     };
00392 
00393     double last_check = ML::wall_time(), last_check_pace = last_check,
00394         lastPings = last_check;
00395 
00396     //cerr << "server listening" << endl;
00397 
00398     auto getTime = [&] () { return Date::now().secondsSinceEpoch(); };
00399 
00400     double beforeSleep, afterSleep = getTime();
00401     int numTimesCouldSleep = 0;
00402     int totalSleeps = 0;
00403     double lastTimestamp = 0;
00404 
00405     recordHit("routerUp");
00406 
00407     //double lastDump = ML::wall_time();
00408 
00409     struct TimesEntry {
00410         TimesEntry()
00411             : time(0.0), count(0)
00412         {
00413         };
00414 
00415         void add(double time)
00416         {
00417             this->time += time;
00418             ++count;
00419         }
00420 
00421         double time;
00422         uint64_t count;
00423     };
00424 
00425     std::map<std::string, TimesEntry> times;
00426 
00427     double auctionKeepProbability = 1.0;
00428 
00429     // Attempt to wake up once per millisecond
00430 
00431     Date lastSleep = Date::now();
00432 
00433     while (!shutdown_) {
00434         beforeSleep = getTime();
00435 
00436         dutyCycleCurrent.nsProcessing
00437             += microsecondsBetween(beforeSleep, afterSleep);
00438 
00439         int rc = 0;
00440 
00441         for (unsigned i = 0;  i < 20 && rc == 0;  ++i)
00442             rc = zmq_poll(items, 2, 0);
00443         if (rc == 0) {
00444             ++numTimesCouldSleep;
00445             checkExpiredAuctions();
00446 
00447 #if 1
00448             // Try to sleep only once per 1/2 a millisecond to avoid too many
00449             // context switches.
00450             Date now = Date::now();
00451             double timeSinceSleep = lastSleep.secondsUntil(now);
00452             double timeToWait = 0.0005 - timeSinceSleep;
00453             if (timeToWait > 0) {
00454                 ML::sleep(timeToWait);
00455             }
00456             lastSleep = now;
00457 #endif
00458 
00459 
00460             rc = zmq_poll(items, 2, 50 /* milliseconds */);
00461         }
00462 
00463         //cerr << "rc = " << rc << endl;
00464 
00465         afterSleep = getTime();
00466 
00467         dutyCycleCurrent.nsSleeping
00468             += microsecondsBetween(afterSleep, beforeSleep);
00469         dutyCycleCurrent.nEvents += 1;
00470 
00471         times["asleep"].add(microsecondsBetween(afterSleep, beforeSleep));
00472 
00473         if (rc == -1 && zmq_errno() != EINTR) {
00474             cerr << "zeromq error: " << zmq_strerror(zmq_errno()) << endl;
00475         }
00476 
00477         {
00478             double atStart = getTime();
00479             std::shared_ptr<AugmentationInfo> info;
00480             while (startBiddingBuffer.tryPop(info)) {
00481                 doStartBidding(info);
00482             }
00483 
00484             double atEnd = getTime();
00485             times["doStartBidding"].add(microsecondsBetween(atEnd, atStart));
00486         }
00487 
00488 
00489 
00490         {
00491             double atStart = getTime();
00492 
00493             std::pair<std::string, std::shared_ptr<const AgentConfig> > config;
00494             while (configBuffer.tryPop(config)) {
00495                 if (!config.second) {
00496                     // deconfiguration
00497                     // TODO
00498                     cerr << "agent " << config.first << " lost configuration"
00499                          << endl;
00500                 }
00501                 else {
00502                     doConfig(config.first, config.second);
00503                 }
00504             }
00505 
00506             double atEnd = getTime();
00507             times["doConfig"].add(microsecondsBetween(atEnd, atStart));
00508         }
00509 
00510         {
00511             double atStart = getTime();
00512             std::shared_ptr<Auction> auction;
00513             while (submittedBuffer.tryPop(auction))
00514                 doSubmitted(auction);
00515 
00516             double atEnd = getTime();
00517             times["doSubmitted"].add(microsecondsBetween(atEnd, atStart));
00518         }
00519 
00520         if (items[0].revents & ZMQ_POLLIN) {
00521             double beforeMessage = getTime();
00522             // Agent message
00523             vector<string> message;
00524             try {
00525                 message = recvAll(agentEndpoint.getSocketUnsafe());
00526                 agentEndpoint.handleMessage(std::move(message));
00527                 double atEnd = getTime();
00528                 times[message.at(1)].add(microsecondsBetween(atEnd, beforeMessage));
00529             } catch (const std::exception & exc) {
00530                 cerr << "error handling agent message " << message
00531                      << ": " << exc.what() << endl;
00532                 logRouterError("handleAgentMessage", exc.what(),
00533                                message);
00534             }
00535         }
00536 
00537         if (items[1].revents & ZMQ_POLLIN) {
00538             wakeupMainLoop.read();
00539         }
00540 
00541         //checkExpiredAuctions();
00542 
00543         double now = ML::wall_time();
00544         double beforeChecks = getTime();
00545 
00546         if (now - lastPings > 1.0) {
00547             // Send out pings and interpret the results of the last lot of
00548             // pinging.
00549             sendPings();
00550 
00551             lastPings = now;
00552         }
00553 
00554         if (now - last_check_pace > 10.0) {
00555             if (numTimesCouldSleep < 50) {
00556                 auctionKeepProbability = std::max(auctionKeepProbability - 0.10,
00557                                                  0.10);
00558             }
00559             else if (numTimesCouldSleep > 2000) {
00560                 auctionKeepProbability = std::min(auctionKeepProbability + 0.10,
00561                                                  1.00);
00562             }
00563             else if (numTimesCouldSleep > 500) {
00564                 auctionKeepProbability = std::min(auctionKeepProbability + 0.05,
00565                                                  1.00);
00566             }
00567             else if (numTimesCouldSleep > 100) {
00568                 auctionKeepProbability = std::min(auctionKeepProbability + 0.01,
00569                                                  1.00);
00570             }
00571 
00572 #if 1
00573             cerr << "auctionKeepProbability = " << auctionKeepProbability
00574                  << " numTimesCouldSleep = " << numTimesCouldSleep
00575                  << endl;
00576 #endif
00577 
00578             // Start dropping them early if we get to 50% and we're still having
00579             // trouble keeping up.
00580             double earlyAuctionKeepProbability = 1.0;
00581 #if 0
00582             if (auctionKeepProbability < 0.5)
00583                 earlyAuctionKeepProbability = auctionKeepProbability * 0.5;
00584 #else
00585             earlyAuctionKeepProbability = auctionKeepProbability;
00586 #endif
00587             setAcceptAuctionProbability(earlyAuctionKeepProbability);
00588 
00589             recordEvent("auctionKeepPercentage", ET_LEVEL,
00590                     auctionKeepProbability * 100.0);
00591             recordEvent("numTimesCouldSleep", ET_LEVEL,
00592                         numTimesCouldSleep);
00593 
00594             totalSleeps += numTimesCouldSleep;
00595 
00596             numTimesCouldSleep = 0;
00597             last_check_pace = now;
00598         }
00599 
00600         if (now - last_check > 10.0) {
00601 
00602             logMessage("MARK",
00603                        Date::fromSecondsSinceEpoch(last_check).print(),
00604                        format("active: %zd augmenting, %zd inFlight, "
00605                               "%zd agents",
00606                               augmentationLoop.numAugmenting(),
00607                               inFlight.size(),
00608                               agents.size()));
00609 
00610             dutyCycleCurrent.ending = Date::now();
00611             dutyCycleHistory.push_back(dutyCycleCurrent);
00612             dutyCycleCurrent.clear();
00613 
00614             if (dutyCycleHistory.size() > 200)
00615                 dutyCycleHistory.erase(dutyCycleHistory.begin(),
00616                                        dutyCycleHistory.end() - 100);
00617 
00618             checkDeadAgents();
00619 
00620             double total = 0.0;
00621             for (auto it = times.begin(); it != times.end();  ++it)
00622                 total += it->second.time;
00623 
00624             cerr << "total of " << total << " microseconds and "
00625                  << totalSleeps << " sleeps" << endl;
00626 
00627             for (auto it = times.begin(); it != times.end();  ++it) {
00628                 cerr << ML::format("%-30s %8lld %10.0f %6.2f%% %8.2fus/call\n",
00629                                    it->first.c_str(),
00630                                    (unsigned long long)it->second.count,
00631                                    it->second.time,
00632                                    100.0 * it->second.time / total,
00633                                    it->second.time / it->second.count);
00634 
00635                 recordEvent(("routerLoop." + it->first).c_str(), ET_LEVEL,
00636                         1.0 * it->second.time / (now - last_check) / 1000000.0);
00637 
00638             }
00639 
00640             times.clear();
00641             totalSleeps = 0;
00642 
00643             last_check = now;
00644         }
00645 
00646         times["checks"].add(microsecondsBetween(getTime(), beforeChecks));
00647 
00648         if (now - lastTimestamp >= 1.0) {
00649             banker->logBidEvents(*this);
00650             //issueTimestamp();
00651             lastTimestamp = now;
00652         }
00653     }
00654 
00655     //cerr << "finished run loop" << endl;
00656 
00657     recordHit("routerDown");
00658 
00659     //cerr << "server shutdown" << endl;
00660 }
00661 
00662 void
00663 Router::
00664 shutdown()
00665 {
00666     configListener.shutdown();
00667 
00668     shutdown_ = true;
00669     futex_wake(shutdown_);
00670     wakeupMainLoop.signal();
00671 
00672     augmentationLoop.shutdown();
00673 
00674     if (runThread)
00675         runThread->join();
00676     runThread.reset();
00677     if (cleanupThread)
00678         cleanupThread->join();
00679     cleanupThread.reset();
00680 
00681     logger.shutdown();
00682     banker.reset();
00683 
00684     monitorClient.shutdown();
00685     monitorProviderClient.shutdown();
00686 }
00687 
00688 void
00689 Router::
00690 injectAuction(std::shared_ptr<Auction> auction, double lossTime)
00691 {
00692     // cerr << "injectAuction was called!!!" << endl;
00693     if (!auction->handleAuction) {
00694         // Modify the auction to insert our auction done handling
00695         auction->handleAuction
00696             = [=] (std::shared_ptr<Auction> auction)
00697             {
00698                 this->onAuctionDone(auction);
00699             };
00700     }
00701 
00702     auction->lossAssumed = getCurrentTime().plusSeconds(lossTime);
00703     onNewAuction(auction);
00704 }
00705 
/** Return a copy of \p s with every trailing '\n' character removed.
    Other characters (including '\r') are left untouched.
*/
inline std::string chomp(const std::string & s)
{
    std::string::size_type keep = s.length();

    // Walk back over the run of newlines at the end of the string
    while (keep > 0 && s[keep - 1] == '\n')
        --keep;

    if (keep == s.length())
        return s;

    return s.substr(0, keep);
}
00716 
00717 std::shared_ptr<Auction>
00718 Router::
00719 injectAuction(Auction::HandleAuction onAuctionFinished,
00720               std::shared_ptr<BidRequest> request,
00721               const std::string & requestStr,
00722               const std::string & requestStrFormat,
00723               double startTime,
00724               double expiryTime,
00725               double lossTime)
00726 {
00727     std::shared_ptr<Auction> auction
00728         (new Auction(nullptr,
00729                      onAuctionFinished,
00730                      request,
00731                      chomp(requestStr),
00732                      requestStrFormat,
00733                      Date::fromSecondsSinceEpoch(startTime),
00734                      Date::fromSecondsSinceEpoch(expiryTime)));
00735 
00736     injectAuction(auction, lossTime);
00737 
00738     return auction;
00739 }
00740 
00741 void
00742 Router::
00743 notifyFinishedAuction(const Id & auctionId)
00744 {
00745     throw ML::Exception("notifyFinishedAuction: not finished");
00746 }
00747 
00748 int
00749 Router::
00750 numAuctionsInProgress() const
00751 {
00752     return -1;//inFlight.size();
00753 }
00754 
00755 void
00756 Router::
00757 handleAgentMessage(const std::vector<std::string> & message)
00758 {
00759     try {
00760         using namespace std;
00761         //cerr << "got agent message " << message << endl;
00762 
00763         if (message.size() < 2) {
00764             returnErrorResponse(message, "not enough message parts");
00765             return;
00766         }
00767 
00768         const string & address = message[0];
00769         const string & request = message[1];
00770 
00771         if (request.empty())
00772             returnErrorResponse(message, "null request field");
00773 
00774         if (request == "CONFIG") {
00775             string configName = message.at(2);
00776             if (!agents.count(configName)) {
00777                 // We don't yet know about its configuration
00778                 sendAgentMessage(address, "NEEDCONFIG", getCurrentTime());
00779                 return;
00780             }
00781             agents[configName].address = address;
00782             return;
00783         }
00784 
00785         if (!agents.count(address)) {
00786             cerr << "doing NEEDCONFIG for " << address << endl;
00787             return;
00788         }
00789 
00790         AgentInfo & info = agents[address];
00791         info.gotHeartbeat(Date::now());
00792 
00793         if (!info.configured) {
00794             throw ML::Exception("message to unconfigured agent");
00795         }
00796 
00797         if (request[0] == 'B' && request == "BID") {
00798             doBid(message);
00799             return;
00800         }
00801 
00802         //cerr << "router got message " << message << endl;
00803 
00804         if (request[0] == 'P' && request == "PONG0") {
00805             doPong(0, message);
00806             return;
00807         }
00808         else if (request[0] == 'P' && request == "PONG1") {
00809             doPong(1, message);
00810             return;
00811         }
00812 
00813         returnErrorResponse(message, "unknown agent request");
00814     } catch (const std::exception & exc) {
00815         returnErrorResponse(message,
00816                             "threw exception: " + string(exc.what()));
00817     }
00818 }
00819 
00820 void
00821 Router::
00822 checkDeadAgents()
00823 {
00824     //Date start = Date::now();
00825 
00826     using namespace std;
00827     //cerr << "checking for dead agents" << endl;
00828 
00829     std::vector<Agents::iterator> deadAgents;
00830 
00831     for (auto it = agents.begin(), end = agents.end();  it != end;
00832          ++it) {
00833         auto & info = it->second;
00834 
00835         const std::string & account = info.config->account.toString('.');
00836 
00837         Date now = Date::now();
00838         double oldest = 0.0;
00839         double total = 0.0;
00840 
00841         vector<Id> toExpire;
00842 
00843         // Check for in flight timeouts.  This shouldn't happen, but there
00844         // appears to be a way in which we lose track of an inflight auction
00845         auto onInFlight = [&] (const Id & id, const Date & date)
00846             {
00847                 double secondsSince = now.secondsSince(date);
00848 
00849                 oldest = std::max(oldest, secondsSince);
00850                 total += secondsSince;
00851 
00852                 if (secondsSince > 30.0) {
00853 
00854                     this->recordHit("accounts.%s.lostBids", account);
00855 
00856                     this->sendBidResponse(it->first,
00857                                           info,
00858                                           BS_LOSTBID,
00859                                           this->getCurrentTime(),
00860                                           "guaranteed", id);
00861 
00862                     toExpire.push_back(id);
00863                 }
00864             };
00865 
00866         info.forEachInFlight(onInFlight);
00867 
00868         this->recordLevel(info.numBidsInFlight(),
00869                           "accounts.%s.inFlight.numInFlight", account);
00870         this->recordLevel(oldest,
00871                           "accounts.%s.inFlight.oldestAgeSeconds", account);
00872         double averageAge = 0.0;
00873         if (info.numBidsInFlight() != 0)
00874             averageAge = total / info.numBidsInFlight();
00875 
00876         this->recordLevel(averageAge,
00877                           "accounts.%s.inFlight.averageAgeSeconds", account);
00878 
00879         for (auto jt = toExpire.begin(), jend = toExpire.end();  jt != jend;
00880              ++jt) {
00881             info.expireBidInFlight(*jt);
00882         }
00883 
00884         double timeSinceHeartbeat
00885             = now.secondsSince(info.status->lastHeartbeat);
00886 
00887         this->recordLevel(timeSinceHeartbeat,
00888                           "accounts.%s.timeSinceHeartbeat", account);
00889 
00890         if (timeSinceHeartbeat > 5.0) {
00891             info.status->dead = true;
00892             if (it->second.numBidsInFlight() != 0) {
00893                 cerr << "agent " << it->first
00894                      << " has " << it->second.numBidsInFlight()
00895                      << " undead auctions: " << endl;
00896 
00897                 auto onInFlight = [&] (const Id & id, Date date)
00898                     {
00899                         cerr << "  " << id << " --> "
00900                         << date << " (" << now.secondsSince(date)
00901                         << "s ago)" << endl;
00902                     };
00903 
00904                 info.forEachInFlight(onInFlight);
00905             }
00906             else {
00907                 // agent is dead
00908                 cerr << "agent " << it->first << " appears to be dead"
00909                      << endl;
00910                 sendAgentMessage(it->first, "BYEBYE", getCurrentTime());
00911                 deadAgents.push_back(it);
00912             }
00913         }
00914     }
00915 
00916     for (auto it = deadAgents.begin(), end = deadAgents.end();
00917          it != end;  ++it) {
00918         cerr << "WARNING: dead agent doesn't clean up its state properly"
00919              << endl;
00920         // TODO: undo all bids in progress
00921         agents.erase(*it);
00922     }
00923 
00924     if (!deadAgents.empty())
00925         // Broadcast that we have different agents
00926         updateAllAgents();
00927 
00928     //cerr << "dead agents took " << Date::now().secondsSince(start) << "s"
00929     //     << endl;
00930 }
00931 
00932 void
00933 Router::
00934 checkExpiredAuctions()
00935 {
00936     //recentlySubmitted.clear();
00937 
00938     Date start = Date::now();
00939 
00940     {
00941         RouterProfiler profiler(dutyCycleCurrent.nsExpireInFlight);
00942 
00943         // Look for in flight timeout expiries
00944         auto onExpiredInFlight = [&] (const Id & auctionId,
00945                                       const AuctionInfo & auctionInfo)
00946             {
00947                 this->debugAuction(auctionId, "EXPIRED", {});
00948 
00949                 // Tell any remaining bidders that it's too late...
00950                 for (auto it = auctionInfo.bidders.begin(),
00951                          end = auctionInfo.bidders.end();
00952                      it != end;  ++it) {
00953                     string agent = it->first;
00954                     if (!agents.count(agent)) continue;
00955 
00956                     if (agents[agent].expireBidInFlight(auctionId)) {
00957                         AgentInfo & info = this->agents[agent];
00958                         ++info.stats->tooLate;
00959 
00960                         this->recordHit("accounts.%s.droppedBids",
00961                                         info.config->account.toString('.'));
00962 
00963                         this->sendBidResponse(agent,
00964                                               info,
00965                                               BS_DROPPEDBID,
00966                                               this->getCurrentTime(),
00967                                               "guaranteed",
00968                                               auctionId,
00969                                               0, Amount(),
00970                                               auctionInfo.auction.get());
00971                     }
00972                 }
00973 
00974 #if 0
00975                 string msg = ML::format("in flight auction expiry: id %s "
00976                                         "status %s, %zd bidders:",
00977                                         auctionId.toString().c_str(),
00978                                         auctionInfo.auction->status().c_str(),
00979                                         auctionInfo.bidders.size());
00980                 for (auto it = auctionInfo.bidders.begin(),
00981                          end = auctionInfo.bidders.end();
00982                      it != end;  ++it)
00983                     msg += ' ' + it->first + "->" + it->second.bidTime.print(5);
00984                 cerr << Date::now().print(5) << " " << msg << endl;
00985                 dumpAuction(auctionId);
00986                 this->logRouterError("checkExpiredAuctions.inFlight",
00987                                      msg);
00988 
00989 #endif
00990 
00991                 // end the auction when it expires in case we're waiting on dead agents
00992         if(!auctionInfo.auction->getResponses().empty()) {
00993                     if(!auctionInfo.auction->finish()) {
00994                 this->recordHit("tooLateToFinish");
00995             }
00996         }
00997 
00998                 return Date();
00999             };
01000 
01001         inFlight.expire(onExpiredInFlight, start);
01002     }
01003 
01004     {
01005         RouterProfiler profiler(dutyCycleCurrent.nsExpireBlacklist);
01006         blacklist.doExpiries();
01007     }
01008 
01009     if (doDebug) {
01010         RouterProfiler profiler(dutyCycleCurrent.nsExpireDebug);
01011         expireDebugInfo();
01012     }
01013 }
01014 
01015 void
01016 Router::
01017 returnErrorResponse(const std::vector<std::string> & message,
01018                     const std::string & error)
01019 {
01020     using namespace std;
01021     if (message.empty()) return;
01022     logMessage("ERROR", error, message);
01023     sendAgentMessage(message[0], "ERROR", getCurrentTime(), error, message);
01024 }
01025 
01026 void
01027 Router::
01028 doStats(const std::vector<std::string> & message)
01029 {
01030     Json::Value result(Json::objectValue);
01031 
01032     result["numAugmenting"] = augmentationLoop.numAugmenting();
01033     result["numInFlight"] = inFlight.size();
01034     result["blacklistUsers"] = blacklist.size();
01035 
01036     result["numAgents"] = agents.size();
01037 
01038     //result["accounts"] = banker->dumpAllCampaignsJson();
01039 
01040     Json::Value agentsVal(Json::objectValue);
01041 
01042     int totalAgentInFlight = 0;
01043 
01044     BOOST_FOREACH(auto agent, agents) {
01045         agentsVal[agent.first] = agent.second.toJson(false, false);
01046         totalAgentInFlight += agent.second.numBidsInFlight();
01047     }
01048 
01049     result["agents"] = agentsVal;
01050 
01051     result["totalAgentInFlight"] = totalAgentInFlight;
01052 
01053     if (dutyCycleHistory.empty())
01054         result["dutyCycle"] = dutyCycleCurrent.toJson();
01055     else result["dutyCycle"] = dutyCycleHistory.back().toJson();
01056 
01057     result["fileDescriptorCount"] = ML::num_open_files();
01058 
01059     addChildServiceStatus(result);
01060 
01061     result["numAuctions"] = numAuctions;
01062     result["numBids"] = numBids;
01063     result["numNonEmptyBids"] = numNonEmptyBids;
01064     result["numAuctionsWithBid"] = numAuctionsWithBid;
01065     result["numNoBidders"] = numNoBidders;
01066     result["numNoPotentialBidders"] = numNoPotentialBidders;
01067 
01068     //sendMessage(controlEndpoint, message[0], result);
01069 }
01070 
01071 
01072 Json::Value
01073 Router::
01074 getServiceStatus() const
01075 {
01076     return getStats();
01077 }
01078 
01079 void
01080 Router::
01081 augmentAuction(const std::shared_ptr<AugmentationInfo> & info)
01082 {
01083     if (!info || !info->auction)
01084         throw ML::Exception("augmentAuction with no auction to augment");
01085 
01086     if (info->auction->tooLate()) {
01087         recordHit("tooLateBeforeAdd");
01088         return;
01089     }
01090 
01091     double augmentationWindow = 0.005; // 5ms available to augment
01092 
01093     auto onDoneAugmenting = [=] (const std::shared_ptr<AugmentationInfo> & info)
01094         {
01095             info->auction->doneAugmenting = Date::now();
01096 
01097             if (info->auction->tooLate()) {
01098                 this->recordHit("tooLateAfterAugmenting");
01099                 return;
01100             }
01101 
01102             // Send it off to be farmed out to the bidders
01103             startBiddingBuffer.push(info);
01104             wakeupMainLoop.signal();
01105         };
01106 
01107     augmentationLoop.augment(info, Date::now().plusSeconds(augmentationWindow),
01108                              onDoneAugmenting);
01109 }
01110 
01111 std::shared_ptr<AugmentationInfo>
01112 Router::
01113 preprocessAuction(const std::shared_ptr<Auction> & auction)
01114 {
01115     ML::atomic_inc(numAuctions);
01116 
01117     Date now = Date::now();
01118     auction->inPrepro = now;
01119 
01120     if (auction->lossAssumed == Date())
01121         auction->lossAssumed
01122             = Date::now().plusSeconds(secondsUntilLossAssumed_);
01123     Date lossTimeout = auction->lossAssumed;
01124 
01125     //cerr << "AUCTION " << auction->id << " " << auction->requestStr << endl;
01126 
01127     //cerr << "url = " << auction->request->url << endl;
01128 
01129     if (auction->tooLate()) {
01130         recordHit("tooLateBeforeRouting");
01131         //inFlight.erase(auctionId);
01132         return std::shared_ptr<AugmentationInfo>();
01133     }
01134 
01135     const string & exchange = auction->request->exchange;
01136 
01137     /* Parse out the adimp. */
01138     const vector<AdSpot> & imp = auction->request->imp;
01139 
01140     recordCount(imp.size(), "exchange.%s.imp", exchange.c_str());
01141     recordHit("exchange.%s.requests", exchange.c_str());
01142 
01143     // List of possible agents per round robin group
01144     std::map<string, GroupPotentialBidders> groupAgents;
01145 
01146     double timeLeftMs = auction->timeAvailable() * 1000.0;
01147 
01148     bool traceAuction = auction->id.hash() % 10 == 0;
01149 
01150     AgentConfig::RequestFilterCache cache(*auction->request);
01151 
01152     auto exchangeConnector = auction->exchangeConnector;
01153 
01154     auto checkAgent = [&] (const AgentInfoEntry & entry)
01155         {
01156             const AgentConfig & config = *entry.config;
01157             AgentStats & stats = *entry.stats;
01158             const string & agentName = entry.name;
01159 
01160             auto doFilterStat = [&] (const char * reason)
01161             {
01162                 if (!traceAuction) return;
01163 
01164                 this->recordHit("accounts.%s.filter.%s",
01165                                 config.account.toString('.'),
01166                                 reason);
01167             };
01168 
01169             ML::atomic_inc(stats.intoFilters);
01170             doFilterStat("intoStaticFilters");
01171 
01172             ExcAssert(entry.status);
01173 
01174             if (entry.status->lastHeartbeat.secondsSince(now) > 2.0
01175                 || entry.status->dead) {
01176                 doFilterStat("static.003_agentAppearsDead");
01177                 return;
01178             }
01179 
01180             if (entry.status->numBidsInFlight >= config.maxInFlight) {
01181                 doFilterStat("static.004_earlyTooManyInFlight");
01182                 return;
01183             }
01184 
01185             /* Check if we have enough time to process it. */
01186             if (config.minTimeAvailableMs != 0.0
01187                 && timeLeftMs < config.minTimeAvailableMs)
01188                 {
01189                     ML::atomic_inc(stats.notEnoughTime);
01190                     doFilterStat("static.005_notEnoughTime");
01191                     return;
01192                 }
01193 
01194             BiddableSpots biddableSpots
01195                 = config.isBiddableRequest(exchangeConnector,
01196                                            *auction->request, stats,
01197                                            cache, doFilterStat);
01198             if (biddableSpots.empty())
01199                 return;
01200 
01201             ML::atomic_inc(stats.passedStaticFilters);
01202             doFilterStat("passedStaticFilters");
01203 
01204             string rrGroup = config.roundRobinGroup;
01205             if (rrGroup == "") rrGroup = agentName;
01206 
01207             PotentialBidder bidder;
01208             bidder.agent = agentName;
01209             bidder.imp = biddableSpots;
01210             bidder.config = entry.config;
01211             bidder.stats = entry.stats;
01212 
01213             groupAgents[rrGroup].push_back(bidder);
01214             groupAgents[rrGroup].totalBidProbability
01215                 += config.bidProbability;
01216         };
01217 
01218     forEachAgent(checkAgent);
01219 
01220     std::vector<GroupPotentialBidders> validGroups;
01221 
01222     for (auto it = groupAgents.begin(), end = groupAgents.end();
01223          it != end;  ++it) {
01224         // Check for bid probability and skip if we don't bid
01225         double bidProbability
01226             = it->second.totalBidProbability
01227             / it->second.size()
01228             * globalBidProbability;
01229 
01230         if (bidProbability < 1.0) {
01231             float val = (random() % 1000000) / 1000000.0;
01232             if (val > bidProbability) {
01233                 for (unsigned i = 0;  i < it->second.size();  ++i)
01234                     ML::atomic_inc(it->second[i].stats->skippedBidProbability);
01235                 continue;
01236             }
01237         }
01238 
01239         // Group is valid for bidding; next step is to augment the bid
01240         // request
01241         validGroups.push_back(it->second);
01242     }
01243 
01244     if (validGroups.empty()) {
01245         // Now we need to end the auction
01246         //inFlight.erase(auctionId);
01247         if (!auction->finish()) {
01248             recordHit("tooLateToFinish");
01249         }
01250 
01251         //cerr << "no valid groups " << endl;
01252         return std::shared_ptr<AugmentationInfo>();
01253     }
01254 
01255     auto info = std::make_shared<AugmentationInfo>(auction, lossTimeout);
01256     info->potentialGroups.swap(validGroups);
01257 
01258     auction->outOfPrepro = Date::now();
01259 
01260     recordOutcome(auction->outOfPrepro.secondsSince(auction->inPrepro) * 1000.0,
01261                   "preprocessAuctionTimeMs");
01262 
01263     return info;
01264 }
01265 
01266 void
01267 Router::
01268 doStartBidding(const std::vector<std::string> & message)
01269 {
01270     std::shared_ptr<AugmentationInfo> augInfo
01271         = sharedPtrFromMessage<AugmentationInfo>(message.at(2));
01272     doStartBidding(augInfo);
01273 }
01274 
01275 void
01276 Router::
01277 doStartBidding(const std::shared_ptr<AugmentationInfo> & augInfo)
01278 {
01279     //static const char *fName = "Router::doStartBidding:";
01280     RouterProfiler profiler(dutyCycleCurrent.nsStartBidding);
01281 
01282     try {
01283         Id auctionId = augInfo->auction->id;
01284 
01285         if (augmentationLoop.currentlyAugmenting(auctionId)) {
01286             throwException("doStartBidding.alreadyAugmenting",
01287                            "auction with ID %s already preprocessing",
01288                            auctionId.toString().c_str());
01289         }
01290         if (inFlight.count(auctionId)) {
01291             throwException("doStartBidding.alreadyInFlight",
01292                            "auction with ID %s already in progress",
01293                            auctionId.toString().c_str());
01294         }
01295 #if 0
01296         if (findAuction(finished, auctionId)) {
01297             throwException("doStartBidding.alreadyFinished",
01298                            "auction with ID %s already finished",
01299                            auctionId.toString().c_str());
01300         }
01301 #endif
01302 
01303         //cerr << "doStartBidding " << auctionId << endl;
01304 
01305         auto groupAgents = augInfo->potentialGroups;
01306 
01307         AuctionInfo & auctionInfo = addAuction(augInfo->auction,
01308                                                augInfo->lossTimeout);
01309         auto auction = augInfo->auction;
01310 
01311         Date now = Date::now();
01312 
01313         auction->inStartBidding = now;
01314 
01315         double timeLeftMs = auction->timeAvailable(now) * 1000.0;
01316         double timeUsedMs = auction->timeUsed(now) * 1000.0;
01317 
01318         bool traceAuction = auction->id.hash() % 10 == 0;
01319 
01320         const AugmentationList& augList = augInfo->auction->augmentations;
01321 
01322         /* For each round-robin group, send the request off to exactly one
01323            element. */
01324         for (auto it = groupAgents.begin(), end = groupAgents.end();
01325              it != end;  ++it) {
01326 
01327             GroupPotentialBidders & bidders = *it;
01328 
01329             for (unsigned i = 0;  i < bidders.size();  ++i) {
01330                 PotentialBidder & bidder = bidders[i];
01331                 if (!agents.count(bidder.agent)) continue;
01332                 AgentInfo & info = agents[bidder.agent];
01333                 const AgentConfig & config = *bidder.config;
01334 
01335                 auto doFilterStat = [&] (const char * reason)
01336                     {
01337                         if (!traceAuction) return;
01338 
01339                         this->recordHit("accounts.%s.filter.%s",
01340                                         config.account.toString('.'),
01341                                         reason);
01342                     };
01343 
01344                 auto doFilterMetric = [&] (const char * reason, float val)
01345                     {
01346                         if (!traceAuction) return;
01347 
01348                         this->recordOutcome(val, "accounts.%s.filter.%s",
01349                                             config.account.toString('.'),
01350                                             reason);
01351                     };
01352 
01353 
01354                 doFilterStat("intoDynamicFilters");
01355 
01356                 /* Check if we have too many in flight. */
01357                 if (info.numBidsInFlight() >= info.config->maxInFlight) {
01358                     ++info.stats->tooManyInFlight;
01359                     bidder.inFlightProp = PotentialBidder::NULL_PROP;
01360                     doFilterStat("dynamic.tooManyInFlight");
01361                     continue;
01362                 }
01363 
01364                 /* Check if we have enough time to process it. */
01365                 if (config.minTimeAvailableMs != 0.0
01366                     && timeLeftMs < config.minTimeAvailableMs) {
01367 
01368                     static ML::Spinlock lock;
01369 
01370                     if (auction->id.hash() % 1000 == 999 &&
01371                         lock.try_lock()) {
01372 
01373                         Date now = Date::now();
01374                         Date last = auction->start;
01375                         auto printTime
01376                             = [&] (const char * what, const Date & date)
01377                             {
01378                                 cerr << ML::format("%-30s %s %10.3f %10.3f\n",
01379                                                    what,
01380                                                    date.print(6).c_str(),
01381                                                    auction->start.secondsSince(date)
01382                                                    * 1000.0,
01383                                                    last.secondsSince(date)
01384                                                    * 1000.0);
01385                                 last = date;
01386                             };
01387 
01388                         cerr << "no time available in dynamic" << endl;
01389                         printTime("start", auction->start);
01390                         printTime("doneParsing", auction->doneParsing);
01391                         printTime("inPrepro", auction->inPrepro);
01392                         printTime("outOfPrepro", auction->outOfPrepro);
01393                         printTime("doneAugmenting", auction->doneAugmenting);
01394                         printTime("inStartBidding", auction->inStartBidding);
01395                         printTime("expiry", auction->expiry);
01396                         printTime("now", now);
01397 
01398                         lock.unlock();
01399                     }
01400 
01401                     ML::atomic_inc(info.stats->notEnoughTime);
01402                     bidder.inFlightProp = PotentialBidder::NULL_PROP;
01403                     doFilterStat("dynamic.notEnoughTime");
01404                     doFilterMetric("metric.timeUsedBeforeDynamicFilter",
01405                                    timeUsedMs);
01406                     doFilterMetric("metric.timeLeftBeforeDynamicFilter",
01407                                    timeLeftMs);
01408                     doFilterMetric("metric.timeElapsedBeforePreproMs",
01409                                    auction->start.secondsUntil(auction->inPrepro) * 1000.0);
01410                     doFilterMetric("metric.timeElapsedDuringPreproMs",
01411                                    auction->inPrepro.secondsUntil(auction->outOfPrepro) * 1000.0);
01412                     doFilterMetric("metric.timeWindowMs",
01413                                    auction->expiry.secondsSince(auction->start) * 1000.0 - info.config->minTimeAvailableMs);
01414                     continue;
01415                 }
01416 
01417                 /* Filter on the augmentation tags */
01418                 vector<string> tags = augList.tagsForAccount(config.account);
01419                 if (!config.augmentationFilter.anyIsIncluded(tags)) {
01420                     ML::atomic_inc(info.stats->augmentationTagsExcluded);
01421                     doFilterStat("dynamic.augmentationTagsFiltered");
01422                     continue;
01423                 }
01424 
01425 
01426                 /* Check that there is no blacklist hit on the user. */
01427                 if (config.hasBlacklist()
01428                     && blacklist.matches(*auction->request, bidder.agent,
01429                                          config)) {
01430                     ML::atomic_inc(info.stats->userBlacklisted);
01431                     doFilterStat("dynamic.userBlacklisted");
01432                     continue;
01433                 }
01434 
01435                 bidder.inFlightProp
01436                     = info.numBidsInFlight() / max(info.config->maxInFlight, 1);
01437 
01438                 ML::atomic_inc(info.stats->passedDynamicFilters);
01439                 doFilterStat("passedDynamicFilters");
01440             }
01441 
01442             // Sort the roundrobin infos to find the best one
01443             std::sort(bidders.begin(), bidders.end());
01444 
01445             int numBest = 1;
01446             float bestInFlightProp = bidders[0].inFlightProp;
01447 
01448             if (bestInFlightProp == PotentialBidder::NULL_PROP) {
01449                 // Excluded because too many in flight
01450                 //cerr << "TOO MANY IN FLIGHT" << endl;
01451                 continue;
01452             }
01453 
01454             for (;  numBest < bidders.size();  ++numBest) {
01455                 float inFlightProp = bidders[numBest].inFlightProp;
01456                 if (inFlightProp <= bestInFlightProp) continue;
01457                 break;
01458             }
01459 
01460             // Take a random one from all which are equally good
01461             int best = random() % numBest;
01462 
01463             // Best one is the first one
01464             PotentialBidder & winner = bidders[best];
01465             string agent = winner.agent;
01466 
01467             if (!agents.count(agent)) {
01468                 //cerr << "!!!AGENT IS GONE" << endl;
01469                 continue;  // agent is gone
01470             }
01471             AgentInfo & info = agents[agent];
01472 
01473             ++info.stats->auctions;
01474 
01475             Augmentation aug
01476                 = auction->augmentations.filterForAccount(winner.config->account);
01477             auction->agentAugmentations[agent] = chomp(aug.toJson().toString());
01478 
01479             //auctionInfo.activities.push_back("sent to " + agent);
01480 
01481             BidInfo bidInfo;
01482             bidInfo.agentConfig = winner.config;
01483             bidInfo.bidTime = Date::now();
01484             bidInfo.imp = winner.imp;
01485 
01486             auctionInfo.bidders.insert(make_pair(agent, std::move(bidInfo)));  // create empty bid response
01487             if (!info.trackBidInFlight(auctionId, bidInfo.bidTime))
01488                 throwException("doStartBidding.agentAlreadyBidding",
01489                                "agent %s is already processing auction %s",
01490                                agent.c_str(),
01491                                auctionId.toString().c_str());
01492 
01493             //cerr << "sending to agent " << agent << endl;
01494             //cerr << fName << " sending AUCTION message " << endl;c
01495             /* Convert to JSON to send it on. */
01496             sendAgentMessage(agent,
01497                              "AUCTION",
01498                              auction->start,
01499                              auctionId,
01500                              info.getBidRequestEncoding(*auction),
01501                              info.encodeBidRequest(*auction),
01502                              winner.imp.toJsonStr(),
01503                              toString(timeLeftMs),
01504                              auction->agentAugmentations[agent]);
01505 
01506             //cerr << "done" << endl;
01507         }
01508 
01509         //cerr << " auction " << id << " with "
01510         //     << auctionInfo.bidders.size() << " bidders" << endl;
01511 
01512         //auctionInfo.activities.push_back(ML::format("total of %zd agents",
01513         //                                 auctionInfo.bidders.size()));
01514         if (auction->tooLate()) {
01515             recordHit("tooLateAfterRouting");
01516             // Unwind everything?
01517         }
01518 
01519         if (auctionInfo.bidders.empty()) {
01520             /* No bidders; don't bother with the bid */
01521             ML::atomic_inc(numNoBidders);
01522             inFlight.erase(auctionId);
01523             //cerr << fName << "About to call finish " << endl;
01524             if (!auction->finish()) {
01525                 recordHit("tooLateToFinish");
01526                 //cerr << "couldn't finish auction 1 " << auction->id << endl;
01527             }
01528         }
01529 
01530         debugAuction(auctionId, "AUCTION");
01531     } catch (const std::exception & exc) {
01532         cerr << "warning: auction threw exception: " << exc.what() << endl;
01533         if (augInfo)
01534             augInfo->auction->setError("auction processing error", exc.what());
01535     }
01536 }
01537 
01538 AuctionInfo &
01539 Router::
01540 addAuction(std::shared_ptr<Auction> auction, Date lossTimeout)
01541 {
01542     const Id & id = auction->id;
01543 
01544     double bidMemoryWindow = 5.0;  // how many seconds we remember auctions
01545 
01546     try {
01547         AuctionInfo & result
01548             = inFlight.insert(id, AuctionInfo(auction, lossTimeout),
01549                               getCurrentTime().plusSeconds(bidMemoryWindow));
01550         return result;
01551     } catch (const std::exception & exc) {
01552         //cerr << "====================================" << endl;
01553         //cerr << exc.what() << endl;
01554         throwException("addAuction.alreadyInProgress",
01555                        "auction with ID %s already in progress: %s",
01556                        id.toString().c_str(), exc.what());
01557     }
01558 }
01559 
01560 
/** Randomly decide whether to fail, with the given probability.

    @param proportion  failure rate as a fraction.  Values below 0.01 never
                       fail; otherwise the rate is rounded down to a whole
                       percent, so 1.0 or more always fails.
    @return true when the caller should fail.
*/
static bool failBid(double proportion)
{
    const bool belowOnePercent = proportion < 0.01;
    if (belowOnePercent)
        return false;

    // Whole-percent threshold compared against a draw in [0, 99]
    const double percentThreshold = floor(proportion * 100.0);
    const long draw = random() % 100;

    return draw < percentThreshold;
}
01568 
01569 void
01570 Router::
01571 doBid(const std::vector<std::string> & message)
01572 {
01573     //static const char *fName = "Router::doBid:";
01574     if (failBid(bidsErrorRate)) {
01575         returnErrorResponse(message, "Intentional error response (--bids-error-rate)");
01576         return;
01577     }
01578 
01579     Date dateGotBid = Date::now();
01580 
01581     RouterProfiler profiler(dutyCycleCurrent.nsBid);
01582 
01583     ML::atomic_inc(numBids);
01584 
01585     if (message.size() < 4 || message.size() > 5) {
01586         returnErrorResponse(message, "BID message has 3-4 parts");
01587         return;
01588     }
01589 
01590     static std::map<const char *, unsigned long long> times;
01591 
01592     static Date lastPrinted = Date::now();
01593 
01594     if (lastPrinted.secondsUntil(dateGotBid) > 10.0) {
01595 #if 0
01596         unsigned long long total = 0;
01597         for (auto it = times.begin(), end = times.end(); it != end;  ++it)
01598             total += it->second;
01599 
01600         cerr << "doBid of " << total << " microseconds" << endl;
01601         cerr << "id = " << message[2] << endl;
01602         for (auto it = times.begin(), end = times.end();
01603              it != end;  ++it) {
01604             cerr << ML::format("%-30s %8lld %6.2f%%\n",
01605                                it->first,
01606                                it->second,
01607                                100.0 * it->second / total);
01608         }
01609 #endif
01610         lastPrinted = dateGotBid;
01611         times.clear();
01612     }
01613 
01614     double current = getProfilingTime();
01615 
01616     auto doProfileEvent = [&] (int i, const char * what)
01617         {
01618             return;
01619             double after = getProfilingTime();
01620             times[what] += microsecondsBetween(after, current);
01621             current = after;
01622         };
01623 
01624     recordHit("bid");
01625 
01626     doProfileEvent(0, "start");
01627 
01628     Id auctionId(message[2]);
01629 
01630     doProfileEvent(1, "idParam");
01631 
01632 
01633 
01634     const string & agent = message[0];
01635     const string & biddata = message[3];
01636     static const string nullStr("null");
01637     const string & meta = (message.size() >= 5 ? message[4] : nullStr);
01638 
01639     doProfileEvent(1, "params");
01640 
01641     debugAuction(auctionId, "BID", message);
01642 
01643     if (!agents.count(agent)) {
01644         returnErrorResponse(message, "unknown agent");
01645         return;
01646     }
01647 
01648     doProfileEvent(2, "agents");
01649 
01650     AgentInfo & info = agents[agent];
01651 
01652     /* One less in flight. */
01653     if (!info.expireBidInFlight(auctionId)) {
01654         recordHit("bidError.agentNotBidding");
01655         returnErrorResponse(message, "agent wasn't bidding on this auction");
01656         return;
01657     }
01658 
01659     doProfileEvent(3, "inFlight");
01660 
01661     auto it = inFlight.find(auctionId);
01662     if (it == inFlight.end()) {
01663         recordHit("bidError.unknownAuction");
01664         returnErrorResponse(message, "unknown auction");
01665         return;
01666     }
01667 
01668     doProfileEvent(4, "account");
01669 
01670     AuctionInfo & auctionInfo = it->second;
01671 
01672     auto biddersIt = auctionInfo.bidders.find(agent);
01673     if (biddersIt == auctionInfo.bidders.end()) {
01674         recordHit("bidError.agentSkippedAuction");
01675         returnErrorResponse(message,
01676                             "agent shouldn't bid on this auction");
01677         return;
01678     }
01679 
01680     auto & config = *biddersIt->second.agentConfig;
01681 
01682     recordHit("accounts.%s.bids", config.account.toString('.'));
01683 
01684     doProfileEvent(5, "auctionInfo");
01685 
01686     //cerr << "info.inFlight = " << info.inFlight << endl;
01687 
01688     const std::vector<AdSpot> & imp = auctionInfo.auction->request->imp;
01689 
01690     int numValidBids = 0;
01691 
01692     auto returnInvalidBid = [&] (int i, const char * reason,
01693                                  const char * message, ...)
01694         {
01695             this->recordHit("bidErrors.%s");
01696             this->recordHit("accounts.%s.bidErrors.total",
01697                             config.account.toString('.'));
01698             this->recordHit("accounts.%s.bidErrors.%s",
01699                             config.account.toString('.'),
01700                             reason);
01701 
01702             ++info.stats->invalid;
01703 
01704             va_list ap;
01705             va_start(ap, message);
01706             string formatted;
01707             try {
01708                 formatted = vformat(message, ap);
01709             } catch (...) {
01710                 va_end(ap);
01711                 throw;
01712             }
01713             va_end(ap);
01714 
01715             cerr << "invalid bid for agent " << agent << ": "
01716                  << formatted << endl;
01717             cerr << biddata << endl;
01718 
01719             this->sendBidResponse
01720                 (agent, info, BS_INVALID, this->getCurrentTime(),
01721                  formatted, auctionId,
01722                  i, Amount(),
01723                  auctionInfo.auction.get(),
01724                  biddata, Json::Value(),
01725                  auctionInfo.auction->agentAugmentations[agent]);
01726         };
01727 
01728     BidInfo bidInfo(std::move(biddersIt->second));
01729     auctionInfo.bidders.erase(biddersIt);
01730 
01731     doProfileEvent(6, "bidInfo");
01732 
01733     int numPassedBids = 0;
01734 
01735     Bids bids;
01736     try {
01737         bids = Bids::fromJson(biddata);
01738     }
01739     catch (const std::exception & exc) {
01740         returnInvalidBid(-1, "bidParseError",
01741                 "couldn't parse bid JSON %s: %s", biddata.c_str(), exc.what());
01742         return;
01743     }
01744 
01745     doProfileEvent(6, "parsing");
01746 
01747     ExcCheckEqual(bids.size(), bidInfo.imp.size(),
01748             "invalid shape for bids array");
01749 
01750     auctionInfo.auction->addDataSources(bids.dataSources);
01751 
01752     for (int i = 0; i < bids.size(); ++i) {
01753 
01754         const Bid& bid = bids[i];
01755 
01756         if (bid.isNullBid()) {
01757             ++numPassedBids;
01758             continue;
01759         }
01760 
01761         int spotIndex = bidInfo.imp[i].first;
01762 
01763         if (bid.creativeIndex == -1) {
01764             returnInvalidBid(i, "nullCreativeField",
01765                     "creative field is null in response %s",
01766                     biddata.c_str());
01767             continue;
01768         }
01769 
01770         if (bid.creativeIndex < 0
01771                 || bid.creativeIndex >= config.creatives.size())
01772         {
01773             returnInvalidBid(i, "outOfRangeCreative",
01774                     "parsing field 'creative' of %s: creative "
01775                     "number %d out of range 0-%zd",
01776                     biddata.c_str(), bid.creativeIndex,
01777                     config.creatives.size());
01778             continue;
01779         }
01780 
01781         if (bid.price.isNegative() || bid.price > USD_CPM(200)) {
01782             returnInvalidBid(i, "invalidPrice",
01783                     "bid price of %s is outside range of $0-$200 CPM"
01784                     "(%s) parsing bid %s",
01785                     bid.price.toString().c_str(),
01786                     USD_CPM(200).toString().c_str(),
01787                     biddata.c_str());
01788             continue;
01789         }
01790 
01791         const Creative & creative = config.creatives.at(bid.creativeIndex);
01792 
01793         if (!creative.compatible(imp[spotIndex])) {
01794 #if 1
01795             cerr << "creative not compatible with spot: " << endl;
01796             cerr << "auction: " << auctionInfo.auction->requestStr
01797                 << endl;
01798             cerr << "config: " << config.toJson() << endl;
01799             cerr << "bid: " << biddata << endl;
01800             cerr << "spot: " << imp[i].toJson() << endl;
01801             cerr << "spot num: " << spotIndex << endl;
01802             cerr << "bid num: " << i << endl;
01803             cerr << "creative num: " << bid.creativeIndex << endl;
01804             cerr << "creative: " << creative.toJson() << endl;
01805 #endif
01806             returnInvalidBid(i, "creativeNotCompatibleWithSpot",
01807                     "creative %s not compatible with spot %s",
01808                     creative.toJson().toString().c_str(),
01809                     imp[spotIndex].toJson().toString().c_str());
01810             continue;
01811         }
01812 
01813         if (!creative.biddable(auctionInfo.auction->request->exchange,
01814                         auctionInfo.auction->request->protocolVersion)) {
01815             returnInvalidBid(i, "creativeNotBiddableOnExchange",
01816                     "creative not biddable on exchange/version");
01817             continue;
01818         }
01819 
01820         doProfileEvent(6, "creativeCompatibility");
01821 
01822         string auctionKey
01823             = auctionId.toString() + "-"
01824             + imp[spotIndex].id.toString() + "-"
01825             + agent;
01826 
01827         if (!banker->authorizeBid(config.account, auctionKey, bid.price)
01828                 || failBid(budgetErrorRate))
01829         {
01830             ++info.stats->noBudget;
01831             const string& agentAugmentations =
01832                 auctionInfo.auction->agentAugmentations[agent];
01833 
01834             this->sendBidResponse(agent, info, BS_NOBUDGET,
01835                     this->getCurrentTime(),
01836                     "guaranteed", auctionId, 0, Amount(),
01837                     auctionInfo.auction.get(),
01838                     biddata, meta, agentAugmentations);
01839             this->logMessage("NOBUDGET", agent, auctionId,
01840                     biddata, meta);
01841             continue;
01842         }
01843 
01844         doProfileEvent(6, "banker");
01845 
01846         if (doDebug)
01847             this->debugSpot(auctionId, imp[spotIndex].id,
01848                     ML::format("BID %s %s %f",
01849                             auctionKey.c_str(),
01850                             bid.price.toString().c_str(),
01851                             (double)bid.priority));
01852 
01853         Auction::Price bidprice(bid.price, bid.priority);
01854         Auction::Response response(
01855                 bidprice,
01856                 creative.tagId,
01857                 config.account,
01858                 config.test,
01859                 agent,
01860                 biddata,
01861                 meta,
01862                 info.config,
01863                 config.visitChannels,
01864                 bid.creativeIndex);
01865 
01866         response.creativeName = creative.name;
01867         response.creativeId = creative.id;
01868 
01869         Auction::WinLoss localResult
01870             = auctionInfo.auction->setResponse(spotIndex, response);
01871 
01872         doProfileEvent(6, "bidSubmission");
01873         ++numValidBids;
01874 
01875         // Possible results:
01876         // PENDING: we're currently winning the local auction
01877         // LOSS: we lost the local auction
01878         // TOOLATE: we bid too late
01879         // INVALID: bid was invalid
01880 
01881         string msg = Auction::Response::print(localResult);
01882 
01883         if (doDebug)
01884             this->debugSpot(auctionId, imp[spotIndex].id,
01885                     ML::format("BID %s %s",
01886                             auctionKey.c_str(), msg.c_str()));
01887 
01888 
01889         switch (localResult) {
01890         case Auction::PENDING: {
01891             ++info.stats->bids;
01892             info.stats->totalBid += bid.price;
01893             break; // response will be sent later once local winning bid known
01894         }
01895         case Auction::LOSS:
01896             ++info.stats->bids;
01897             info.stats->totalBid += bid.price;
01898             // fall through
01899         case Auction::TOOLATE:
01900         case Auction::INVALID: {
01901             if (localResult == Auction::TOOLATE)
01902                 ++info.stats->tooLate;
01903             else if (localResult == Auction::INVALID)
01904                 ++info.stats->invalid;
01905 
01906             banker->cancelBid(config.account, auctionKey);
01907 
01908             BidStatus status;
01909             switch (localResult) {
01910             case Auction::LOSS:    status = BS_LOSS;     break;
01911             case Auction::TOOLATE: status = BS_TOOLATE;  break;
01912             case Auction::INVALID: status = BS_INVALID;  break;
01913             default:
01914                 throw ML::Exception("logic error");
01915             }
01916 
01917             const string& agentAugmentations =
01918                 auctionInfo.auction->agentAugmentations[agent];
01919 
01920             this->sendBidResponse(agent, info, status,
01921                     this->getCurrentTime(),
01922                     "guaranteed", auctionId, 0, Amount(),
01923                     auctionInfo.auction.get(),
01924                     biddata, meta, agentAugmentations);
01925             this->logMessage(msg, agent, auctionId, biddata, meta);
01926             continue;
01927         }
01928         case Auction::WIN:
01929             this->throwException("doBid.localWinsNotPossible",
01930                     "local wins can't be known until auction has closed");
01931 
01932         default:
01933             this->throwException("doBid.unknownBidResult",
01934                     "unknown bid result returned by auction");
01935         }
01936 
01937         doProfileEvent(6, "bidResponse");
01938     }
01939 
01940     if (numValidBids > 0) {
01941         //logMessage("BID", agent, auctionId, biddata, meta);
01942         ML::atomic_add(numNonEmptyBids, 1);
01943     }
01944     else if (numPassedBids > 0) {
01945         // Passed on the ... add to the blacklist
01946         if (config.hasBlacklist()) {
01947             const BidRequest & bidRequest = *auctionInfo.auction->request;
01948             blacklist.add(bidRequest, agent, *info.config);
01949         }
01950         doProfileEvent(8, "blacklist");
01951     }
01952 
01953     doProfileEvent(8, "postParsing");
01954 
01955     double bidTime = dateGotBid.secondsSince(bidInfo.bidTime);
01956 
01957     //cerr << "now " << auctionInfo.bidders.size() << " bidders" << endl;
01958 
01959     //cerr << "campaign " << info.config->campaign << " bidTime "
01960     //     << 1000.0 * bidTime << endl;
01961 
01962     recordOutcome(1000.0 * bidTime,
01963                   "accounts.%s.bidResponseTimeMs",
01964                   config.account.toString('.'));
01965 
01966     doProfileEvent(9, "postTiming");
01967 
01968     if (auctionInfo.bidders.empty()) {
01969         debugAuction(auctionId, "FINISH", message);
01970         if (!auctionInfo.auction->finish()) {
01971             debugAuction(auctionId, "FINISH TOO LATE", message);
01972         }
01973         inFlight.erase(auctionId);
01974         //cerr << "couldn't finish auction " << auctionInfo.auction->id
01975         //<< " after bid " << message << endl;
01976     }
01977 
01978     doProfileEvent(10, "finishAuction");
01979 
01980     // TODO: clean up if no bids were made?
01981 #if 0
01982     // Bids must be the same shape as the bid info or empty
01983     if (bidInfo.imp.size() != bids.size() && bids.size() != 0) {
01984         ++info.stats->bidErrors;
01985         returnInvalidBid(-1, "wrongBidResponseShape",
01986                          "number of imp in bid request doesn't match "
01987                          "those in bid: %d vs %d",
01988                          bidInfo.imp.size(), bids.size(),
01989                          bidInfo.imp.toJson().toString().c_str(),
01990                          biddata.c_str());
01991 
01992         if (auctionInfo.bidders.empty()) {
01993             auctionInfo.auction->finish();
01994             inFlight.erase(auctionId);
01995         }
01996     }
01997 #endif
01998 }
01999 
02000 void
02001 Router::
02002 doSubmitted(std::shared_ptr<Auction> auction)
02003 {
02004     // Auction was submitted
02005 
02006     // Either a) move it across to the win queue, or b) drop it if we
02007     // didn't bid anything
02008 
02009     RouterProfiler profiler(dutyCycleCurrent.nsSubmitted);
02010 
02011     const Id & auctionId = auction->id;
02012 
02013 #if 0 // debug
02014     if (recentlySubmitted.count(auctionId)) {
02015         cerr << "ERROR: auction" << auctionId << " was double submitted"
02016              << endl;
02017         return;
02018     }
02019     recentlySubmitted.insert(auctionId);
02020 #endif
02021 
02022     //cerr << "SUBMITTED " << auctionId << endl;
02023 
02024     const std::vector<std::vector<Auction::Response> > & allResponses
02025         = auction->getResponses();
02026 
02027     if (doDebug)
02028         debugAuction(auctionId, ML::format("SUBMITTED %d slots",
02029                                            (int)allResponses.size()),
02030                      {});
02031 
02032     //ExcAssertEqual(allResponses.size(),
02033     //               auction->bidRequest->imp.size());
02034     //cerr << "got a win for auction id " << auctionId << " with num imp:" << allResponses.size() << endl;
02035     // Go through the imp one by one
02036     for (unsigned spotNum = 0;  spotNum < allResponses.size();  ++spotNum) {
02037 
02038         bool hasSubmittedBid = false;
02039         Id spotId = auction->request->imp[spotNum].id;
02040 
02041         const std::vector<Auction::Response> & responses
02042             = allResponses[spotNum];
02043 
02044         if (doDebug)
02045             debugSpot(auctionId, spotId,
02046                       ML::format("has %zd bids", responses.size()));
02047 
02048         // For all but the winning bid we tell them what's going on
02049         for (unsigned i = 0;  i < responses.size();  ++i) {
02050 
02051             const Auction::Response & response = responses[i];
02052             Auction::WinLoss status = response.localStatus;
02053 
02054             //cerr << "got a response " << response.toJson() << endl;
02055             //cerr << "response.valid() = " << response.valid() << endl;
02056 
02057             // Don't deal with response 0
02058             if (i == 0 && response.valid() && response.localStatus == Auction::WIN) {
02059                 hasSubmittedBid = true;
02060                 continue;
02061             }
02062 
02063             //cerr << "doing response " << i << endl;
02064 
02065             if (!agents.count(response.agent)) continue;
02066 
02067             AgentInfo & info = agents[response.agent];
02068 
02069             Amount bid_price = response.price.maxPrice;
02070 
02071             string auctionKey
02072                 = auctionId.toString() + "-"
02073                 + spotId.toString() + "-"
02074                 + response.agent;
02075 
02076             // Make sure we account for the bid no matter what
02077             ML::Call_Guard guard
02078                 ([&] ()
02079                  {
02080                      banker->cancelBid(response.agentConfig->account, auctionKey);
02081                  });
02082 
02083             // No bid
02084             if (bid_price == 0 && response.price.priority == 0) {
02085                 cerr << "warning: auction had no bid result" << endl;
02086                 continue;
02087             }
02088 
02089             string msg;
02090             BidStatus bidStatus(BS_INVALID);
02091 
02092             switch (status) {
02093             case Auction::PENDING:
02094                 throwException("doSubmitted.shouldNotBePending",
02095                                "non-winning auction should not be pending");
02096             case Auction::WIN:
02097                 if(i == 0) break;
02098                 throwException("doSubmitted.shouldNotBeWin",
02099                                "auction should not be a win");
02100             case Auction::INVALID:
02101                 throwException("doSubmitted.shouldNotBeInvalid",
02102                                "auction should not be invalid");
02103             case Auction::LOSS:
02104                 bidStatus = BS_LOSS;
02105                 ++info.stats->losses;
02106                 msg = "LOSS";
02107                 break;
02108             case Auction::TOOLATE:
02109                 bidStatus = BS_TOOLATE;
02110                 ++info.stats->tooLate;
02111                 msg = "TOOLATE";
02112                 break;
02113             default:
02114                 throwException("doSubmitted.unknownStatus",
02115                                "unknown auction local status");
02116             };
02117 
02118             if (doDebug)
02119                 debugSpot(auctionId, spotId,
02120                           ML::format("%s %s",
02121                                      msg.c_str(),
02122                                      auctionKey.c_str()));
02123 
02124             string confidence = "guaranteed";
02125 
02126             //cerr << fName << "sending agent message of type " << msg << endl;
02127             sendBidResponse(response.agent, info, bidStatus,
02128                             this->getCurrentTime(),
02129                             confidence, auctionId,
02130                             0, Amount(),
02131                             auction.get(),
02132                             response.bidData,
02133                             response.meta,
02134                             auction->agentAugmentations[response.agent]);
02135         }
02136 
02137         // If we didn't actually submit a bid then nothing else to do
02138         if (!hasSubmittedBid) continue;
02139 
02140         ML::atomic_add(numAuctionsWithBid, 1);
02141         //cerr << fName << "injecting submitted auction " << endl;
02142 
02143         onSubmittedAuction(auction, spotId, responses[0]);
02144         //postAuctionLoop.injectSubmittedAuction(auction, spotId, responses[0]);
02145     }
02146 
02147     //cerr << "auction.use_count() = " << auction.use_count() << endl;
02148 
02149     if (auction.unique()) {
02150         auctionGraveyard.tryPush(auction);
02151     }
02152 }
02153 
02154 std::string
02155 reduceUrl(const Url & url)
02156 {
02157     static const boost::regex rex(".*://(www.)?([^/]+)");
02158     boost::match_results<string::const_iterator> mr;
02159     string s = url.toString();
02160     if (!boost::regex_search(s, mr, rex)) {
02161         //cerr << "warning: nothing matched in URL " << url << endl;
02162         return s;
02163     }
02164 
02165     if (mr.size() != 3) {
02166         cerr << "warning: wrong match results size "
02167              << mr.size() << " in URL " << url << endl;
02168         return s;
02169     }
02170 
02171     //cerr << "url " << url << " reduced to " << mr.str(2) << endl;
02172 
02173     return mr.str(2);
02174 }
02175 
02176 
02177 void
02178 Router::
02179 onNewAuction(std::shared_ptr<Auction> auction)
02180 {
02181     if (!monitorClient.getStatus()) {
02182         Date now = Date::now();
02183 
02184         if ((uint32_t) slowModeLastAuction.secondsSinceEpoch()
02185             < (uint32_t) now.secondsSinceEpoch()) {
02186             slowModeLastAuction = now;
02187             slowModeCount = 1;
02188             recordHit("monitor.systemInSlowMode");
02189         }
02190         else {
02191             slowModeCount++;
02192         }
02193 
02194         if (slowModeCount > 100) {
02195             /* we only let the first 100 auctions take place each second */
02196             recordHit("monitor.ignoredAuctions");
02197             auction->finish();
02198             return;
02199         }
02200     }
02201 
02202     //cerr << "AUCTION GOT THROUGH" << endl;
02203 
02204     //logMessage("AUCTION", auction->id, auction->requestStr);
02205     const BidRequest & request = *auction->request;
02206     int numFields = 0;
02207     if (!request.url.empty()) ++numFields;
02208     if (request.userIds.exchangeId) ++numFields;
02209     if (request.userIds.providerId) ++numFields;
02210 
02211     if (numFields > 1) {
02212         logMessageNoTimestamp("BEHAVIOUR",
02213                               ML::format("%.2f", request.timestamp),
02214                               request.exchange,
02215                               reduceUrl(request.url),
02216                               request.userIds.exchangeId,
02217                               request.userIds.providerId);
02218     }
02219     auto info = preprocessAuction(auction);
02220 
02221     if (info) {
02222         recordHit("auctionPassedPreprocessing");
02223         augmentAuction(info);
02224     }
02225     else {
02226         recordHit("auctionDropped.noPotentialBidders");
02227         ML::atomic_inc(numNoPotentialBidders);
02228     }
02229 }
02230 
02231 void
02232 Router::
02233 onAuctionDone(std::shared_ptr<Auction> auction)
02234 {
02235 #if 0
02236     static std::mutex lock;
02237     std::unique_lock<std::mutex> guard(lock);
02238 
02239     cerr << endl;
02240     cerr << "Router::onAuctionDone with auction id " << auction->id << endl;
02241     backtrace();
02242 #endif
02243 
02244     debugAuction(auction->id, "SENT SUBMITTED");
02245     submittedBuffer.push(auction);
02246 }
02247 
02248 void
02249 Router::
02250 updateAllAgents()
02251 {
02252     for (;;) {
02253 
02254         auto_ptr<AllAgentInfo> newInfo(new AllAgentInfo);
02255 
02256         AllAgentInfo * current = allAgents;
02257 
02258         for (auto it = agents.begin(), end = agents.end();  it != end;  ++it) {
02259             if (!it->second.configured) continue;
02260             if (!it->second.config) continue;
02261             if (!it->second.stats) continue;
02262             if (!it->second.status) continue;
02263             if (it->second.status->dead) continue;
02264 
02265             AgentInfoEntry entry;
02266             entry.name = it->first;
02267             entry.config = it->second.config;
02268             entry.stats = it->second.stats;
02269             entry.status = it->second.status;
02270             int i = newInfo->size();
02271             newInfo->push_back(entry);
02272 
02273             newInfo->agentIndex[it->first] = i;
02274             newInfo->accountIndex[it->second.config->account].push_back(i);
02275         }
02276 
02277         if (ML::cmp_xchg(allAgents, current, newInfo.get())) {
02278             newInfo.release();
02279             ExcAssertNotEqual(current, allAgents);
02280             if (current)
02281                 allAgentsGc.defer([=] () { delete current; });
02282             break;
02283         }
02284     }
02285 }
02286 
02287 void
02288 Router::
02289 doConfig(const std::string & agent,
02290          std::shared_ptr<const AgentConfig> config)
02291 {
02292     RouterProfiler profiler(dutyCycleCurrent.nsConfig);
02293     //const string fName = "Router::doConfig:";
02294     logMessage("CONFIG", agent, boost::trim_copy(config->toJson().toString()));
02295 
02296     // TODO: no need for this...
02297     auto newConfig = std::make_shared<AgentConfig>(*config);
02298     if (newConfig->roundRobinGroup == "")
02299         newConfig->roundRobinGroup = agent;
02300 
02301     AgentInfo & info = agents[agent];
02302 
02303     if (info.configured) {
02304         unconfigure(agent, *info.config);
02305         info.configured = false;
02306     }
02307 
02308     info.config = newConfig;
02309     //cerr << "configured " << agent << " strategy : " << info.config->strategy << " campaign "
02310     //     <<  info.config->campaign << endl;
02311 
02312     string bidRequestFormat = "jsonRaw";
02313     info.setBidRequestFormat(bidRequestFormat);
02314 
02315     configure(agent, *newConfig);
02316     info.configured = true;
02317     sendAgentMessage(agent, "GOTCONFIG", getCurrentTime());
02318 
02319     // Broadcast that we have a new agent or it has a new configuration
02320     updateAllAgents();
02321 }
02322 
02323 void
02324 Router::
02325 unconfigure(const std::string & agent, const AgentConfig & config)
02326 {
02327 }
02328 
02329 void
02330 Router::
02331 configure(const std::string & agent, AgentConfig & config)
02332 {
02333     if (config.account.empty())
02334         throw ML::Exception("attempt to add an account with empty values");
02335     // TODO: async
02336 
02337     bool includeReasons = true;
02338 
02339     // For each exchange, check campaign and creative compatibility
02340     auto onExchange = [&] (const std::shared_ptr<ExchangeConnector> & exch)
02341         {
02342             string exchangeName = exch->exchangeName();
02343 
02344             cerr << "scanning campaign with exchange " << exchangeName << endl;
02345             ExchangeConnector::ExchangeCompatibility ecomp
02346                 = exch->getCampaignCompatibility(config, includeReasons);
02347             if (!ecomp.isCompatible) {
02348                 cerr << "campaign not compatible: " << ecomp.reasons << endl;
02349                 return;
02350             }
02351 
02352             int numCompatibleCreatives = 0;
02353 
02354             for (auto & c: config.creatives) {
02355                 ExchangeConnector::ExchangeCompatibility ccomp
02356                     = exch->getCreativeCompatibility(c, includeReasons);
02357                 if (!ccomp.isCompatible) {
02358                     cerr << "creative not compatible: " << ccomp.reasons << endl;
02359                     return;
02360                 }
02361 
02362                 std::lock_guard<ML::Spinlock> guard(c.lock);
02363                 c.providerData[exchangeName] = ccomp.info;
02364                 ++numCompatibleCreatives;
02365             }
02366 
02367             if (numCompatibleCreatives == 0) {
02368                 cerr << "no compatible creatives" << endl;
02369                 return;
02370             }
02371 
02372             std::lock_guard<ML::Spinlock> guard(config.lock);
02373             config.providerData[exchangeName] = ecomp.info;
02374         };
02375 
02376     forAllExchanges(onExchange);
02377 
02378     auto onDone = [=] (std::exception_ptr exc, ShadowAccount&& ac)
02379         {
02380             //cerr << "got spend account for " << agent << ac << endl;
02381             if (exc)
02382                 logException(exc, "Banker addAccount");
02383         };
02384 
02385     banker->addSpendAccount(config.account, USD(0.10), onDone);
02386 }
02387 
02388 Json::Value
02389 Router::
02390 getStats() const
02391 {
02392     return Json::Value();
02393 #if 0
02394     sendMesg(control(), "STATS");
02395     vector<string> stats = recvAll(control());
02396 
02397     Json::Value result = Json::parse(stats.at(0));
02398 
02399     return result;
02400 #endif
02401 }
02402 
02403 Json::Value
02404 Router::
02405 getAgentInfo(const std::string & agent) const
02406 {
02407     return getAgentEntry(agent).toJson();
02408 }
02409 
02410 Json::Value
02411 Router::
02412 getAllAgentInfo() const
02413 {
02414     Json::Value result;
02415 
02416     auto onAgent = [&] (const AgentInfoEntry & info)
02417         {
02418             result[info.name] = info.toJson();
02419         };
02420 
02421     forEachAgent(onAgent);
02422 
02423     return result;
02424 }
02425 
02426 void
02427 Router::
02428 sendPings()
02429 {
02430     for (auto it = agents.begin(), end = agents.end();
02431          it != end;  ++it) {
02432         const string & agent = it->first;
02433         AgentInfo & info = it->second;
02434 
02435         // 1.  Send out new pings
02436         Date now = Date::now();
02437         if (info.sendPing(0, now))
02438             sendAgentMessage(agent, "PING0", now, "null");
02439         if (info.sendPing(1, now))
02440             sendAgentMessage(agent, "PING1", now, "null");
02441 
02442         // 2.  Look at the trend
02443         //double mean, max;
02444     }
02445 }
02446 
02447 void
02448 Router::
02449 doPong(int level, const std::vector<std::string> & message)
02450 {
02451     //cerr << "dopong (router)" << message << endl;
02452 
02453     string agent = message.at(0);
02454     Date sentTime = Date::parseSecondsSinceEpoch(message.at(2));
02455     Date receivedTime = Date::parseSecondsSinceEpoch(message.at(3));
02456     Date now = Date::now();
02457 
02458     double roundTripTime = now.secondsSince(sentTime);
02459     double outgoingTime = receivedTime.secondsSince(sentTime);
02460     double incomingTime = now.secondsSince(receivedTime);
02461 
02462     auto it = agents.find(agent);
02463     if (it == agents.end()) {
02464         cerr << "warning: dead agent sent a pong: " << agent << endl;
02465         return;
02466     }
02467 
02468     if (!it->second.configured)
02469         return;
02470 
02471     auto & info = it->second;
02472     info.gotPong(level, sentTime, receivedTime, now);
02473 
02474     const string & account = it->second.config->account.toString('.');
02475     recordOutcome(roundTripTime * 1000.0,
02476                   "accounts.%s.ping%d.roundTripTimeMs", account, level);
02477     recordOutcome(outgoingTime * 1000.0,
02478                   "accounts.%s.ping%d.outgoingTimeMs", account, level);
02479     recordOutcome(incomingTime * 1000.0,
02480                   "accounts.%s.ping%d.incomingTimeMs", account, level);
02481 }
02482 
02483 void
02484 Router::
02485 sendBidResponse(const std::string & agent,
02486                 const AgentInfo & info,
02487                 BidStatus status,
02488                 Date timestamp,
02489                 const std::string & message,
02490                 const Id & auctionId,
02491                 int spotNum,
02492                 Amount price,
02493                 const Auction * auction,
02494                 const std::string & bidData,
02495                 const Json::Value & metadata,
02496                 const std::string & augmentationsStr)
02497 {
02498     BidResultFormat format;
02499     switch (status) {
02500     case BS_WIN:   format = info.config->winFormat;   break;
02501     case BS_LOSS:  format = info.config->lossFormat;  break;
02502     default:       format = info.config->errorFormat;  break;
02503     }
02504 
02505     const char * statusStr = bidStatusToChar(status);
02506 
02507     switch (format) {
02508     case BRF_FULL:
02509         sendAgentMessage(agent, statusStr, timestamp, message, auctionId,
02510                          to_string(spotNum),
02511                          price.toString(),
02512                          (auction ? info.getBidRequestEncoding(*auction) : ""),
02513                          (auction ? info.encodeBidRequest(*auction) : ""),
02514                          bidData, metadata, augmentationsStr);
02515         break;
02516 
02517     case BRF_LIGHTWEIGHT:
02518         sendAgentMessage(agent, statusStr, timestamp, message, auctionId,
02519                          to_string(spotNum), price.toString());
02520         break;
02521 
02522     case BRF_NONE:
02523         break;
02524     }
02525 }
02526 
02527 void
02528 Router::
02529 forEachAgent(const OnAgentFn & onAgent) const
02530 {
02531     GcLock::SharedGuard guard(allAgentsGc);
02532     const AllAgentInfo * ac = allAgents;
02533     if (!ac) return;
02534 
02535     std::for_each(ac->begin(), ac->end(), onAgent);
02536 }
02537 
02538 void
02539 Router::
02540 forEachAccountAgent(const AccountKey & account,
02541                     const OnAgentFn & onAgent) const
02542 {
02543     GcLock::SharedGuard guard(allAgentsGc);
02544     const AllAgentInfo * ac = allAgents;
02545     if (!ac) return;
02546 
02547     auto it = ac->accountIndex.find(account);
02548     if (it == ac->accountIndex.end())
02549         return;
02550 
02551     for (auto jt = it->second.begin(), jend = it->second.end();
02552          jt != jend;  ++jt)
02553         onAgent(ac->at(*jt));
02554 }
02555 
02556 AgentInfoEntry
02557 Router::
02558 getAgentEntry(const std::string & agent) const
02559 {
02560     GcLock::SharedGuard guard(allAgentsGc);
02561     const AllAgentInfo * ac = allAgents;
02562     if (!ac) return AgentInfoEntry();
02563 
02564     auto it = ac->agentIndex.find(agent);
02565     if (it == ac->agentIndex.end())
02566         return AgentInfoEntry();
02567     return ac->at(it->second);
02568 }
02569 
02570 void
02571 Router::
02572 submitToPostAuctionService(std::shared_ptr<Auction> auction,
02573                            Id adSpotId,
02574                            const Auction::Response & bid)
02575 {
02576 #if 0
02577     static std::mutex lock;
02578     std::unique_lock<std::mutex> guard(lock);
02579 
02580     cerr << endl;
02581     cerr << "submitted auction " << auction->id << ","
02582          << adSpotId << endl;
02583 
02584     backtrace();
02585 #endif
02586     string auctionKey = auction->id.toString()
02587                         + "-" + adSpotId.toString()
02588                         + "-" + bid.agent;
02589     banker->detachBid(bid.account, auctionKey);
02590 
02591     SubmittedAuctionEvent event;
02592     event.auctionId = auction->id;
02593     event.adSpotId = adSpotId;
02594     event.lossTimeout = auction->lossAssumed;
02595     event.augmentations = auction->agentAugmentations[bid.agent];
02596     event.bidRequest = auction->request;
02597     event.bidRequestStr = auction->requestStr;
02598     event.bidRequestStrFormat = auction->requestStrFormat ;
02599     event.bidResponse = bid;
02600 
02601     string str = ML::DB::serializeToString(event);
02602 
02603     postAuctionEndpoint.sendMessage("AUCTION", str);
02604 
02605     if (auction.unique()) {
02606         auctionGraveyard.tryPush(auction);
02607     }
02608 }
02609 
02610 void
02611 Router::
02612 throwException(const std::string & key, const std::string & fmt, ...)
02613 {
02614     recordHit("error.exception");
02615     recordHit("error.exception.%s", key);
02616 
02617     string message;
02618     va_list ap;
02619     va_start(ap, fmt);
02620     try {
02621         message = vformat(fmt.c_str(), ap);
02622         va_end(ap);
02623     }
02624     catch (...) {
02625         va_end(ap);
02626         throw;
02627     }
02628 
02629     logRouterError("exception", key, message);
02630     throw ML::Exception("Router Exception: " + key + ": " + message);
02631 }
02632 
02633 void
02634 Router::
02635 debugAuctionImpl(const Id & auction, const std::string & type,
02636                  const std::vector<std::string> & args)
02637 {
02638     Date now = Date::now();
02639     boost::unique_lock<ML::Spinlock> guard(debugLock);
02640     AuctionDebugInfo & entry
02641         = debugInfo.access(auction, now.plusSeconds(30.0));
02642 
02643     entry.addAuctionEvent(now, type, args);
02644 }
02645 
02646 void
02647 Router::
02648 debugSpotImpl(const Id & auction, const Id & spot, const std::string & type,
02649               const std::vector<std::string> & args)
02650 {
02651     Date now = Date::now();
02652     boost::unique_lock<ML::Spinlock> guard(debugLock);
02653     AuctionDebugInfo & entry
02654         = debugInfo.access(auction, now.plusSeconds(30.0));
02655 
02656     entry.addSpotEvent(spot, now, type, args);
02657 }
02658 
02659 void
02660 Router::
02661 expireDebugInfo()
02662 {
02663     boost::unique_lock<ML::Spinlock> guard(debugLock);
02664     debugInfo.expire();
02665 }
02666 
02667 void
02668 Router::
02669 dumpAuction(const Id & auction) const
02670 {
02671     boost::unique_lock<ML::Spinlock> guard(debugLock);
02672     auto it = debugInfo.find(auction);
02673     if (it == debugInfo.end()) {
02674         //cerr << "*** unknown auction " << auction << " in "
02675         //     << debugInfo.size() << endl;
02676     }
02677     else it->second.dumpAuction();
02678 }
02679 
02680 void
02681 Router::
02682 dumpSpot(const Id & auction, const Id & spot) const
02683 {
02684     boost::unique_lock<ML::Spinlock> guard(debugLock);
02685     auto it = debugInfo.find(auction);
02686     if (it == debugInfo.end()) {
02687         //cerr << "*** unknown auction " << auction << " in "
02688         //     << debugInfo.size() << endl;
02689     }
02690     else it->second.dumpSpot(spot);
02691 }
02692 
02694 string
02695 Router::
02696 getProviderName()
02697     const
02698 {
02699     return serviceName();
02700 }
02701 
02702 Json::Value
02703 Router::
02704 getProviderIndicators()
02705     const
02706 {
02707     Json::Value value;
02708 
02709     /* Router health check:
02710        - valid connection to post auction loop */
02711     value["status"] = postAuctionEndpoint.isConnected() ? "ok" : "failure";
02712 
02713     return value;
02714 }
02715 
02716 void
02717 Router::
02718 startExchange(const std::string & exchangeType,
02719               const Json::Value & exchangeConfig)
02720 {
02721     auto exchange = ExchangeConnector::
02722         create(exchangeType, *this, exchangeType);
02723     exchange->configure(exchangeConfig);
02724     exchange->start();
02725 
02726     addExchange(std::move(exchange));
02727 }
02728 
02729 void
02730 Router::
02731 startExchange(const Json::Value & exchangeConfig)
02732 {
02733     std::string exchangeType = exchangeConfig["exchangeType"].asString();
02734     startExchange(exchangeType, exchangeConfig);
02735 }
02736 
02737 
02738 
02739 } // namespace RTBKIT
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator