![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* augmentation.cc 00002 Jeremy Barnes, 1 March 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 How we do auction augmentation. 00006 */ 00007 00008 #include "augmentation_loop.h" 00009 #include "jml/arch/timers.h" 00010 #include "jml/arch/futex.h" 00011 #include "jml/utils/vector_utils.h" 00012 #include "jml/utils/set_utils.h" 00013 #include "jml/arch/exception_handler.h" 00014 #include "soa/service/zmq_utils.h" 00015 #include <iostream> 00016 #include <boost/make_shared.hpp> 00017 #include "rtbkit/core/agent_configuration/agent_config.h" 00018 00019 00020 using namespace std; 00021 using namespace ML; 00022 00023 00024 00025 namespace RTBKIT { 00026 00027 00028 /*****************************************************************************/ 00029 /* AUGMENTATION LOOP */ 00030 /*****************************************************************************/ 00031 00032 AugmentationLoop:: 00033 AugmentationLoop(ServiceBase & parent, 00034 const std::string & name) 00035 : ServiceBase(name, parent), 00036 allAugmentors(0), 00037 idle_(1), 00038 inbox(65536), 00039 toAugmentors(getZmqContext()) 00040 { 00041 updateAllAugmentors(); 00042 } 00043 00044 AugmentationLoop:: 00045 AugmentationLoop(std::shared_ptr<ServiceProxies> proxies, 00046 const std::string & name) 00047 : ServiceBase(name, proxies), 00048 allAugmentors(0), 00049 idle_(1), 00050 inbox(65536), 00051 toAugmentors(getZmqContext()) 00052 { 00053 updateAllAugmentors(); 00054 } 00055 00056 AugmentationLoop:: 00057 ~AugmentationLoop() 00058 { 00059 } 00060 00061 void 00062 AugmentationLoop:: 00063 init() 00064 { 00065 registerServiceProvider(serviceName(), { "rtbRouterAugmentation" }); 00066 00067 toAugmentors.init(getServices()->config, serviceName() + "/augmentors"); 00068 00069 toAugmentors.clientMessageHandler 00070 = [&] (const std::vector<std::string> & message) 00071 { 00072 //cerr << "got augmentor message " << message << endl; 00073 handleAugmentorMessage(message); 00074 }; 00075 00076 toAugmentors.bindTcp(getServices()->ports->getRange("augmentors")); 00077 00078 toAugmentors.onConnection = [=] (const std::string & client) 00079 { 00080 cerr << "augmentor " << client << " has connected" << endl; 00081 }; 00082 00083 toAugmentors.onDisconnection = [=] (const std::string & client) 00084 { 00085 cerr << "augmentor " << client << " has disconnected" << endl; 00086 }; 00087 00088 inbox.onEvent = [&] (const std::shared_ptr<Entry> & entry) 00089 { 00090 //cerr << "got event on inbox" << endl; 00091 00092 Guard guard(lock); 00093 Date now = Date::now(); 00094 00095 //cerr << "got lock on inbox" << endl; 00096 00097 // TODO: wake up loop if slower... 00098 // TODO: DRY with other function... 00099 augmenting.insert(entry->info->auction->id, entry, 00100 entry->timeout); 00101 00102 for (auto it = entry->outstanding.begin(), 00103 end = entry->outstanding.end(); 00104 it != end; ++it) { 00105 00106 auto & aug = *augmentors[*it]; 00107 00108 //cerr << "sending to " << *it << " at " 00109 // << aug.agentAddr << endl; 00110 00111 set<string> agents; 00112 const auto& bidderGroups = entry->info->potentialGroups; 00113 00114 for (auto jt = bidderGroups.begin(), end = bidderGroups.end(); 00115 jt != end; ++jt) 00116 { 00117 for (auto kt = jt->begin(), end = jt->end(); 00118 kt != end; ++kt) 00119 { 00120 agents.insert(kt->agent); 00121 } 00122 } 00123 00124 std::ostringstream availableAgentsStr; 00125 ML::DB::Store_Writer writer(availableAgentsStr); 00126 writer.save(agents); 00127 00128 // Send the message to the augmentor 00129 toAugmentors.sendMessage(aug.augmentorAddr, 00130 "AUGMENT", "1.0", *it, 00131 entry->info->auction->id.toString(), 00132 entry->info->auction->requestStrFormat, 00133 entry->info->auction->requestStr, 00134 availableAgentsStr.str(), 00135 Date::now()); 00136 00137 if (!aug.inFlight.insert 00138 (make_pair(entry->info->auction->id, now)) 00139 .second) { 00140 cerr << "warning: double augment for auction " 00141 << entry->info->auction->id << endl; 00142 } 00143 else aug.numInFlight = aug.inFlight.size(); 00144 } 00145 00146 recordLevel(Date::now().secondsSince(now), "requestTimeMs"); 00147 00148 idle_ = 0; 00149 }; 00150 00151 double lastSleepTime = 0; 00152 addPeriodic("Augmentationloop::dutyCycle", 1.0, [=] (uint64_t) mutable { 00153 double sleepTime = totalSleepSeconds(); 00154 recordLevel((sleepTime - lastSleepTime) * 1000.0, "sleepTime"); 00155 lastSleepTime = sleepTime; 00156 }); 00157 00158 addSource("AugmentationLoop::inbox", inbox); 00159 addSource("AugmentationLoop::toAugmentors", toAugmentors); 00160 addPeriodic("AugmentationLoop::checkExpiries", 0.977, 00161 [=] (int) { checkExpiries(); }); 00162 } 00163 00164 void 00165 AugmentationLoop:: 00166 start() 00167 { 00168 //toAugmentors.start(); 00169 MessageLoop::start(); 00170 } 00171 00172 void 00173 AugmentationLoop:: 00174 sleepUntilIdle() 00175 { 00176 while (!idle_) 00177 futex_wait(idle_, 0); 00178 } 00179 00180 void 00181 AugmentationLoop:: 00182 shutdown() 00183 { 00184 MessageLoop::shutdown(); 00185 toAugmentors.shutdown(); 00186 } 00187 00188 size_t 00189 AugmentationLoop:: 00190 numAugmenting() const 00191 { 00192 // TODO: can we get away without a lock here? 00193 Guard guard(lock); 00194 return augmenting.size(); 00195 } 00196 00197 bool 00198 AugmentationLoop:: 00199 currentlyAugmenting(const Id & auctionId) const 00200 { 00201 Guard guard(lock); 00202 return augmenting.count(auctionId); 00203 } 00204 00205 void 00206 AugmentationLoop:: 00207 bindAugmentors(const std::string & uri) 00208 { 00209 try { 00210 toAugmentors.bind(uri.c_str()); 00211 } catch (const std::exception & exc) { 00212 throw Exception("error while binding augmentation URI %s: %s", 00213 uri.c_str(), exc.what()); 00214 } 00215 } 00216 00217 void 00218 AugmentationLoop:: 00219 handleAugmentorMessage(const std::vector<std::string> & message) 00220 { 00221 Guard guard(lock); 00222 00223 Date now = Date::now(); 00224 00225 const std::string & type = message.at(1); 00226 if (type == "CONFIG") { 00227 doConfig(message); 00228 } 00229 else if (type == "RESPONSE") { 00230 doResponse(message); 00231 } 00232 else throw ML::Exception("error handling unknown " 00233 "augmentor message of type " 00234 + type); 00235 } 00236 00237 void 00238 AugmentationLoop:: 00239 checkExpiries() 00240 { 00241 //cerr << "checking expiries" << endl; 00242 00243 Guard guard(lock); 00244 00245 Date now = Date::now(); 00246 00247 for (auto it = augmentors.begin(), end = augmentors.end(); 00248 it != end; ++it) { 00249 00250 AugmentorInfo & aug = *it->second; 00251 00252 vector<Id> lostAuctions; 00253 00254 for (auto jt = aug.inFlight.begin(), 00255 jend = aug.inFlight.end(); 00256 jt != jend; ++jt) { 00257 if (now.secondsSince(jt->second) > 5.0) { 00258 cerr << "warning: augmentor " << it->first 00259 << " lost auction " << jt->first 00260 << endl; 00261 00262 string eventName = "augmentor." 00263 + it->first + ".lostAuction"; 00264 recordEvent(eventName.c_str()); 00265 00266 lostAuctions.push_back(jt->first); 00267 } 00268 } 00269 00270 // Delete all in flight that appear to be lost 00271 for (unsigned i = 0; i < lostAuctions.size(); ++i) 00272 aug.inFlight.erase(lostAuctions[i]); 00273 00274 string eventName = "augmentor." + it->first + ".numInFlight"; 00275 recordEvent(eventName.c_str(), ET_LEVEL, 00276 aug.inFlight.size()); 00277 } 00278 00279 #if 0 00280 vector<string> deadAugmentors; 00281 00282 for (unsigned i = 0; i < deadAugmentors.size(); ++i) { 00283 string aug = deadAugmentors[i]; 00284 augmentors.erase(aug); 00285 00286 string eventName = "augmentor." + aug + ".dead"; 00287 recordEvent(eventName.c_str()); 00288 } 00289 00290 if (!deadAugmentors.empty()) 00291 updateAllAugmentors(); 00292 00293 #endif 00294 00295 auto onExpired = [&] (const Id & id, 00296 const std::shared_ptr<Entry> & entry) -> Date 00297 { 00298 //++numAugmented; 00299 //cerr << "augmented " << ++numAugmented << " bids" << endl; 00300 00301 for (auto it = entry->outstanding.begin(), 00302 end = entry->outstanding.end(); 00303 it != end; ++it) { 00304 string eventName = "augmentor." + *it 00305 + ".expiredTooLate"; 00306 recordEvent(eventName.c_str(), ET_COUNT); 00307 } 00308 00309 this->augmentationExpired(id, *entry); 00310 return Date(); 00311 }; 00312 00313 if (augmenting.earliest <= now) { 00314 //Guard guard(lock); 00315 augmenting.expire(onExpired, now); 00316 } 00317 00318 if (augmenting.empty() && !idle_) { 00319 idle_ = 1; 00320 futex_wake(idle_); 00321 } 00322 00323 } 00324 00325 void 00326 AugmentationLoop:: 00327 updateAllAugmentors() 00328 { 00329 for (;;) { 00330 auto_ptr<AllAugmentorInfo> newInfo(new AllAugmentorInfo); 00331 00332 AllAugmentorInfo * current = allAugmentors; 00333 00334 for (auto it = augmentors.begin(), end = augmentors.end(); 00335 it != end; ++it) { 00336 AugmentorInfo & aug = *it->second; 00337 00338 //if (!it->second.configured) continue; 00339 if (!it->second) continue; 00340 if (aug.name == "") continue; 00341 AugmentorInfoEntry entry; 00342 entry.name = aug.name; 00343 entry.info = it->second; 00344 //entry.config = aug.config; 00345 newInfo->push_back(entry); 00346 } 00347 00348 // Sort they by their name 00349 std::sort(newInfo->begin(), newInfo->end(), 00350 [] (const AugmentorInfoEntry & entry1, 00351 const AugmentorInfoEntry & entry2) 00352 { 00353 return entry1.name < entry2.name; 00354 }); 00355 00356 // Add the index 00357 //for (unsigned i = 0; i < newInfo->size(); ++i) { 00358 // newInfo->index[(*newInfo.get())[i].name] = i; 00359 //} 00360 00361 if (ML::cmp_xchg(allAugmentors, current, newInfo.get())) { 00362 newInfo.release(); 00363 if (current) 00364 allAugmentorsGc.defer([=] () { delete current; }); 00365 break; 00366 } 00367 } 00368 } 00369 00370 void 00371 AugmentationLoop:: 00372 augment(const std::shared_ptr<AugmentationInfo> & info, 00373 Date timeout, 00374 const OnFinished & onFinished) 00375 { 00376 Date now = Date::now(); 00377 00378 auto entry = std::make_shared<Entry>(); 00379 entry->onFinished = onFinished; 00380 entry->info = info; 00381 entry->timeout = timeout; 00382 00383 // Get a set of all augmentors 00384 std::set<std::string> augmentors; 00385 00386 // Now go through and find all of the bidders 00387 for (unsigned i = 0; i < info->potentialGroups.size(); ++i) { 00388 const GroupPotentialBidders & group = info->potentialGroups[i]; 00389 for (unsigned j = 0; j < group.size(); ++j) { 00390 const PotentialBidder & bidder = group[j]; 00391 const AgentConfig & config = *bidder.config; 00392 for (unsigned k = 0; k < config.augmentations.size(); ++k) { 00393 const std::string & name = config.augmentations[k].name; 00394 augmentors.insert(name); 00395 } 00396 } 00397 } 00398 00399 //cerr << "need augmentors " << augmentors << endl; 00400 00401 // Find which ones are actually available... 00402 GcLock::SharedGuard guard(allAugmentorsGc); 00403 const AllAugmentorInfo * ai = allAugmentors; 00404 00405 ExcAssert(ai); 00406 00407 auto it1 = augmentors.begin(), end1 = augmentors.end(); 00408 auto it2 = ai->begin(), end2 = ai->end(); 00409 00410 while (it1 != end1 && it2 != end2) { 00411 if (*it1 == it2->name) { 00412 // Augmentor we need to run 00413 00414 //cerr << "augmenting with " << it2->name << endl; 00415 00416 recordEvent("augmentation.request"); 00417 string eventName = "augmentor." + it2->name + ".request"; 00418 recordEvent(eventName.c_str()); 00419 00420 if (it2->info->numInFlight > 3000) { 00421 string eventName = "augmentor." + it2->name 00422 + ".skippedTooManyInFlight"; 00423 recordEvent(eventName.c_str()); 00424 #if 0 00425 } else if (it2->info->lastHeartbeat.secondsUntil(now) > 2.0) { 00426 string eventName = "augmentor." + it2->name 00427 + ".skippedNoHeartbeat"; 00428 recordEvent(eventName.c_str()); 00429 #endif 00430 } 00431 else { 00432 entry->outstanding.insert(*it1); 00433 } 00434 00435 ++it1; 00436 ++it2; 00437 } 00438 else if (*it1 < it2->name) { 00439 // Augmentor is not available 00440 //cerr << "augmentor " << *it1 << " is not available" << endl; 00441 ++it1; 00442 } 00443 else if (it2->name < *it1) { 00444 // Augmentor is not required 00445 //cerr << "augmentor " << it2->name << " is not required" << endl; 00446 ++it2; 00447 } 00448 else throw ML::Exception("logic error traversing augmentors"); 00449 } 00450 00451 #if 0 00452 while (it1 != end1) { 00453 cerr << "augmentor " << *it1 << " is not available" << endl; 00454 ++it1; 00455 } 00456 00457 while (it2 != end2) { 00458 cerr << "augmentor " << it2->name << " is not required" << endl; 00459 ++it2; 00460 } 00461 #endif 00462 00463 if (entry->outstanding.empty()) { 00464 // No augmentors required... run the auction straight away 00465 onFinished(info); 00466 } 00467 else { 00468 //cerr << "putting in inbox" << endl; 00469 inbox.push(entry); 00470 00471 #if 0 // optimization 00472 // Set up to run the augmentors 00473 Guard guard(lock, boost::try_to_lock_t()); 00474 if (guard) { 00475 // Got the lock... put it straight in 00476 augmenting.insert(info->auction->id, entry, timeout); 00477 00478 for (auto it = entry->outstanding.begin(), 00479 end = entry->outstanding.end(); 00480 it != end; ++it) { 00481 auto & aug = *this->augmentors[*it]; 00482 00483 if (!aug.inFlight.insert 00484 (make_pair(info->auction->id, now)) 00485 .second) { 00486 cerr << "warning: double augment for auction " 00487 << info->auction->id << endl; 00488 } 00489 else aug.numInFlight = aug.inFlight.size(); 00490 } 00491 00492 idle_ = 0; 00493 } 00494 else { 00495 // Couldn't get the lock... put it on the queue 00496 sendMessage(toEndpoint_(), "QUEUE", entry); 00497 } 00498 #endif // optimization 00499 00500 } 00501 } 00502 00503 void 00504 AugmentationLoop:: 00505 doConfig(const std::vector<std::string> & message) 00506 { 00507 if (message.size() != 4) 00508 throw ML::Exception("config message has wrong size: %zd vs 4", 00509 message.size()); 00510 00511 const string & augmentorAddr = message[0]; 00512 const string & version = message[2]; 00513 const string & name = message[3]; 00514 00515 if (version != "1.0") 00516 throw ML::Exception("unknown version for config message"); 00517 00518 //cerr << "configuring augmentor " << name << " on " << connectTo 00519 // << endl; 00520 00521 string eventName = "augmentor." + name + ".configured"; 00522 recordEvent(eventName.c_str()); 00523 00524 auto newInfo = std::make_shared<AugmentorInfo>(); 00525 newInfo->name = name; 00526 newInfo->augmentorAddr = augmentorAddr; 00527 00528 //cerr << "connecting on " << connectTo << endl; 00529 //info->connection(); 00530 00531 if (augmentors.count(name)) { 00532 // Grab the old version 00533 auto oldInfo = augmentors[name]; 00534 00535 // There was an old entry... wait until nobody is using it 00536 00537 // First unpublish the entry 00538 updateAllAugmentors(); 00539 00540 // Now wait until nothing else can see it 00541 allAugmentorsGc.deferBarrier(); 00542 00543 augmentors.erase(name); 00544 00545 //cerr << " done removing old version" << endl; 00546 } 00547 00548 augmentors[name] = newInfo; 00549 00550 updateAllAugmentors(); 00551 00552 toAugmentors.sendMessage(augmentorAddr, "CONFIGOK"); 00553 00554 //cerr << "done updating" << endl; 00555 } 00556 00557 void 00558 AugmentationLoop:: 00559 doResponse(const std::vector<std::string> & message) 00560 { 00561 recordEvent("augmentation.response"); 00562 //cerr << "doResponse " << message << endl; 00563 if (message.size() != 7) 00564 throw ML::Exception("response message has wrong size: %zd", 00565 message.size()); 00566 const string & version = message[2]; 00567 if (version != "1.0") 00568 throw ML::Exception("unknown response version"); 00569 Date startTime = Date::parseSecondsSinceEpoch(message[3]); 00570 00571 Id id(message[4]); 00572 const std::string & augmentor = message[5]; 00573 const std::string & augmentation = message[6]; 00574 00575 ML::Timer timer; 00576 00577 AugmentationList augmentationList; 00578 if (augmentation != "" && augmentation != "null") { 00579 try { 00580 Json::Value augmentationJson; 00581 00582 JML_TRACE_EXCEPTIONS(false); 00583 augmentationJson = Json::parse(augmentation); 00584 augmentationList = AugmentationList::fromJson(augmentationJson); 00585 } catch (const std::exception & exc) { 00586 string eventName = "augmentor." + augmentor 00587 + ".responseParsingExceptions"; 00588 recordEvent(eventName.c_str(), ET_COUNT); 00589 } 00590 } 00591 00592 recordLevel(timer.elapsed_wall(), "responseParseTimeMs"); 00593 00594 { 00595 double timeTakenMs = startTime.secondsUntil(Date::now()) * 1000.0; 00596 string eventName = "augmentor." + augmentor + ".timeTakenMs"; 00597 recordEvent(eventName.c_str(), ET_OUTCOME, timeTakenMs); 00598 } 00599 00600 { 00601 double responseLength = augmentation.size(); 00602 string eventName = "augmentor." + augmentor + ".responseLengthBytes"; 00603 recordEvent(eventName.c_str(), ET_OUTCOME, responseLength); 00604 } 00605 00606 string eventName = ML::format("augmentor.%s.%s", 00607 augmentor.c_str(), 00608 (augmentation == "" || augmentation == "null" 00609 ? "nullResponse" : "validResponse")); 00610 recordEvent(eventName.c_str()); 00611 00612 // Modify the augmentor data structures 00613 //Guard guard(lock); 00614 00615 if (augmentors.count(augmentor)) { 00616 auto & entry = *augmentors[augmentor]; 00617 entry.inFlight.erase(id); 00618 entry.numInFlight = entry.inFlight.size(); 00619 } 00620 00621 auto it = augmenting.find(id); 00622 if (it == augmenting.end()) { 00623 recordEvent("augmentation.unknown"); 00624 string eventName = "augmentor." + augmentor + ".unknown"; 00625 recordEvent(eventName.c_str()); 00626 //cerr << "warning: handled response for unknown auction" << endl; 00627 return; 00628 } 00629 00630 it->second->info->auction->augmentations.mergeWith(augmentationList); 00631 00632 it->second->outstanding.erase(augmentor); 00633 if (it->second->outstanding.empty()) { 00634 it->second->onFinished(it->second->info); 00635 augmenting.erase(it); 00636 } 00637 } 00638 00639 void 00640 AugmentationLoop:: 00641 augmentationExpired(const Id & id, const Entry & entry) 00642 { 00643 entry.onFinished(entry.info); 00644 } 00645 00646 } // namespace RTBKIT