RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* rtb_router.cc 00002 Jeremy Barnes, 24 March 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 RTB router code. 00006 */ 00007 00008 #include "router.h" 00009 #include "soa/service/zmq_utils.h" 00010 #include "jml/arch/backtrace.h" 00011 #include "jml/arch/futex.h" 00012 #include "jml/arch/exception_handler.h" 00013 #include "soa/jsoncpp/writer.h" 00014 #include <boost/foreach.hpp> 00015 #include "jml/arch/atomic_ops.h" 00016 #include "jml/utils/set_utils.h" 00017 #include "jml/utils/environment.h" 00018 #include "jml/arch/info.h" 00019 #include "jml/utils/lightweight_hash.h" 00020 #include "jml/math/xdiv.h" 00021 #include <boost/tuple/tuple.hpp> 00022 #include "jml/utils/pair_utils.h" 00023 #include "jml/utils/exc_assert.h" 00024 #include "jml/db/persistent.h" 00025 #include "jml/utils/json_parsing.h" 00026 #include <boost/make_shared.hpp> 00027 #include "profiler.h" 00028 #include "rtbkit/core/banker/banker.h" 00029 #include "rtbkit/core/banker/null_banker.h" 00030 #include <boost/algorithm/string.hpp> 00031 #include "rtbkit/common/bids.h" 00032 #include "rtbkit/common/messages.h" 00033 #include "rtbkit/common/auction_events.h" 00034 00035 00036 using namespace std; 00037 using namespace ML; 00038 00039 00040 namespace RTBKIT { 00041 00042 00043 00044 /*****************************************************************************/ 00045 /* AGENT INFO */ 00046 /*****************************************************************************/ 00047 00048 Json::Value 00049 AgentInfoEntry:: 00050 toJson() const 00051 { 00052 Json::Value result; 00053 if (!valid()) return result; 00054 result["config"] = config->toJson(); 00055 result["stats"] = stats->toJson(); 00056 return result; 00057 } 00058 00059 /*****************************************************************************/ 00060 /* AUCTION DEBUG INFO */ 00061 /*****************************************************************************/ 00062 00063 void 00064 AuctionDebugInfo:: 
00065 addAuctionEvent(Date timestamp, std::string type, 00066 const std::vector<std::string> & args) 00067 { 00068 Message message; 00069 message.timestamp = timestamp; 00070 message.type = type; 00071 //message.args = args; 00072 messages.push_back(message); 00073 } 00074 00075 void 00076 AuctionDebugInfo:: 00077 addSpotEvent(const Id & spot, Date timestamp, std::string type, 00078 const std::vector<std::string> & args) 00079 { 00080 Message message; 00081 message.spot = spot; 00082 message.timestamp = timestamp; 00083 message.type = type; 00084 //message.args = args; 00085 messages.push_back(message); 00086 } 00087 00088 void 00089 AuctionDebugInfo:: 00090 dumpAuction() const 00091 { 00092 for (unsigned i = 0; i < messages.size(); ++i) { 00093 auto & m = messages[i]; 00094 cerr << m.timestamp.print(6) << " " << m.spot << " " << m.type << endl; 00095 } 00096 } 00097 00098 void 00099 AuctionDebugInfo:: 00100 dumpSpot(Id spot) const 00101 { 00102 dumpAuction(); // TODO 00103 } 00104 00105 00106 /*****************************************************************************/ 00107 /* ROUTER */ 00108 /*****************************************************************************/ 00109 00110 Router:: 00111 Router(ServiceBase & parent, 00112 const std::string & serviceName, 00113 double secondsUntilLossAssumed, 00114 bool connectPostAuctionLoop) 00115 : ServiceBase(serviceName, parent), 00116 shutdown_(false), 00117 agentEndpoint(getZmqContext()), 00118 configBuffer(1024), 00119 startBiddingBuffer(65536), 00120 submittedBuffer(65536), 00121 auctionGraveyard(65536), 00122 augmentationLoop(*this), 00123 secondsUntilLossAssumed_(secondsUntilLossAssumed), 00124 globalBidProbability(1.0), 00125 bidsErrorRate(0.0), 00126 budgetErrorRate(0.0), 00127 connectPostAuctionLoop(connectPostAuctionLoop), 00128 allAgents(new AllAgentInfo()), 00129 configListener(getZmqContext()), 00130 initialized(false), 00131 logger(getZmqContext()), 00132 doDebug(false), 00133 numAuctions(0), 
numBids(0), numNonEmptyBids(0), 00134 numAuctionsWithBid(0), numNoPotentialBidders(0), 00135 numNoBidders(0), 00136 monitorClient(getZmqContext()), 00137 slowModeCount(0), 00138 monitorProviderClient(getZmqContext(), *this) 00139 { 00140 } 00141 00142 Router:: 00143 Router(std::shared_ptr<ServiceProxies> services, 00144 const std::string & serviceName, 00145 double secondsUntilLossAssumed, 00146 bool connectPostAuctionLoop) 00147 : ServiceBase(serviceName, services), 00148 shutdown_(false), 00149 agentEndpoint(getZmqContext()), 00150 postAuctionEndpoint(getZmqContext()), 00151 configBuffer(1024), 00152 startBiddingBuffer(65536), 00153 submittedBuffer(65536), 00154 auctionGraveyard(65536), 00155 augmentationLoop(*this), 00156 secondsUntilLossAssumed_(secondsUntilLossAssumed), 00157 globalBidProbability(1.0), 00158 bidsErrorRate(0.0), 00159 budgetErrorRate(0.0), 00160 connectPostAuctionLoop(connectPostAuctionLoop), 00161 allAgents(new AllAgentInfo()), 00162 configListener(getZmqContext()), 00163 initialized(false), 00164 logger(getZmqContext()), 00165 doDebug(false), 00166 numAuctions(0), numBids(0), numNonEmptyBids(0), 00167 numAuctionsWithBid(0), numNoPotentialBidders(0), 00168 numNoBidders(0), 00169 monitorClient(getZmqContext()), 00170 slowModeCount(0), 00171 monitorProviderClient(getZmqContext(), *this) 00172 { 00173 } 00174 00175 void 00176 Router:: 00177 init() 00178 { 00179 ExcAssert(!initialized); 00180 00181 registerServiceProvider(serviceName(), { "rtbRequestRouter" }); 00182 00183 banker.reset(new NullBanker()); 00184 00185 augmentationLoop.init(); 00186 00187 logger.init(getServices()->config, serviceName() + "/logger"); 00188 00189 00190 agentEndpoint.init(getServices()->config, serviceName() + "/agents"); 00191 agentEndpoint.clientMessageHandler 00192 = std::bind(&Router::handleAgentMessage, this, std::placeholders::_1); 00193 agentEndpoint.onConnection = [=] (const std::string & agent) 00194 { 00195 cerr << "agent " << agent << " connected to router" 
<< endl; 00196 }; 00197 00198 agentEndpoint.onDisconnection = [=] (const std::string & agent) 00199 { 00200 cerr << "agent " << agent << " disconnected from router" << endl; 00201 }; 00202 00203 postAuctionEndpoint.init(getServices()->config, ZMQ_XREQ); 00204 00205 configListener.onConfigChange = [=] (const std::string & agent, 00206 std::shared_ptr<const AgentConfig> config) 00207 { 00208 cerr << endl << endl << "agent " << agent << " got new configuration" << endl; 00209 configBuffer.push(make_pair(agent, config)); 00210 }; 00211 00212 onSubmittedAuction = [=] (std::shared_ptr<Auction> auction, 00213 Id adSpotId, 00214 Auction::Response response) 00215 { 00216 submitToPostAuctionService(auction, adSpotId, response); 00217 }; 00218 00219 monitorClient.init(getServices()->config); 00220 monitorProviderClient.init(getServices()->config); 00221 00222 initialized = true; 00223 } 00224 00225 Router:: 00226 ~Router() 00227 { 00228 shutdown(); 00229 } 00230 00231 std::shared_ptr<Banker> 00232 Router:: 00233 getBanker() const 00234 { 00235 return banker; 00236 } 00237 00238 void 00239 Router:: 00240 setBanker(const std::shared_ptr<Banker> & newBanker) 00241 { 00242 banker = newBanker; 00243 } 00244 00245 void 00246 Router:: 00247 bindTcp() 00248 { 00249 logger.bindTcp(getServices()->ports->getRange("logs")); 00250 agentEndpoint.bindTcp(getServices()->ports->getRange("router")); 00251 } 00252 00253 void 00254 Router:: 00255 bindAgents(std::string agentUri) 00256 { 00257 try { 00258 agentEndpoint.bind(agentUri.c_str()); 00259 } catch (const std::exception & exc) { 00260 throw Exception("error while binding agent URI %s: %s", 00261 agentUri.c_str(), exc.what()); 00262 } 00263 } 00264 00265 void 00266 Router:: 00267 bindAugmentors(const std::string & uri) 00268 { 00269 try { 00270 augmentationLoop.bindAugmentors(uri); 00271 } catch (const std::exception & exc) { 00272 throw Exception("error while binding augmentation URI %s: %s", 00273 uri.c_str(), exc.what()); 00274 } 00275 
} 00276 00277 void 00278 Router:: 00279 unsafeDisableMonitor() 00280 { 00281 // TODO: we shouldn't be reaching inside these structures... 00282 monitorClient.testMode = true; 00283 monitorClient.testResponse = true; 00284 monitorProviderClient.inhibit_ = true; 00285 } 00286 00287 void 00288 Router:: 00289 start(boost::function<void ()> onStop) 00290 { 00291 ExcAssert(initialized); 00292 00293 static Lock lock; 00294 Guard guard(lock); 00295 00296 if (runThread) 00297 throw Exception("router is already running"); 00298 00299 auto runfn = [=] () 00300 { 00301 this->run(); 00302 if (onStop) onStop(); 00303 }; 00304 00305 logger.start(); 00306 augmentationLoop.start(); 00307 runThread.reset(new boost::thread(runfn)); 00308 00309 if (connectPostAuctionLoop) { 00310 postAuctionEndpoint.connectToServiceClass("rtbPostAuctionService", "events"); 00311 } 00312 00313 configListener.init(getServices()->config); 00314 configListener.start(); 00315 00316 /* This is an extra thread which sits there deleting auctions 00317 to take this out of the hands of the main loop (it can easily use 00318 up nearly 20% of the capacity of the main loop). 
00319 */ 00320 auto auctionDeleter = [=] () 00321 { 00322 while (!this->shutdown_) { 00323 std::shared_ptr<Auction> toDelete; 00324 auctionGraveyard.tryPop(toDelete, 0.05); 00325 int numDeleted = 1; 00326 while (this->auctionGraveyard.tryPop(toDelete)) 00327 ++numDeleted; 00328 //cerr << "deleted " << numDeleted << " auctions" 00329 // << endl; 00330 ML::sleep(0.001); 00331 } 00332 }; 00333 00334 cleanupThread.reset(new boost::thread(auctionDeleter)); 00335 00336 monitorClient.start(); 00337 monitorProviderClient.start(); 00338 } 00339 00340 size_t 00341 Router:: 00342 numNonIdle() const 00343 { 00344 size_t numInFlight, numAwaitingAugmentation; 00345 { 00346 Guard guard(lock); 00347 numInFlight = inFlight.size(); 00348 numAwaitingAugmentation = augmentationLoop.numAugmenting(); 00349 } 00350 00351 cerr << "numInFlight = " << numInFlight << endl; 00352 cerr << "numAwaitingAugmentation = " << numAwaitingAugmentation << endl; 00353 00354 return numInFlight + numAwaitingAugmentation; 00355 } 00356 00357 void 00358 Router:: 00359 sleepUntilIdle() 00360 { 00361 for (int iter = 0;;++iter) { 00362 augmentationLoop.sleepUntilIdle(); 00363 size_t nonIdle = numNonIdle(); 00364 if (nonIdle == 0) break; 00365 //cerr << "there are " << nonIdle << " non-idle" << endl; 00366 ML::sleep(0.001); 00367 } 00368 } 00369 00370 void 00371 Router:: 00372 issueTimestamp() 00373 { 00374 Date now = Date::now(); 00375 00376 cerr << "timestamp: " 00377 << ML::format("%.6f", now.secondsSinceEpoch()) 00378 << " - " << now.printClassic() 00379 << endl; 00380 } 00381 00382 void 00383 Router:: 00384 run() 00385 { 00386 using namespace std; 00387 00388 zmq_pollitem_t items [] = { 00389 { agentEndpoint.getSocketUnsafe(), 0, ZMQ_POLLIN, 0 }, 00390 { 0, wakeupMainLoop.fd(), ZMQ_POLLIN, 0 } 00391 }; 00392 00393 double last_check = ML::wall_time(), last_check_pace = last_check, 00394 lastPings = last_check; 00395 00396 //cerr << "server listening" << endl; 00397 00398 auto getTime = [&] () { return 
Date::now().secondsSinceEpoch(); }; 00399 00400 double beforeSleep, afterSleep = getTime(); 00401 int numTimesCouldSleep = 0; 00402 int totalSleeps = 0; 00403 double lastTimestamp = 0; 00404 00405 recordHit("routerUp"); 00406 00407 //double lastDump = ML::wall_time(); 00408 00409 struct TimesEntry { 00410 TimesEntry() 00411 : time(0.0), count(0) 00412 { 00413 }; 00414 00415 void add(double time) 00416 { 00417 this->time += time; 00418 ++count; 00419 } 00420 00421 double time; 00422 uint64_t count; 00423 }; 00424 00425 std::map<std::string, TimesEntry> times; 00426 00427 double auctionKeepProbability = 1.0; 00428 00429 // Attempt to wake up once per millisecond 00430 00431 Date lastSleep = Date::now(); 00432 00433 while (!shutdown_) { 00434 beforeSleep = getTime(); 00435 00436 dutyCycleCurrent.nsProcessing 00437 += microsecondsBetween(beforeSleep, afterSleep); 00438 00439 int rc = 0; 00440 00441 for (unsigned i = 0; i < 20 && rc == 0; ++i) 00442 rc = zmq_poll(items, 2, 0); 00443 if (rc == 0) { 00444 ++numTimesCouldSleep; 00445 checkExpiredAuctions(); 00446 00447 #if 1 00448 // Try to sleep only once per 1/2 a millisecond to avoid too many 00449 // context switches. 
00450 Date now = Date::now(); 00451 double timeSinceSleep = lastSleep.secondsUntil(now); 00452 double timeToWait = 0.0005 - timeSinceSleep; 00453 if (timeToWait > 0) { 00454 ML::sleep(timeToWait); 00455 } 00456 lastSleep = now; 00457 #endif 00458 00459 00460 rc = zmq_poll(items, 2, 50 /* milliseconds */); 00461 } 00462 00463 //cerr << "rc = " << rc << endl; 00464 00465 afterSleep = getTime(); 00466 00467 dutyCycleCurrent.nsSleeping 00468 += microsecondsBetween(afterSleep, beforeSleep); 00469 dutyCycleCurrent.nEvents += 1; 00470 00471 times["asleep"].add(microsecondsBetween(afterSleep, beforeSleep)); 00472 00473 if (rc == -1 && zmq_errno() != EINTR) { 00474 cerr << "zeromq error: " << zmq_strerror(zmq_errno()) << endl; 00475 } 00476 00477 { 00478 double atStart = getTime(); 00479 std::shared_ptr<AugmentationInfo> info; 00480 while (startBiddingBuffer.tryPop(info)) { 00481 doStartBidding(info); 00482 } 00483 00484 double atEnd = getTime(); 00485 times["doStartBidding"].add(microsecondsBetween(atEnd, atStart)); 00486 } 00487 00488 00489 00490 { 00491 double atStart = getTime(); 00492 00493 std::pair<std::string, std::shared_ptr<const AgentConfig> > config; 00494 while (configBuffer.tryPop(config)) { 00495 if (!config.second) { 00496 // deconfiguration 00497 // TODO 00498 cerr << "agent " << config.first << " lost configuration" 00499 << endl; 00500 } 00501 else { 00502 doConfig(config.first, config.second); 00503 } 00504 } 00505 00506 double atEnd = getTime(); 00507 times["doConfig"].add(microsecondsBetween(atEnd, atStart)); 00508 } 00509 00510 { 00511 double atStart = getTime(); 00512 std::shared_ptr<Auction> auction; 00513 while (submittedBuffer.tryPop(auction)) 00514 doSubmitted(auction); 00515 00516 double atEnd = getTime(); 00517 times["doSubmitted"].add(microsecondsBetween(atEnd, atStart)); 00518 } 00519 00520 if (items[0].revents & ZMQ_POLLIN) { 00521 double beforeMessage = getTime(); 00522 // Agent message 00523 vector<string> message; 00524 try { 00525 
message = recvAll(agentEndpoint.getSocketUnsafe()); 00526 agentEndpoint.handleMessage(std::move(message)); 00527 double atEnd = getTime(); 00528 times[message.at(1)].add(microsecondsBetween(atEnd, beforeMessage)); 00529 } catch (const std::exception & exc) { 00530 cerr << "error handling agent message " << message 00531 << ": " << exc.what() << endl; 00532 logRouterError("handleAgentMessage", exc.what(), 00533 message); 00534 } 00535 } 00536 00537 if (items[1].revents & ZMQ_POLLIN) { 00538 wakeupMainLoop.read(); 00539 } 00540 00541 //checkExpiredAuctions(); 00542 00543 double now = ML::wall_time(); 00544 double beforeChecks = getTime(); 00545 00546 if (now - lastPings > 1.0) { 00547 // Send out pings and interpret the results of the last lot of 00548 // pinging. 00549 sendPings(); 00550 00551 lastPings = now; 00552 } 00553 00554 if (now - last_check_pace > 10.0) { 00555 if (numTimesCouldSleep < 50) { 00556 auctionKeepProbability = std::max(auctionKeepProbability - 0.10, 00557 0.10); 00558 } 00559 else if (numTimesCouldSleep > 2000) { 00560 auctionKeepProbability = std::min(auctionKeepProbability + 0.10, 00561 1.00); 00562 } 00563 else if (numTimesCouldSleep > 500) { 00564 auctionKeepProbability = std::min(auctionKeepProbability + 0.05, 00565 1.00); 00566 } 00567 else if (numTimesCouldSleep > 100) { 00568 auctionKeepProbability = std::min(auctionKeepProbability + 0.01, 00569 1.00); 00570 } 00571 00572 #if 1 00573 cerr << "auctionKeepProbability = " << auctionKeepProbability 00574 << " numTimesCouldSleep = " << numTimesCouldSleep 00575 << endl; 00576 #endif 00577 00578 // Start dropping them early if we get to 50% and we're still having 00579 // trouble keeping up. 
00580 double earlyAuctionKeepProbability = 1.0; 00581 #if 0 00582 if (auctionKeepProbability < 0.5) 00583 earlyAuctionKeepProbability = auctionKeepProbability * 0.5; 00584 #else 00585 earlyAuctionKeepProbability = auctionKeepProbability; 00586 #endif 00587 setAcceptAuctionProbability(earlyAuctionKeepProbability); 00588 00589 recordEvent("auctionKeepPercentage", ET_LEVEL, 00590 auctionKeepProbability * 100.0); 00591 recordEvent("numTimesCouldSleep", ET_LEVEL, 00592 numTimesCouldSleep); 00593 00594 totalSleeps += numTimesCouldSleep; 00595 00596 numTimesCouldSleep = 0; 00597 last_check_pace = now; 00598 } 00599 00600 if (now - last_check > 10.0) { 00601 00602 logMessage("MARK", 00603 Date::fromSecondsSinceEpoch(last_check).print(), 00604 format("active: %zd augmenting, %zd inFlight, " 00605 "%zd agents", 00606 augmentationLoop.numAugmenting(), 00607 inFlight.size(), 00608 agents.size())); 00609 00610 dutyCycleCurrent.ending = Date::now(); 00611 dutyCycleHistory.push_back(dutyCycleCurrent); 00612 dutyCycleCurrent.clear(); 00613 00614 if (dutyCycleHistory.size() > 200) 00615 dutyCycleHistory.erase(dutyCycleHistory.begin(), 00616 dutyCycleHistory.end() - 100); 00617 00618 checkDeadAgents(); 00619 00620 double total = 0.0; 00621 for (auto it = times.begin(); it != times.end(); ++it) 00622 total += it->second.time; 00623 00624 cerr << "total of " << total << " microseconds and " 00625 << totalSleeps << " sleeps" << endl; 00626 00627 for (auto it = times.begin(); it != times.end(); ++it) { 00628 cerr << ML::format("%-30s %8lld %10.0f %6.2f%% %8.2fus/call\n", 00629 it->first.c_str(), 00630 (unsigned long long)it->second.count, 00631 it->second.time, 00632 100.0 * it->second.time / total, 00633 it->second.time / it->second.count); 00634 00635 recordEvent(("routerLoop." 
+ it->first).c_str(), ET_LEVEL, 00636 1.0 * it->second.time / (now - last_check) / 1000000.0); 00637 00638 } 00639 00640 times.clear(); 00641 totalSleeps = 0; 00642 00643 last_check = now; 00644 } 00645 00646 times["checks"].add(microsecondsBetween(getTime(), beforeChecks)); 00647 00648 if (now - lastTimestamp >= 1.0) { 00649 banker->logBidEvents(*this); 00650 //issueTimestamp(); 00651 lastTimestamp = now; 00652 } 00653 } 00654 00655 //cerr << "finished run loop" << endl; 00656 00657 recordHit("routerDown"); 00658 00659 //cerr << "server shutdown" << endl; 00660 } 00661 00662 void 00663 Router:: 00664 shutdown() 00665 { 00666 configListener.shutdown(); 00667 00668 shutdown_ = true; 00669 futex_wake(shutdown_); 00670 wakeupMainLoop.signal(); 00671 00672 augmentationLoop.shutdown(); 00673 00674 if (runThread) 00675 runThread->join(); 00676 runThread.reset(); 00677 if (cleanupThread) 00678 cleanupThread->join(); 00679 cleanupThread.reset(); 00680 00681 logger.shutdown(); 00682 banker.reset(); 00683 00684 monitorClient.shutdown(); 00685 monitorProviderClient.shutdown(); 00686 } 00687 00688 void 00689 Router:: 00690 injectAuction(std::shared_ptr<Auction> auction, double lossTime) 00691 { 00692 // cerr << "injectAuction was called!!!" 
<< endl; 00693 if (!auction->handleAuction) { 00694 // Modify the auction to insert our auction done handling 00695 auction->handleAuction 00696 = [=] (std::shared_ptr<Auction> auction) 00697 { 00698 this->onAuctionDone(auction); 00699 }; 00700 } 00701 00702 auction->lossAssumed = getCurrentTime().plusSeconds(lossTime); 00703 onNewAuction(auction); 00704 } 00705 00706 inline std::string chomp(const std::string & s) 00707 { 00708 const char * start = s.c_str(); 00709 const char * end = start + s.length(); 00710 00711 while (end > start && end[-1] == '\n') --end; 00712 00713 if (end == start + s.length()) return s; 00714 return string(start, end); 00715 } 00716 00717 std::shared_ptr<Auction> 00718 Router:: 00719 injectAuction(Auction::HandleAuction onAuctionFinished, 00720 std::shared_ptr<BidRequest> request, 00721 const std::string & requestStr, 00722 const std::string & requestStrFormat, 00723 double startTime, 00724 double expiryTime, 00725 double lossTime) 00726 { 00727 std::shared_ptr<Auction> auction 00728 (new Auction(nullptr, 00729 onAuctionFinished, 00730 request, 00731 chomp(requestStr), 00732 requestStrFormat, 00733 Date::fromSecondsSinceEpoch(startTime), 00734 Date::fromSecondsSinceEpoch(expiryTime))); 00735 00736 injectAuction(auction, lossTime); 00737 00738 return auction; 00739 } 00740 00741 void 00742 Router:: 00743 notifyFinishedAuction(const Id & auctionId) 00744 { 00745 throw ML::Exception("notifyFinishedAuction: not finished"); 00746 } 00747 00748 int 00749 Router:: 00750 numAuctionsInProgress() const 00751 { 00752 return -1;//inFlight.size(); 00753 } 00754 00755 void 00756 Router:: 00757 handleAgentMessage(const std::vector<std::string> & message) 00758 { 00759 try { 00760 using namespace std; 00761 //cerr << "got agent message " << message << endl; 00762 00763 if (message.size() < 2) { 00764 returnErrorResponse(message, "not enough message parts"); 00765 return; 00766 } 00767 00768 const string & address = message[0]; 00769 const string & 
request = message[1]; 00770 00771 if (request.empty()) 00772 returnErrorResponse(message, "null request field"); 00773 00774 if (request == "CONFIG") { 00775 string configName = message.at(2); 00776 if (!agents.count(configName)) { 00777 // We don't yet know about its configuration 00778 sendAgentMessage(address, "NEEDCONFIG", getCurrentTime()); 00779 return; 00780 } 00781 agents[configName].address = address; 00782 return; 00783 } 00784 00785 if (!agents.count(address)) { 00786 cerr << "doing NEEDCONFIG for " << address << endl; 00787 return; 00788 } 00789 00790 AgentInfo & info = agents[address]; 00791 info.gotHeartbeat(Date::now()); 00792 00793 if (!info.configured) { 00794 throw ML::Exception("message to unconfigured agent"); 00795 } 00796 00797 if (request[0] == 'B' && request == "BID") { 00798 doBid(message); 00799 return; 00800 } 00801 00802 //cerr << "router got message " << message << endl; 00803 00804 if (request[0] == 'P' && request == "PONG0") { 00805 doPong(0, message); 00806 return; 00807 } 00808 else if (request[0] == 'P' && request == "PONG1") { 00809 doPong(1, message); 00810 return; 00811 } 00812 00813 returnErrorResponse(message, "unknown agent request"); 00814 } catch (const std::exception & exc) { 00815 returnErrorResponse(message, 00816 "threw exception: " + string(exc.what())); 00817 } 00818 } 00819 00820 void 00821 Router:: 00822 checkDeadAgents() 00823 { 00824 //Date start = Date::now(); 00825 00826 using namespace std; 00827 //cerr << "checking for dead agents" << endl; 00828 00829 std::vector<Agents::iterator> deadAgents; 00830 00831 for (auto it = agents.begin(), end = agents.end(); it != end; 00832 ++it) { 00833 auto & info = it->second; 00834 00835 const std::string & account = info.config->account.toString('.'); 00836 00837 Date now = Date::now(); 00838 double oldest = 0.0; 00839 double total = 0.0; 00840 00841 vector<Id> toExpire; 00842 00843 // Check for in flight timeouts. 
This shouldn't happen, but there 00844 // appears to be a way in which we lose track of an inflight auction 00845 auto onInFlight = [&] (const Id & id, const Date & date) 00846 { 00847 double secondsSince = now.secondsSince(date); 00848 00849 oldest = std::max(oldest, secondsSince); 00850 total += secondsSince; 00851 00852 if (secondsSince > 30.0) { 00853 00854 this->recordHit("accounts.%s.lostBids", account); 00855 00856 this->sendBidResponse(it->first, 00857 info, 00858 BS_LOSTBID, 00859 this->getCurrentTime(), 00860 "guaranteed", id); 00861 00862 toExpire.push_back(id); 00863 } 00864 }; 00865 00866 info.forEachInFlight(onInFlight); 00867 00868 this->recordLevel(info.numBidsInFlight(), 00869 "accounts.%s.inFlight.numInFlight", account); 00870 this->recordLevel(oldest, 00871 "accounts.%s.inFlight.oldestAgeSeconds", account); 00872 double averageAge = 0.0; 00873 if (info.numBidsInFlight() != 0) 00874 averageAge = total / info.numBidsInFlight(); 00875 00876 this->recordLevel(averageAge, 00877 "accounts.%s.inFlight.averageAgeSeconds", account); 00878 00879 for (auto jt = toExpire.begin(), jend = toExpire.end(); jt != jend; 00880 ++jt) { 00881 info.expireBidInFlight(*jt); 00882 } 00883 00884 double timeSinceHeartbeat 00885 = now.secondsSince(info.status->lastHeartbeat); 00886 00887 this->recordLevel(timeSinceHeartbeat, 00888 "accounts.%s.timeSinceHeartbeat", account); 00889 00890 if (timeSinceHeartbeat > 5.0) { 00891 info.status->dead = true; 00892 if (it->second.numBidsInFlight() != 0) { 00893 cerr << "agent " << it->first 00894 << " has " << it->second.numBidsInFlight() 00895 << " undead auctions: " << endl; 00896 00897 auto onInFlight = [&] (const Id & id, Date date) 00898 { 00899 cerr << " " << id << " --> " 00900 << date << " (" << now.secondsSince(date) 00901 << "s ago)" << endl; 00902 }; 00903 00904 info.forEachInFlight(onInFlight); 00905 } 00906 else { 00907 // agent is dead 00908 cerr << "agent " << it->first << " appears to be dead" 00909 << endl; 00910 
sendAgentMessage(it->first, "BYEBYE", getCurrentTime()); 00911 deadAgents.push_back(it); 00912 } 00913 } 00914 } 00915 00916 for (auto it = deadAgents.begin(), end = deadAgents.end(); 00917 it != end; ++it) { 00918 cerr << "WARNING: dead agent doesn't clean up its state properly" 00919 << endl; 00920 // TODO: undo all bids in progress 00921 agents.erase(*it); 00922 } 00923 00924 if (!deadAgents.empty()) 00925 // Broadcast that we have different agents 00926 updateAllAgents(); 00927 00928 //cerr << "dead agents took " << Date::now().secondsSince(start) << "s" 00929 // << endl; 00930 } 00931 00932 void 00933 Router:: 00934 checkExpiredAuctions() 00935 { 00936 //recentlySubmitted.clear(); 00937 00938 Date start = Date::now(); 00939 00940 { 00941 RouterProfiler profiler(dutyCycleCurrent.nsExpireInFlight); 00942 00943 // Look for in flight timeout expiries 00944 auto onExpiredInFlight = [&] (const Id & auctionId, 00945 const AuctionInfo & auctionInfo) 00946 { 00947 this->debugAuction(auctionId, "EXPIRED", {}); 00948 00949 // Tell any remaining bidders that it's too late... 
00950 for (auto it = auctionInfo.bidders.begin(), 00951 end = auctionInfo.bidders.end(); 00952 it != end; ++it) { 00953 string agent = it->first; 00954 if (!agents.count(agent)) continue; 00955 00956 if (agents[agent].expireBidInFlight(auctionId)) { 00957 AgentInfo & info = this->agents[agent]; 00958 ++info.stats->tooLate; 00959 00960 this->recordHit("accounts.%s.droppedBids", 00961 info.config->account.toString('.')); 00962 00963 this->sendBidResponse(agent, 00964 info, 00965 BS_DROPPEDBID, 00966 this->getCurrentTime(), 00967 "guaranteed", 00968 auctionId, 00969 0, Amount(), 00970 auctionInfo.auction.get()); 00971 } 00972 } 00973 00974 #if 0 00975 string msg = ML::format("in flight auction expiry: id %s " 00976 "status %s, %zd bidders:", 00977 auctionId.toString().c_str(), 00978 auctionInfo.auction->status().c_str(), 00979 auctionInfo.bidders.size()); 00980 for (auto it = auctionInfo.bidders.begin(), 00981 end = auctionInfo.bidders.end(); 00982 it != end; ++it) 00983 msg += ' ' + it->first + "->" + it->second.bidTime.print(5); 00984 cerr << Date::now().print(5) << " " << msg << endl; 00985 dumpAuction(auctionId); 00986 this->logRouterError("checkExpiredAuctions.inFlight", 00987 msg); 00988 00989 #endif 00990 00991 // end the auction when it expires in case we're waiting on dead agents 00992 if(!auctionInfo.auction->getResponses().empty()) { 00993 if(!auctionInfo.auction->finish()) { 00994 this->recordHit("tooLateToFinish"); 00995 } 00996 } 00997 00998 return Date(); 00999 }; 01000 01001 inFlight.expire(onExpiredInFlight, start); 01002 } 01003 01004 { 01005 RouterProfiler profiler(dutyCycleCurrent.nsExpireBlacklist); 01006 blacklist.doExpiries(); 01007 } 01008 01009 if (doDebug) { 01010 RouterProfiler profiler(dutyCycleCurrent.nsExpireDebug); 01011 expireDebugInfo(); 01012 } 01013 } 01014 01015 void 01016 Router:: 01017 returnErrorResponse(const std::vector<std::string> & message, 01018 const std::string & error) 01019 { 01020 using namespace std; 01021 if 
(message.empty()) return; 01022 logMessage("ERROR", error, message); 01023 sendAgentMessage(message[0], "ERROR", getCurrentTime(), error, message); 01024 } 01025 01026 void 01027 Router:: 01028 doStats(const std::vector<std::string> & message) 01029 { 01030 Json::Value result(Json::objectValue); 01031 01032 result["numAugmenting"] = augmentationLoop.numAugmenting(); 01033 result["numInFlight"] = inFlight.size(); 01034 result["blacklistUsers"] = blacklist.size(); 01035 01036 result["numAgents"] = agents.size(); 01037 01038 //result["accounts"] = banker->dumpAllCampaignsJson(); 01039 01040 Json::Value agentsVal(Json::objectValue); 01041 01042 int totalAgentInFlight = 0; 01043 01044 BOOST_FOREACH(auto agent, agents) { 01045 agentsVal[agent.first] = agent.second.toJson(false, false); 01046 totalAgentInFlight += agent.second.numBidsInFlight(); 01047 } 01048 01049 result["agents"] = agentsVal; 01050 01051 result["totalAgentInFlight"] = totalAgentInFlight; 01052 01053 if (dutyCycleHistory.empty()) 01054 result["dutyCycle"] = dutyCycleCurrent.toJson(); 01055 else result["dutyCycle"] = dutyCycleHistory.back().toJson(); 01056 01057 result["fileDescriptorCount"] = ML::num_open_files(); 01058 01059 addChildServiceStatus(result); 01060 01061 result["numAuctions"] = numAuctions; 01062 result["numBids"] = numBids; 01063 result["numNonEmptyBids"] = numNonEmptyBids; 01064 result["numAuctionsWithBid"] = numAuctionsWithBid; 01065 result["numNoBidders"] = numNoBidders; 01066 result["numNoPotentialBidders"] = numNoPotentialBidders; 01067 01068 //sendMessage(controlEndpoint, message[0], result); 01069 } 01070 01071 01072 Json::Value 01073 Router:: 01074 getServiceStatus() const 01075 { 01076 return getStats(); 01077 } 01078 01079 void 01080 Router:: 01081 augmentAuction(const std::shared_ptr<AugmentationInfo> & info) 01082 { 01083 if (!info || !info->auction) 01084 throw ML::Exception("augmentAuction with no auction to augment"); 01085 01086 if (info->auction->tooLate()) { 01087 
recordHit("tooLateBeforeAdd"); 01088 return; 01089 } 01090 01091 double augmentationWindow = 0.005; // 5ms available to augment 01092 01093 auto onDoneAugmenting = [=] (const std::shared_ptr<AugmentationInfo> & info) 01094 { 01095 info->auction->doneAugmenting = Date::now(); 01096 01097 if (info->auction->tooLate()) { 01098 this->recordHit("tooLateAfterAugmenting"); 01099 return; 01100 } 01101 01102 // Send it off to be farmed out to the bidders 01103 startBiddingBuffer.push(info); 01104 wakeupMainLoop.signal(); 01105 }; 01106 01107 augmentationLoop.augment(info, Date::now().plusSeconds(augmentationWindow), 01108 onDoneAugmenting); 01109 } 01110 01111 std::shared_ptr<AugmentationInfo> 01112 Router:: 01113 preprocessAuction(const std::shared_ptr<Auction> & auction) 01114 { 01115 ML::atomic_inc(numAuctions); 01116 01117 Date now = Date::now(); 01118 auction->inPrepro = now; 01119 01120 if (auction->lossAssumed == Date()) 01121 auction->lossAssumed 01122 = Date::now().plusSeconds(secondsUntilLossAssumed_); 01123 Date lossTimeout = auction->lossAssumed; 01124 01125 //cerr << "AUCTION " << auction->id << " " << auction->requestStr << endl; 01126 01127 //cerr << "url = " << auction->request->url << endl; 01128 01129 if (auction->tooLate()) { 01130 recordHit("tooLateBeforeRouting"); 01131 //inFlight.erase(auctionId); 01132 return std::shared_ptr<AugmentationInfo>(); 01133 } 01134 01135 const string & exchange = auction->request->exchange; 01136 01137 /* Parse out the adimp. 
*/ 01138 const vector<AdSpot> & imp = auction->request->imp; 01139 01140 recordCount(imp.size(), "exchange.%s.imp", exchange.c_str()); 01141 recordHit("exchange.%s.requests", exchange.c_str()); 01142 01143 // List of possible agents per round robin group 01144 std::map<string, GroupPotentialBidders> groupAgents; 01145 01146 double timeLeftMs = auction->timeAvailable() * 1000.0; 01147 01148 bool traceAuction = auction->id.hash() % 10 == 0; 01149 01150 AgentConfig::RequestFilterCache cache(*auction->request); 01151 01152 auto exchangeConnector = auction->exchangeConnector; 01153 01154 auto checkAgent = [&] (const AgentInfoEntry & entry) 01155 { 01156 const AgentConfig & config = *entry.config; 01157 AgentStats & stats = *entry.stats; 01158 const string & agentName = entry.name; 01159 01160 auto doFilterStat = [&] (const char * reason) 01161 { 01162 if (!traceAuction) return; 01163 01164 this->recordHit("accounts.%s.filter.%s", 01165 config.account.toString('.'), 01166 reason); 01167 }; 01168 01169 ML::atomic_inc(stats.intoFilters); 01170 doFilterStat("intoStaticFilters"); 01171 01172 ExcAssert(entry.status); 01173 01174 if (entry.status->lastHeartbeat.secondsSince(now) > 2.0 01175 || entry.status->dead) { 01176 doFilterStat("static.003_agentAppearsDead"); 01177 return; 01178 } 01179 01180 if (entry.status->numBidsInFlight >= config.maxInFlight) { 01181 doFilterStat("static.004_earlyTooManyInFlight"); 01182 return; 01183 } 01184 01185 /* Check if we have enough time to process it. 
*/ 01186 if (config.minTimeAvailableMs != 0.0 01187 && timeLeftMs < config.minTimeAvailableMs) 01188 { 01189 ML::atomic_inc(stats.notEnoughTime); 01190 doFilterStat("static.005_notEnoughTime"); 01191 return; 01192 } 01193 01194 BiddableSpots biddableSpots 01195 = config.isBiddableRequest(exchangeConnector, 01196 *auction->request, stats, 01197 cache, doFilterStat); 01198 if (biddableSpots.empty()) 01199 return; 01200 01201 ML::atomic_inc(stats.passedStaticFilters); 01202 doFilterStat("passedStaticFilters"); 01203 01204 string rrGroup = config.roundRobinGroup; 01205 if (rrGroup == "") rrGroup = agentName; 01206 01207 PotentialBidder bidder; 01208 bidder.agent = agentName; 01209 bidder.imp = biddableSpots; 01210 bidder.config = entry.config; 01211 bidder.stats = entry.stats; 01212 01213 groupAgents[rrGroup].push_back(bidder); 01214 groupAgents[rrGroup].totalBidProbability 01215 += config.bidProbability; 01216 }; 01217 01218 forEachAgent(checkAgent); 01219 01220 std::vector<GroupPotentialBidders> validGroups; 01221 01222 for (auto it = groupAgents.begin(), end = groupAgents.end(); 01223 it != end; ++it) { 01224 // Check for bid probability and skip if we don't bid 01225 double bidProbability 01226 = it->second.totalBidProbability 01227 / it->second.size() 01228 * globalBidProbability; 01229 01230 if (bidProbability < 1.0) { 01231 float val = (random() % 1000000) / 1000000.0; 01232 if (val > bidProbability) { 01233 for (unsigned i = 0; i < it->second.size(); ++i) 01234 ML::atomic_inc(it->second[i].stats->skippedBidProbability); 01235 continue; 01236 } 01237 } 01238 01239 // Group is valid for bidding; next step is to augment the bid 01240 // request 01241 validGroups.push_back(it->second); 01242 } 01243 01244 if (validGroups.empty()) { 01245 // Now we need to end the auction 01246 //inFlight.erase(auctionId); 01247 if (!auction->finish()) { 01248 recordHit("tooLateToFinish"); 01249 } 01250 01251 //cerr << "no valid groups " << endl; 01252 return 
std::shared_ptr<AugmentationInfo>(); 01253 } 01254 01255 auto info = std::make_shared<AugmentationInfo>(auction, lossTimeout); 01256 info->potentialGroups.swap(validGroups); 01257 01258 auction->outOfPrepro = Date::now(); 01259 01260 recordOutcome(auction->outOfPrepro.secondsSince(auction->inPrepro) * 1000.0, 01261 "preprocessAuctionTimeMs"); 01262 01263 return info; 01264 } 01265 01266 void 01267 Router:: 01268 doStartBidding(const std::vector<std::string> & message) 01269 { 01270 std::shared_ptr<AugmentationInfo> augInfo 01271 = sharedPtrFromMessage<AugmentationInfo>(message.at(2)); 01272 doStartBidding(augInfo); 01273 } 01274 01275 void 01276 Router:: 01277 doStartBidding(const std::shared_ptr<AugmentationInfo> & augInfo) 01278 { 01279 //static const char *fName = "Router::doStartBidding:"; 01280 RouterProfiler profiler(dutyCycleCurrent.nsStartBidding); 01281 01282 try { 01283 Id auctionId = augInfo->auction->id; 01284 01285 if (augmentationLoop.currentlyAugmenting(auctionId)) { 01286 throwException("doStartBidding.alreadyAugmenting", 01287 "auction with ID %s already preprocessing", 01288 auctionId.toString().c_str()); 01289 } 01290 if (inFlight.count(auctionId)) { 01291 throwException("doStartBidding.alreadyInFlight", 01292 "auction with ID %s already in progress", 01293 auctionId.toString().c_str()); 01294 } 01295 #if 0 01296 if (findAuction(finished, auctionId)) { 01297 throwException("doStartBidding.alreadyFinished", 01298 "auction with ID %s already finished", 01299 auctionId.toString().c_str()); 01300 } 01301 #endif 01302 01303 //cerr << "doStartBidding " << auctionId << endl; 01304 01305 auto groupAgents = augInfo->potentialGroups; 01306 01307 AuctionInfo & auctionInfo = addAuction(augInfo->auction, 01308 augInfo->lossTimeout); 01309 auto auction = augInfo->auction; 01310 01311 Date now = Date::now(); 01312 01313 auction->inStartBidding = now; 01314 01315 double timeLeftMs = auction->timeAvailable(now) * 1000.0; 01316 double timeUsedMs = 
auction->timeUsed(now) * 1000.0; 01317 01318 bool traceAuction = auction->id.hash() % 10 == 0; 01319 01320 const AugmentationList& augList = augInfo->auction->augmentations; 01321 01322 /* For each round-robin group, send the request off to exactly one 01323 element. */ 01324 for (auto it = groupAgents.begin(), end = groupAgents.end(); 01325 it != end; ++it) { 01326 01327 GroupPotentialBidders & bidders = *it; 01328 01329 for (unsigned i = 0; i < bidders.size(); ++i) { 01330 PotentialBidder & bidder = bidders[i]; 01331 if (!agents.count(bidder.agent)) continue; 01332 AgentInfo & info = agents[bidder.agent]; 01333 const AgentConfig & config = *bidder.config; 01334 01335 auto doFilterStat = [&] (const char * reason) 01336 { 01337 if (!traceAuction) return; 01338 01339 this->recordHit("accounts.%s.filter.%s", 01340 config.account.toString('.'), 01341 reason); 01342 }; 01343 01344 auto doFilterMetric = [&] (const char * reason, float val) 01345 { 01346 if (!traceAuction) return; 01347 01348 this->recordOutcome(val, "accounts.%s.filter.%s", 01349 config.account.toString('.'), 01350 reason); 01351 }; 01352 01353 01354 doFilterStat("intoDynamicFilters"); 01355 01356 /* Check if we have too many in flight. */ 01357 if (info.numBidsInFlight() >= info.config->maxInFlight) { 01358 ++info.stats->tooManyInFlight; 01359 bidder.inFlightProp = PotentialBidder::NULL_PROP; 01360 doFilterStat("dynamic.tooManyInFlight"); 01361 continue; 01362 } 01363 01364 /* Check if we have enough time to process it. 
*/ 01365 if (config.minTimeAvailableMs != 0.0 01366 && timeLeftMs < config.minTimeAvailableMs) { 01367 01368 static ML::Spinlock lock; 01369 01370 if (auction->id.hash() % 1000 == 999 && 01371 lock.try_lock()) { 01372 01373 Date now = Date::now(); 01374 Date last = auction->start; 01375 auto printTime 01376 = [&] (const char * what, const Date & date) 01377 { 01378 cerr << ML::format("%-30s %s %10.3f %10.3f\n", 01379 what, 01380 date.print(6).c_str(), 01381 auction->start.secondsSince(date) 01382 * 1000.0, 01383 last.secondsSince(date) 01384 * 1000.0); 01385 last = date; 01386 }; 01387 01388 cerr << "no time available in dynamic" << endl; 01389 printTime("start", auction->start); 01390 printTime("doneParsing", auction->doneParsing); 01391 printTime("inPrepro", auction->inPrepro); 01392 printTime("outOfPrepro", auction->outOfPrepro); 01393 printTime("doneAugmenting", auction->doneAugmenting); 01394 printTime("inStartBidding", auction->inStartBidding); 01395 printTime("expiry", auction->expiry); 01396 printTime("now", now); 01397 01398 lock.unlock(); 01399 } 01400 01401 ML::atomic_inc(info.stats->notEnoughTime); 01402 bidder.inFlightProp = PotentialBidder::NULL_PROP; 01403 doFilterStat("dynamic.notEnoughTime"); 01404 doFilterMetric("metric.timeUsedBeforeDynamicFilter", 01405 timeUsedMs); 01406 doFilterMetric("metric.timeLeftBeforeDynamicFilter", 01407 timeLeftMs); 01408 doFilterMetric("metric.timeElapsedBeforePreproMs", 01409 auction->start.secondsUntil(auction->inPrepro) * 1000.0); 01410 doFilterMetric("metric.timeElapsedDuringPreproMs", 01411 auction->inPrepro.secondsUntil(auction->outOfPrepro) * 1000.0); 01412 doFilterMetric("metric.timeWindowMs", 01413 auction->expiry.secondsSince(auction->start) * 1000.0 - info.config->minTimeAvailableMs); 01414 continue; 01415 } 01416 01417 /* Filter on the augmentation tags */ 01418 vector<string> tags = augList.tagsForAccount(config.account); 01419 if (!config.augmentationFilter.anyIsIncluded(tags)) { 01420 
ML::atomic_inc(info.stats->augmentationTagsExcluded); 01421 doFilterStat("dynamic.augmentationTagsFiltered"); 01422 continue; 01423 } 01424 01425 01426 /* Check that there is no blacklist hit on the user. */ 01427 if (config.hasBlacklist() 01428 && blacklist.matches(*auction->request, bidder.agent, 01429 config)) { 01430 ML::atomic_inc(info.stats->userBlacklisted); 01431 doFilterStat("dynamic.userBlacklisted"); 01432 continue; 01433 } 01434 01435 /* Load proportion of this agent, used below to pick the least busy member of the group. The division is done in floating point: with integral operands the quotient truncates to 0 for every agent that passed the tooManyInFlight check above (in-flight < maxInFlight), which would make all candidates compare equal. NOTE(review): assumes numBidsInFlight() returns an integral count -- confirm. */ bidder.inFlightProp 01436 = static_cast<float>(info.numBidsInFlight()) / max(info.config->maxInFlight, 1); 01437 01438 ML::atomic_inc(info.stats->passedDynamicFilters); 01439 doFilterStat("passedDynamicFilters"); 01440 } 01441 01442 // Sort the roundrobin infos to find the best one 01443 std::sort(bidders.begin(), bidders.end()); 01444 01445 int numBest = 1; 01446 float bestInFlightProp = bidders[0].inFlightProp; 01447 01448 if (bestInFlightProp == PotentialBidder::NULL_PROP) { 01449 // Excluded because too many in flight 01450 //cerr << "TOO MANY IN FLIGHT" << endl; 01451 continue; 01452 } 01453 01454 for (; numBest < bidders.size(); ++numBest) { 01455 float inFlightProp = bidders[numBest].inFlightProp; 01456 if (inFlightProp <= bestInFlightProp) continue; 01457 break; 01458 } 01459 01460 // Take a random one from all which are equally good 01461 int best = random() % numBest; 01462 01463 // Best one is the first one 01464 PotentialBidder & winner = bidders[best]; 01465 string agent = winner.agent; 01466 01467 if (!agents.count(agent)) { 01468 //cerr << "!!!AGENT IS GONE" << endl; 01469 continue; // agent is gone 01470 } 01471 AgentInfo & info = agents[agent]; 01472 01473 ++info.stats->auctions; 01474 01475 Augmentation aug 01476 = auction->augmentations.filterForAccount(winner.config->account); 01477 auction->agentAugmentations[agent] = chomp(aug.toJson().toString()); 01478 01479 //auctionInfo.activities.push_back("sent to " + agent); 01480 01481 BidInfo bidInfo; 01482 bidInfo.agentConfig = winner.config; 01483 bidInfo.bidTime = 
Date::now(); 01484 bidInfo.imp = winner.imp; 01485 /* Keep a copy of the dispatch time: bidInfo is moved into the bidders map on the next statement and must not be read after that point. */ const Date bidDispatchTime = bidInfo.bidTime; 01486 auctionInfo.bidders.insert(make_pair(agent, std::move(bidInfo))); // create empty bid response 01487 if (!info.trackBidInFlight(auctionId, bidDispatchTime)) 01488 throwException("doStartBidding.agentAlreadyBidding", 01489 "agent %s is already processing auction %s", 01490 agent.c_str(), 01491 auctionId.toString().c_str()); 01492 01493 //cerr << "sending to agent " << agent << endl; 01494 //cerr << fName << " sending AUCTION message " << endl; 01495 /* Convert to JSON to send it on. */ 01496 sendAgentMessage(agent, 01497 "AUCTION", 01498 auction->start, 01499 auctionId, 01500 info.getBidRequestEncoding(*auction), 01501 info.encodeBidRequest(*auction), 01502 winner.imp.toJsonStr(), 01503 toString(timeLeftMs), 01504 auction->agentAugmentations[agent]); 01505 01506 //cerr << "done" << endl; 01507 } 01508 01509 //cerr << " auction " << id << " with " 01510 // << auctionInfo.bidders.size() << " bidders" << endl; 01511 01512 //auctionInfo.activities.push_back(ML::format("total of %zd agents", 01513 // auctionInfo.bidders.size())); 01514 if (auction->tooLate()) { 01515 recordHit("tooLateAfterRouting"); 01516 // Unwind everything? 
01517 } 01518 01519 if (auctionInfo.bidders.empty()) { 01520 /* No bidders; don't bother with the bid */ 01521 ML::atomic_inc(numNoBidders); 01522 inFlight.erase(auctionId); 01523 //cerr << fName << "About to call finish " << endl; 01524 if (!auction->finish()) { 01525 recordHit("tooLateToFinish"); 01526 //cerr << "couldn't finish auction 1 " << auction->id << endl; 01527 } 01528 } 01529 01530 debugAuction(auctionId, "AUCTION"); 01531 } catch (const std::exception & exc) { 01532 cerr << "warning: auction threw exception: " << exc.what() << endl; 01533 if (augInfo) 01534 augInfo->auction->setError("auction processing error", exc.what()); 01535 } 01536 } 01537 01538 AuctionInfo & 01539 Router:: 01540 addAuction(std::shared_ptr<Auction> auction, Date lossTimeout) 01541 { 01542 const Id & id = auction->id; 01543 01544 double bidMemoryWindow = 5.0; // how many seconds we remember auctions 01545 01546 try { 01547 AuctionInfo & result 01548 = inFlight.insert(id, AuctionInfo(auction, lossTimeout), 01549 getCurrentTime().plusSeconds(bidMemoryWindow)); 01550 return result; 01551 } catch (const std::exception & exc) { 01552 //cerr << "====================================" << endl; 01553 //cerr << exc.what() << endl; 01554 throwException("addAuction.alreadyInProgress", 01555 "auction with ID %s already in progress: %s", 01556 id.toString().c_str(), exc.what()); 01557 } 01558 } 01559 01560 01561 static bool failBid(double proportion) 01562 { 01563 if (proportion < 0.01) 01564 return false; 01565 01566 return (random() % 100) < floor(proportion * 100.0); 01567 } 01568 01569 void 01570 Router:: 01571 doBid(const std::vector<std::string> & message) 01572 { 01573 //static const char *fName = "Router::doBid:"; 01574 if (failBid(bidsErrorRate)) { 01575 returnErrorResponse(message, "Intentional error response (--bids-error-rate)"); 01576 return; 01577 } 01578 01579 Date dateGotBid = Date::now(); 01580 01581 RouterProfiler profiler(dutyCycleCurrent.nsBid); 01582 01583 
ML::atomic_inc(numBids); 01584 01585 if (message.size() < 4 || message.size() > 5) { 01586 returnErrorResponse(message, "BID message has 3-4 parts"); 01587 return; 01588 } 01589 01590 static std::map<const char *, unsigned long long> times; 01591 01592 static Date lastPrinted = Date::now(); 01593 01594 if (lastPrinted.secondsUntil(dateGotBid) > 10.0) { 01595 #if 0 01596 unsigned long long total = 0; 01597 for (auto it = times.begin(), end = times.end(); it != end; ++it) 01598 total += it->second; 01599 01600 cerr << "doBid of " << total << " microseconds" << endl; 01601 cerr << "id = " << message[2] << endl; 01602 for (auto it = times.begin(), end = times.end(); 01603 it != end; ++it) { 01604 cerr << ML::format("%-30s %8lld %6.2f%%\n", 01605 it->first, 01606 it->second, 01607 100.0 * it->second / total); 01608 } 01609 #endif 01610 lastPrinted = dateGotBid; 01611 times.clear(); 01612 } 01613 01614 double current = getProfilingTime(); 01615 01616 auto doProfileEvent = [&] (int i, const char * what) 01617 { 01618 return; 01619 double after = getProfilingTime(); 01620 times[what] += microsecondsBetween(after, current); 01621 current = after; 01622 }; 01623 01624 recordHit("bid"); 01625 01626 doProfileEvent(0, "start"); 01627 01628 Id auctionId(message[2]); 01629 01630 doProfileEvent(1, "idParam"); 01631 01632 01633 01634 const string & agent = message[0]; 01635 const string & biddata = message[3]; 01636 static const string nullStr("null"); 01637 const string & meta = (message.size() >= 5 ? message[4] : nullStr); 01638 01639 doProfileEvent(1, "params"); 01640 01641 debugAuction(auctionId, "BID", message); 01642 01643 if (!agents.count(agent)) { 01644 returnErrorResponse(message, "unknown agent"); 01645 return; 01646 } 01647 01648 doProfileEvent(2, "agents"); 01649 01650 AgentInfo & info = agents[agent]; 01651 01652 /* One less in flight. 
*/ 01653 if (!info.expireBidInFlight(auctionId)) { 01654 recordHit("bidError.agentNotBidding"); 01655 returnErrorResponse(message, "agent wasn't bidding on this auction"); 01656 return; 01657 } 01658 01659 doProfileEvent(3, "inFlight"); 01660 01661 auto it = inFlight.find(auctionId); 01662 if (it == inFlight.end()) { 01663 recordHit("bidError.unknownAuction"); 01664 returnErrorResponse(message, "unknown auction"); 01665 return; 01666 } 01667 01668 doProfileEvent(4, "account"); 01669 01670 AuctionInfo & auctionInfo = it->second; 01671 01672 auto biddersIt = auctionInfo.bidders.find(agent); 01673 if (biddersIt == auctionInfo.bidders.end()) { 01674 recordHit("bidError.agentSkippedAuction"); 01675 returnErrorResponse(message, 01676 "agent shouldn't bid on this auction"); 01677 return; 01678 } 01679 01680 auto & config = *biddersIt->second.agentConfig; 01681 01682 recordHit("accounts.%s.bids", config.account.toString('.')); 01683 01684 doProfileEvent(5, "auctionInfo"); 01685 01686 //cerr << "info.inFlight = " << info.inFlight << endl; 01687 01688 const std::vector<AdSpot> & imp = auctionInfo.auction->request->imp; 01689 01690 int numValidBids = 0; 01691 01692 auto returnInvalidBid = [&] (int i, const char * reason, 01693 const char * message, ...) 01694 { 01695 this->recordHit("bidErrors.%s"); 01696 this->recordHit("accounts.%s.bidErrors.total", 01697 config.account.toString('.')); 01698 this->recordHit("accounts.%s.bidErrors.%s", 01699 config.account.toString('.'), 01700 reason); 01701 01702 ++info.stats->invalid; 01703 01704 va_list ap; 01705 va_start(ap, message); 01706 string formatted; 01707 try { 01708 formatted = vformat(message, ap); 01709 } catch (...) 
{ 01710 va_end(ap); 01711 throw; 01712 } 01713 va_end(ap); 01714 01715 cerr << "invalid bid for agent " << agent << ": " 01716 << formatted << endl; 01717 cerr << biddata << endl; 01718 01719 this->sendBidResponse 01720 (agent, info, BS_INVALID, this->getCurrentTime(), 01721 formatted, auctionId, 01722 i, Amount(), 01723 auctionInfo.auction.get(), 01724 biddata, Json::Value(), 01725 auctionInfo.auction->agentAugmentations[agent]); 01726 }; 01727 01728 BidInfo bidInfo(std::move(biddersIt->second)); 01729 auctionInfo.bidders.erase(biddersIt); 01730 01731 doProfileEvent(6, "bidInfo"); 01732 01733 int numPassedBids = 0; 01734 01735 Bids bids; 01736 try { 01737 bids = Bids::fromJson(biddata); 01738 } 01739 catch (const std::exception & exc) { 01740 returnInvalidBid(-1, "bidParseError", 01741 "couldn't parse bid JSON %s: %s", biddata.c_str(), exc.what()); 01742 return; 01743 } 01744 01745 doProfileEvent(6, "parsing"); 01746 01747 ExcCheckEqual(bids.size(), bidInfo.imp.size(), 01748 "invalid shape for bids array"); 01749 01750 auctionInfo.auction->addDataSources(bids.dataSources); 01751 01752 for (int i = 0; i < bids.size(); ++i) { 01753 01754 const Bid& bid = bids[i]; 01755 01756 if (bid.isNullBid()) { 01757 ++numPassedBids; 01758 continue; 01759 } 01760 01761 int spotIndex = bidInfo.imp[i].first; 01762 01763 if (bid.creativeIndex == -1) { 01764 returnInvalidBid(i, "nullCreativeField", 01765 "creative field is null in response %s", 01766 biddata.c_str()); 01767 continue; 01768 } 01769 01770 if (bid.creativeIndex < 0 01771 || bid.creativeIndex >= config.creatives.size()) 01772 { 01773 returnInvalidBid(i, "outOfRangeCreative", 01774 "parsing field 'creative' of %s: creative " 01775 "number %d out of range 0-%zd", 01776 biddata.c_str(), bid.creativeIndex, 01777 config.creatives.size()); 01778 continue; 01779 } 01780 01781 if (bid.price.isNegative() || bid.price > USD_CPM(200)) { 01782 returnInvalidBid(i, "invalidPrice", 01783 "bid price of %s is outside range of $0-$200 
CPM" 01784 "(%s) parsing bid %s", 01785 bid.price.toString().c_str(), 01786 USD_CPM(200).toString().c_str(), 01787 biddata.c_str()); 01788 continue; 01789 } 01790 01791 const Creative & creative = config.creatives.at(bid.creativeIndex); 01792 01793 if (!creative.compatible(imp[spotIndex])) { 01794 #if 1 01795 cerr << "creative not compatible with spot: " << endl; 01796 cerr << "auction: " << auctionInfo.auction->requestStr 01797 << endl; 01798 cerr << "config: " << config.toJson() << endl; 01799 cerr << "bid: " << biddata << endl; 01800 /* Dump the spot that was actually checked: i indexes the agent's bids, spotIndex indexes the request's impressions. */ cerr << "spot: " << imp[spotIndex].toJson() << endl; 01801 cerr << "spot num: " << spotIndex << endl; 01802 cerr << "bid num: " << i << endl; 01803 cerr << "creative num: " << bid.creativeIndex << endl; 01804 cerr << "creative: " << creative.toJson() << endl; 01805 #endif 01806 returnInvalidBid(i, "creativeNotCompatibleWithSpot", 01807 "creative %s not compatible with spot %s", 01808 creative.toJson().toString().c_str(), 01809 imp[spotIndex].toJson().toString().c_str()); 01810 continue; 01811 } 01812 01813 if (!creative.biddable(auctionInfo.auction->request->exchange, 01814 auctionInfo.auction->request->protocolVersion)) { 01815 returnInvalidBid(i, "creativeNotBiddableOnExchange", 01816 "creative not biddable on exchange/version"); 01817 continue; 01818 } 01819 01820 doProfileEvent(6, "creativeCompatibility"); 01821 01822 string auctionKey 01823 = auctionId.toString() + "-" 01824 + imp[spotIndex].id.toString() + "-" 01825 + agent; 01826 01827 if (!banker->authorizeBid(config.account, auctionKey, bid.price) 01828 || failBid(budgetErrorRate)) 01829 { 01830 ++info.stats->noBudget; 01831 const string& agentAugmentations = 01832 auctionInfo.auction->agentAugmentations[agent]; 01833 01834 this->sendBidResponse(agent, info, BS_NOBUDGET, 01835 this->getCurrentTime(), 01836 "guaranteed", auctionId, 0, Amount(), 01837 auctionInfo.auction.get(), 01838 biddata, meta, agentAugmentations); 01839 this->logMessage("NOBUDGET", agent, auctionId, 01840 
biddata, meta); 01841 continue; 01842 } 01843 01844 doProfileEvent(6, "banker"); 01845 01846 if (doDebug) 01847 this->debugSpot(auctionId, imp[spotIndex].id, 01848 ML::format("BID %s %s %f", 01849 auctionKey.c_str(), 01850 bid.price.toString().c_str(), 01851 (double)bid.priority)); 01852 01853 Auction::Price bidprice(bid.price, bid.priority); 01854 Auction::Response response( 01855 bidprice, 01856 creative.tagId, 01857 config.account, 01858 config.test, 01859 agent, 01860 biddata, 01861 meta, 01862 info.config, 01863 config.visitChannels, 01864 bid.creativeIndex); 01865 01866 response.creativeName = creative.name; 01867 response.creativeId = creative.id; 01868 01869 Auction::WinLoss localResult 01870 = auctionInfo.auction->setResponse(spotIndex, response); 01871 01872 doProfileEvent(6, "bidSubmission"); 01873 ++numValidBids; 01874 01875 // Possible results: 01876 // PENDING: we're currently winning the local auction 01877 // LOSS: we lost the local auction 01878 // TOOLATE: we bid too late 01879 // INVALID: bid was invalid 01880 01881 string msg = Auction::Response::print(localResult); 01882 01883 if (doDebug) 01884 this->debugSpot(auctionId, imp[spotIndex].id, 01885 ML::format("BID %s %s", 01886 auctionKey.c_str(), msg.c_str())); 01887 01888 01889 switch (localResult) { 01890 case Auction::PENDING: { 01891 ++info.stats->bids; 01892 info.stats->totalBid += bid.price; 01893 break; // response will be sent later once local winning bid known 01894 } 01895 case Auction::LOSS: 01896 ++info.stats->bids; 01897 info.stats->totalBid += bid.price; 01898 // fall through 01899 case Auction::TOOLATE: 01900 case Auction::INVALID: { 01901 if (localResult == Auction::TOOLATE) 01902 ++info.stats->tooLate; 01903 else if (localResult == Auction::INVALID) 01904 ++info.stats->invalid; 01905 01906 banker->cancelBid(config.account, auctionKey); 01907 01908 BidStatus status; 01909 switch (localResult) { 01910 case Auction::LOSS: status = BS_LOSS; break; 01911 case Auction::TOOLATE: status 
= BS_TOOLATE; break; 01912 case Auction::INVALID: status = BS_INVALID; break; 01913 default: 01914 throw ML::Exception("logic error"); 01915 } 01916 01917 const string& agentAugmentations = 01918 auctionInfo.auction->agentAugmentations[agent]; 01919 01920 this->sendBidResponse(agent, info, status, 01921 this->getCurrentTime(), 01922 "guaranteed", auctionId, 0, Amount(), 01923 auctionInfo.auction.get(), 01924 biddata, meta, agentAugmentations); 01925 this->logMessage(msg, agent, auctionId, biddata, meta); 01926 continue; 01927 } 01928 case Auction::WIN: 01929 this->throwException("doBid.localWinsNotPossible", 01930 "local wins can't be known until auction has closed"); 01931 01932 default: 01933 this->throwException("doBid.unknownBidResult", 01934 "unknown bid result returned by auction"); 01935 } 01936 01937 doProfileEvent(6, "bidResponse"); 01938 } 01939 01940 if (numValidBids > 0) { 01941 //logMessage("BID", agent, auctionId, biddata, meta); 01942 ML::atomic_add(numNonEmptyBids, 1); 01943 } 01944 else if (numPassedBids > 0) { 01945 // Passed on the ... 
add to the blacklist 01946 if (config.hasBlacklist()) { 01947 const BidRequest & bidRequest = *auctionInfo.auction->request; 01948 blacklist.add(bidRequest, agent, *info.config); 01949 } 01950 doProfileEvent(8, "blacklist"); 01951 } 01952 01953 doProfileEvent(8, "postParsing"); 01954 01955 double bidTime = dateGotBid.secondsSince(bidInfo.bidTime); 01956 01957 //cerr << "now " << auctionInfo.bidders.size() << " bidders" << endl; 01958 01959 //cerr << "campaign " << info.config->campaign << " bidTime " 01960 // << 1000.0 * bidTime << endl; 01961 01962 recordOutcome(1000.0 * bidTime, 01963 "accounts.%s.bidResponseTimeMs", 01964 config.account.toString('.')); 01965 01966 doProfileEvent(9, "postTiming"); 01967 01968 if (auctionInfo.bidders.empty()) { 01969 debugAuction(auctionId, "FINISH", message); 01970 if (!auctionInfo.auction->finish()) { 01971 debugAuction(auctionId, "FINISH TOO LATE", message); 01972 } 01973 inFlight.erase(auctionId); 01974 //cerr << "couldn't finish auction " << auctionInfo.auction->id 01975 //<< " after bid " << message << endl; 01976 } 01977 01978 doProfileEvent(10, "finishAuction"); 01979 01980 // TODO: clean up if no bids were made? 
01981 #if 0 01982 // Bids must be the same shape as the bid info or empty 01983 if (bidInfo.imp.size() != bids.size() && bids.size() != 0) { 01984 ++info.stats->bidErrors; 01985 returnInvalidBid(-1, "wrongBidResponseShape", 01986 "number of imp in bid request doesn't match " 01987 "those in bid: %d vs %d", 01988 bidInfo.imp.size(), bids.size(), 01989 bidInfo.imp.toJson().toString().c_str(), 01990 biddata.c_str()); 01991 01992 if (auctionInfo.bidders.empty()) { 01993 auctionInfo.auction->finish(); 01994 inFlight.erase(auctionId); 01995 } 01996 } 01997 #endif 01998 } 01999 02000 void 02001 Router:: 02002 doSubmitted(std::shared_ptr<Auction> auction) 02003 { 02004 // Auction was submitted 02005 02006 // Either a) move it across to the win queue, or b) drop it if we 02007 // didn't bid anything 02008 02009 RouterProfiler profiler(dutyCycleCurrent.nsSubmitted); 02010 02011 const Id & auctionId = auction->id; 02012 02013 #if 0 // debug 02014 if (recentlySubmitted.count(auctionId)) { 02015 cerr << "ERROR: auction" << auctionId << " was double submitted" 02016 << endl; 02017 return; 02018 } 02019 recentlySubmitted.insert(auctionId); 02020 #endif 02021 02022 //cerr << "SUBMITTED " << auctionId << endl; 02023 02024 const std::vector<std::vector<Auction::Response> > & allResponses 02025 = auction->getResponses(); 02026 02027 if (doDebug) 02028 debugAuction(auctionId, ML::format("SUBMITTED %d slots", 02029 (int)allResponses.size()), 02030 {}); 02031 02032 //ExcAssertEqual(allResponses.size(), 02033 // auction->bidRequest->imp.size()); 02034 //cerr << "got a win for auction id " << auctionId << " with num imp:" << allResponses.size() << endl; 02035 // Go through the imp one by one 02036 for (unsigned spotNum = 0; spotNum < allResponses.size(); ++spotNum) { 02037 02038 bool hasSubmittedBid = false; 02039 Id spotId = auction->request->imp[spotNum].id; 02040 02041 const std::vector<Auction::Response> & responses 02042 = allResponses[spotNum]; 02043 02044 if (doDebug) 02045 
debugSpot(auctionId, spotId, 02046 ML::format("has %zd bids", responses.size())); 02047 02048 // For all but the winning bid we tell them what's going on 02049 for (unsigned i = 0; i < responses.size(); ++i) { 02050 02051 const Auction::Response & response = responses[i]; 02052 Auction::WinLoss status = response.localStatus; 02053 02054 //cerr << "got a response " << response.toJson() << endl; 02055 //cerr << "response.valid() = " << response.valid() << endl; 02056 02057 // Don't deal with response 0 02058 if (i == 0 && response.valid() && response.localStatus == Auction::WIN) { 02059 hasSubmittedBid = true; 02060 continue; 02061 } 02062 02063 //cerr << "doing response " << i << endl; 02064 02065 if (!agents.count(response.agent)) continue; 02066 02067 AgentInfo & info = agents[response.agent]; 02068 02069 Amount bid_price = response.price.maxPrice; 02070 02071 string auctionKey 02072 = auctionId.toString() + "-" 02073 + spotId.toString() + "-" 02074 + response.agent; 02075 02076 // Make sure we account for the bid no matter what 02077 ML::Call_Guard guard 02078 ([&] () 02079 { 02080 banker->cancelBid(response.agentConfig->account, auctionKey); 02081 }); 02082 02083 // No bid 02084 if (bid_price == 0 && response.price.priority == 0) { 02085 cerr << "warning: auction had no bid result" << endl; 02086 continue; 02087 } 02088 02089 string msg; 02090 BidStatus bidStatus(BS_INVALID); 02091 02092 switch (status) { 02093 case Auction::PENDING: 02094 throwException("doSubmitted.shouldNotBePending", 02095 "non-winning auction should not be pending"); 02096 case Auction::WIN: 02097 if(i == 0) break; 02098 throwException("doSubmitted.shouldNotBeWin", 02099 "auction should not be a win"); 02100 case Auction::INVALID: 02101 throwException("doSubmitted.shouldNotBeInvalid", 02102 "auction should not be invalid"); 02103 case Auction::LOSS: 02104 bidStatus = BS_LOSS; 02105 ++info.stats->losses; 02106 msg = "LOSS"; 02107 break; 02108 case Auction::TOOLATE: 02109 bidStatus = 
BS_TOOLATE; 02110 ++info.stats->tooLate; 02111 msg = "TOOLATE"; 02112 break; 02113 default: 02114 throwException("doSubmitted.unknownStatus", 02115 "unknown auction local status"); 02116 }; 02117 02118 if (doDebug) 02119 debugSpot(auctionId, spotId, 02120 ML::format("%s %s", 02121 msg.c_str(), 02122 auctionKey.c_str())); 02123 02124 string confidence = "guaranteed"; 02125 02126 //cerr << fName << "sending agent message of type " << msg << endl; 02127 sendBidResponse(response.agent, info, bidStatus, 02128 this->getCurrentTime(), 02129 confidence, auctionId, 02130 0, Amount(), 02131 auction.get(), 02132 response.bidData, 02133 response.meta, 02134 auction->agentAugmentations[response.agent]); 02135 } 02136 02137 // If we didn't actually submit a bid then nothing else to do 02138 if (!hasSubmittedBid) continue; 02139 02140 ML::atomic_add(numAuctionsWithBid, 1); 02141 //cerr << fName << "injecting submitted auction " << endl; 02142 02143 onSubmittedAuction(auction, spotId, responses[0]); 02144 //postAuctionLoop.injectSubmittedAuction(auction, spotId, responses[0]); 02145 } 02146 02147 //cerr << "auction.use_count() = " << auction.use_count() << endl; 02148 02149 if (auction.unique()) { 02150 auctionGraveyard.tryPush(auction); 02151 } 02152 } 02153 02154 std::string 02155 reduceUrl(const Url & url) 02156 { 02157 static const boost::regex rex(".*://(www.)?([^/]+)"); 02158 boost::match_results<string::const_iterator> mr; 02159 string s = url.toString(); 02160 if (!boost::regex_search(s, mr, rex)) { 02161 //cerr << "warning: nothing matched in URL " << url << endl; 02162 return s; 02163 } 02164 02165 if (mr.size() != 3) { 02166 cerr << "warning: wrong match results size " 02167 << mr.size() << " in URL " << url << endl; 02168 return s; 02169 } 02170 02171 //cerr << "url " << url << " reduced to " << mr.str(2) << endl; 02172 02173 return mr.str(2); 02174 } 02175 02176 02177 void 02178 Router:: 02179 onNewAuction(std::shared_ptr<Auction> auction) 02180 { 02181 if 
(!monitorClient.getStatus()) { 02182 Date now = Date::now(); 02183 02184 if ((uint32_t) slowModeLastAuction.secondsSinceEpoch() 02185 < (uint32_t) now.secondsSinceEpoch()) { 02186 slowModeLastAuction = now; 02187 slowModeCount = 1; 02188 recordHit("monitor.systemInSlowMode"); 02189 } 02190 else { 02191 slowModeCount++; 02192 } 02193 02194 if (slowModeCount > 100) { 02195 /* we only let the first 100 auctions take place each second */ 02196 recordHit("monitor.ignoredAuctions"); 02197 auction->finish(); 02198 return; 02199 } 02200 } 02201 02202 //cerr << "AUCTION GOT THROUGH" << endl; 02203 02204 //logMessage("AUCTION", auction->id, auction->requestStr); 02205 const BidRequest & request = *auction->request; 02206 int numFields = 0; 02207 if (!request.url.empty()) ++numFields; 02208 if (request.userIds.exchangeId) ++numFields; 02209 if (request.userIds.providerId) ++numFields; 02210 02211 if (numFields > 1) { 02212 logMessageNoTimestamp("BEHAVIOUR", 02213 ML::format("%.2f", request.timestamp), 02214 request.exchange, 02215 reduceUrl(request.url), 02216 request.userIds.exchangeId, 02217 request.userIds.providerId); 02218 } 02219 auto info = preprocessAuction(auction); 02220 02221 if (info) { 02222 recordHit("auctionPassedPreprocessing"); 02223 augmentAuction(info); 02224 } 02225 else { 02226 recordHit("auctionDropped.noPotentialBidders"); 02227 ML::atomic_inc(numNoPotentialBidders); 02228 } 02229 } 02230 /* Called when an auction is done: records a debug event and pushes the auction onto submittedBuffer. */ 02231 void 02232 Router:: 02233 onAuctionDone(std::shared_ptr<Auction> auction) 02234 { 02235 #if 0 02236 static std::mutex lock; 02237 std::unique_lock<std::mutex> guard(lock); 02238 02239 cerr << endl; 02240 cerr << "Router::onAuctionDone with auction id " << auction->id << endl; 02241 backtrace(); 02242 #endif 02243 02244 debugAuction(auction->id, "SENT SUBMITTED"); 02245 submittedBuffer.push(auction); 02246 } 02247 /* Rebuilds the AllAgentInfo snapshot from the agents map and publishes it by compare-and-exchange on allAgents, retrying from scratch if the exchange fails. */ 02248 void 02249 Router:: 02250 updateAllAgents() 02251 { 02252 for (;;) { 02253 02254 /* unique_ptr instead of the deprecated auto_ptr: the snapshot is owned here until it is published and released. */ std::unique_ptr<AllAgentInfo> newInfo(new AllAgentInfo); 02255 02256 
AllAgentInfo * current = allAgents; 02257 02258 for (auto it = agents.begin(), end = agents.end(); it != end; ++it) { 02259 if (!it->second.configured) continue; 02260 if (!it->second.config) continue; 02261 if (!it->second.stats) continue; 02262 if (!it->second.status) continue; 02263 if (it->second.status->dead) continue; 02264 02265 AgentInfoEntry entry; 02266 entry.name = it->first; 02267 entry.config = it->second.config; 02268 entry.stats = it->second.stats; 02269 entry.status = it->second.status; 02270 int i = newInfo->size(); 02271 newInfo->push_back(entry); 02272 02273 newInfo->agentIndex[it->first] = i; 02274 newInfo->accountIndex[it->second.config->account].push_back(i); 02275 } 02276 02277 if (ML::cmp_xchg(allAgents, current, newInfo.get())) { 02278 newInfo.release(); 02279 ExcAssertNotEqual(current, allAgents); 02280 if (current) 02281 allAgentsGc.defer([=] () { delete current; }); 02282 break; 02283 } 02284 } 02285 } 02286 02287 void 02288 Router:: 02289 doConfig(const std::string & agent, 02290 std::shared_ptr<const AgentConfig> config) 02291 { 02292 RouterProfiler profiler(dutyCycleCurrent.nsConfig); 02293 //const string fName = "Router::doConfig:"; 02294 logMessage("CONFIG", agent, boost::trim_copy(config->toJson().toString())); 02295 02296 // TODO: no need for this... 
02297 auto newConfig = std::make_shared<AgentConfig>(*config); 02298 if (newConfig->roundRobinGroup == "") 02299 newConfig->roundRobinGroup = agent; 02300 02301 AgentInfo & info = agents[agent]; 02302 02303 if (info.configured) { 02304 unconfigure(agent, *info.config); 02305 info.configured = false; 02306 } 02307 02308 info.config = newConfig; 02309 //cerr << "configured " << agent << " strategy : " << info.config->strategy << " campaign " 02310 // << info.config->campaign << endl; 02311 02312 string bidRequestFormat = "jsonRaw"; 02313 info.setBidRequestFormat(bidRequestFormat); 02314 02315 configure(agent, *newConfig); 02316 info.configured = true; 02317 sendAgentMessage(agent, "GOTCONFIG", getCurrentTime()); 02318 02319 // Broadcast that we have a new agent or it has a new configuration 02320 updateAllAgents(); 02321 } 02322 02323 void 02324 Router:: 02325 unconfigure(const std::string & agent, const AgentConfig & config) 02326 { 02327 } 02328 02329 void 02330 Router:: 02331 configure(const std::string & agent, AgentConfig & config) 02332 { 02333 if (config.account.empty()) 02334 throw ML::Exception("attempt to add an account with empty values"); 02335 // TODO: async 02336 02337 bool includeReasons = true; 02338 02339 // For each exchange, check campaign and creative compatibility 02340 auto onExchange = [&] (const std::shared_ptr<ExchangeConnector> & exch) 02341 { 02342 string exchangeName = exch->exchangeName(); 02343 02344 cerr << "scanning campaign with exchange " << exchangeName << endl; 02345 ExchangeConnector::ExchangeCompatibility ecomp 02346 = exch->getCampaignCompatibility(config, includeReasons); 02347 if (!ecomp.isCompatible) { 02348 cerr << "campaign not compatible: " << ecomp.reasons << endl; 02349 return; 02350 } 02351 02352 int numCompatibleCreatives = 0; 02353 02354 for (auto & c: config.creatives) { 02355 ExchangeConnector::ExchangeCompatibility ccomp 02356 = exch->getCreativeCompatibility(c, includeReasons); 02357 if (!ccomp.isCompatible) { 
02358 cerr << "creative not compatible: " << ccomp.reasons << endl; 02359 return; 02360 } 02361 02362 std::lock_guard<ML::Spinlock> guard(c.lock); 02363 c.providerData[exchangeName] = ccomp.info; 02364 ++numCompatibleCreatives; 02365 } 02366 02367 if (numCompatibleCreatives == 0) { 02368 cerr << "no compatible creatives" << endl; 02369 return; 02370 } 02371 02372 std::lock_guard<ML::Spinlock> guard(config.lock); 02373 config.providerData[exchangeName] = ecomp.info; 02374 }; 02375 02376 forAllExchanges(onExchange); 02377 02378 auto onDone = [=] (std::exception_ptr exc, ShadowAccount&& ac) 02379 { 02380 //cerr << "got spend account for " << agent << ac << endl; 02381 if (exc) 02382 logException(exc, "Banker addAccount"); 02383 }; 02384 02385 banker->addSpendAccount(config.account, USD(0.10), onDone); 02386 } 02387 02388 Json::Value 02389 Router:: 02390 getStats() const 02391 { 02392 return Json::Value(); 02393 #if 0 02394 sendMesg(control(), "STATS"); 02395 vector<string> stats = recvAll(control()); 02396 02397 Json::Value result = Json::parse(stats.at(0)); 02398 02399 return result; 02400 #endif 02401 } 02402 02403 Json::Value 02404 Router:: 02405 getAgentInfo(const std::string & agent) const 02406 { 02407 return getAgentEntry(agent).toJson(); 02408 } 02409 02410 Json::Value 02411 Router:: 02412 getAllAgentInfo() const 02413 { 02414 Json::Value result; 02415 02416 auto onAgent = [&] (const AgentInfoEntry & info) 02417 { 02418 result[info.name] = info.toJson(); 02419 }; 02420 02421 forEachAgent(onAgent); 02422 02423 return result; 02424 } 02425 02426 void 02427 Router:: 02428 sendPings() 02429 { 02430 for (auto it = agents.begin(), end = agents.end(); 02431 it != end; ++it) { 02432 const string & agent = it->first; 02433 AgentInfo & info = it->second; 02434 02435 // 1. 
Send out new pings 02436 Date now = Date::now(); 02437 if (info.sendPing(0, now)) 02438 sendAgentMessage(agent, "PING0", now, "null"); 02439 if (info.sendPing(1, now)) 02440 sendAgentMessage(agent, "PING1", now, "null"); 02441 02442 // 2. Look at the trend 02443 //double mean, max; 02444 } 02445 } 02446 02447 void 02448 Router:: 02449 doPong(int level, const std::vector<std::string> & message) 02450 { 02451 //cerr << "dopong (router)" << message << endl; 02452 02453 string agent = message.at(0); 02454 Date sentTime = Date::parseSecondsSinceEpoch(message.at(2)); 02455 Date receivedTime = Date::parseSecondsSinceEpoch(message.at(3)); 02456 Date now = Date::now(); 02457 02458 double roundTripTime = now.secondsSince(sentTime); 02459 double outgoingTime = receivedTime.secondsSince(sentTime); 02460 double incomingTime = now.secondsSince(receivedTime); 02461 02462 auto it = agents.find(agent); 02463 if (it == agents.end()) { 02464 cerr << "warning: dead agent sent a pong: " << agent << endl; 02465 return; 02466 } 02467 02468 if (!it->second.configured) 02469 return; 02470 02471 auto & info = it->second; 02472 info.gotPong(level, sentTime, receivedTime, now); 02473 02474 const string & account = it->second.config->account.toString('.'); 02475 recordOutcome(roundTripTime * 1000.0, 02476 "accounts.%s.ping%d.roundTripTimeMs", account, level); 02477 recordOutcome(outgoingTime * 1000.0, 02478 "accounts.%s.ping%d.outgoingTimeMs", account, level); 02479 recordOutcome(incomingTime * 1000.0, 02480 "accounts.%s.ping%d.incomingTimeMs", account, level); 02481 } 02482 02483 void 02484 Router:: 02485 sendBidResponse(const std::string & agent, 02486 const AgentInfo & info, 02487 BidStatus status, 02488 Date timestamp, 02489 const std::string & message, 02490 const Id & auctionId, 02491 int spotNum, 02492 Amount price, 02493 const Auction * auction, 02494 const std::string & bidData, 02495 const Json::Value & metadata, 02496 const std::string & augmentationsStr) 02497 { 02498 
BidResultFormat format; 02499 switch (status) { 02500 case BS_WIN: format = info.config->winFormat; break; 02501 case BS_LOSS: format = info.config->lossFormat; break; 02502 default: format = info.config->errorFormat; break; 02503 } 02504 02505 const char * statusStr = bidStatusToChar(status); 02506 02507 switch (format) { 02508 case BRF_FULL: 02509 sendAgentMessage(agent, statusStr, timestamp, message, auctionId, 02510 to_string(spotNum), 02511 price.toString(), 02512 (auction ? info.getBidRequestEncoding(*auction) : ""), 02513 (auction ? info.encodeBidRequest(*auction) : ""), 02514 bidData, metadata, augmentationsStr); 02515 break; 02516 02517 case BRF_LIGHTWEIGHT: 02518 sendAgentMessage(agent, statusStr, timestamp, message, auctionId, 02519 to_string(spotNum), price.toString()); 02520 break; 02521 02522 case BRF_NONE: 02523 break; 02524 } 02525 } 02526 02527 void 02528 Router:: 02529 forEachAgent(const OnAgentFn & onAgent) const 02530 { 02531 GcLock::SharedGuard guard(allAgentsGc); 02532 const AllAgentInfo * ac = allAgents; 02533 if (!ac) return; 02534 02535 std::for_each(ac->begin(), ac->end(), onAgent); 02536 } 02537 02538 void 02539 Router:: 02540 forEachAccountAgent(const AccountKey & account, 02541 const OnAgentFn & onAgent) const 02542 { 02543 GcLock::SharedGuard guard(allAgentsGc); 02544 const AllAgentInfo * ac = allAgents; 02545 if (!ac) return; 02546 02547 auto it = ac->accountIndex.find(account); 02548 if (it == ac->accountIndex.end()) 02549 return; 02550 02551 for (auto jt = it->second.begin(), jend = it->second.end(); 02552 jt != jend; ++jt) 02553 onAgent(ac->at(*jt)); 02554 } 02555 02556 AgentInfoEntry 02557 Router:: 02558 getAgentEntry(const std::string & agent) const 02559 { 02560 GcLock::SharedGuard guard(allAgentsGc); 02561 const AllAgentInfo * ac = allAgents; 02562 if (!ac) return AgentInfoEntry(); 02563 02564 auto it = ac->agentIndex.find(agent); 02565 if (it == ac->agentIndex.end()) 02566 return AgentInfoEntry(); 02567 return 
ac->at(it->second); 02568 } 02569 02570 void 02571 Router:: 02572 submitToPostAuctionService(std::shared_ptr<Auction> auction, 02573 Id adSpotId, 02574 const Auction::Response & bid) 02575 { 02576 #if 0 02577 static std::mutex lock; 02578 std::unique_lock<std::mutex> guard(lock); 02579 02580 cerr << endl; 02581 cerr << "submitted auction " << auction->id << "," 02582 << adSpotId << endl; 02583 02584 backtrace(); 02585 #endif 02586 string auctionKey = auction->id.toString() 02587 + "-" + adSpotId.toString() 02588 + "-" + bid.agent; 02589 banker->detachBid(bid.account, auctionKey); 02590 02591 SubmittedAuctionEvent event; 02592 event.auctionId = auction->id; 02593 event.adSpotId = adSpotId; 02594 event.lossTimeout = auction->lossAssumed; 02595 event.augmentations = auction->agentAugmentations[bid.agent]; 02596 event.bidRequest = auction->request; 02597 event.bidRequestStr = auction->requestStr; 02598 event.bidRequestStrFormat = auction->requestStrFormat ; 02599 event.bidResponse = bid; 02600 02601 string str = ML::DB::serializeToString(event); 02602 02603 postAuctionEndpoint.sendMessage("AUCTION", str); 02604 02605 if (auction.unique()) { 02606 auctionGraveyard.tryPush(auction); 02607 } 02608 } 02609 02610 void 02611 Router:: 02612 throwException(const std::string & key, const std::string & fmt, ...) 02613 { 02614 recordHit("error.exception"); 02615 recordHit("error.exception.%s", key); 02616 02617 string message; 02618 va_list ap; 02619 va_start(ap, fmt); 02620 try { 02621 message = vformat(fmt.c_str(), ap); 02622 va_end(ap); 02623 } 02624 catch (...) 
{ 02625 va_end(ap); 02626 throw; 02627 } 02628 02629 logRouterError("exception", key, message); 02630 throw ML::Exception("Router Exception: " + key + ": " + message); 02631 } 02632 02633 void 02634 Router:: 02635 debugAuctionImpl(const Id & auction, const std::string & type, 02636 const std::vector<std::string> & args) 02637 { 02638 Date now = Date::now(); 02639 boost::unique_lock<ML::Spinlock> guard(debugLock); 02640 AuctionDebugInfo & entry 02641 = debugInfo.access(auction, now.plusSeconds(30.0)); 02642 02643 entry.addAuctionEvent(now, type, args); 02644 } 02645 02646 void 02647 Router:: 02648 debugSpotImpl(const Id & auction, const Id & spot, const std::string & type, 02649 const std::vector<std::string> & args) 02650 { 02651 Date now = Date::now(); 02652 boost::unique_lock<ML::Spinlock> guard(debugLock); 02653 AuctionDebugInfo & entry 02654 = debugInfo.access(auction, now.plusSeconds(30.0)); 02655 02656 entry.addSpotEvent(spot, now, type, args); 02657 } 02658 02659 void 02660 Router:: 02661 expireDebugInfo() 02662 { 02663 boost::unique_lock<ML::Spinlock> guard(debugLock); 02664 debugInfo.expire(); 02665 } 02666 02667 void 02668 Router:: 02669 dumpAuction(const Id & auction) const 02670 { 02671 boost::unique_lock<ML::Spinlock> guard(debugLock); 02672 auto it = debugInfo.find(auction); 02673 if (it == debugInfo.end()) { 02674 //cerr << "*** unknown auction " << auction << " in " 02675 // << debugInfo.size() << endl; 02676 } 02677 else it->second.dumpAuction(); 02678 } 02679 02680 void 02681 Router:: 02682 dumpSpot(const Id & auction, const Id & spot) const 02683 { 02684 boost::unique_lock<ML::Spinlock> guard(debugLock); 02685 auto it = debugInfo.find(auction); 02686 if (it == debugInfo.end()) { 02687 //cerr << "*** unknown auction " << auction << " in " 02688 // << debugInfo.size() << endl; 02689 } 02690 else it->second.dumpSpot(spot); 02691 } 02692 02694 string 02695 Router:: 02696 getProviderName() 02697 const 02698 { 02699 return serviceName(); 02700 } 02701 
02702 Json::Value 02703 Router:: 02704 getProviderIndicators() 02705 const 02706 { 02707 Json::Value value; 02708 02709 /* Router health check: 02710 - valid connection to post auction loop */ 02711 value["status"] = postAuctionEndpoint.isConnected() ? "ok" : "failure"; 02712 02713 return value; 02714 } 02715 02716 void 02717 Router:: 02718 startExchange(const std::string & exchangeType, 02719 const Json::Value & exchangeConfig) 02720 { 02721 auto exchange = ExchangeConnector:: 02722 create(exchangeType, *this, exchangeType); 02723 exchange->configure(exchangeConfig); 02724 exchange->start(); 02725 02726 addExchange(std::move(exchange)); 02727 } 02728 02729 void 02730 Router:: 02731 startExchange(const Json::Value & exchangeConfig) 02732 { 02733 std::string exchangeType = exchangeConfig["exchangeType"].asString(); 02734 startExchange(exchangeType, exchangeConfig); 02735 } 02736 02737 02738 02739 } // namespace RTBKIT