RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/router/augmentation_loop.cc
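/* augmentation_loop.cc
   Jeremy Barnes, 1 March 2012
   Copyright (c) 2012 Datacratic.  All rights reserved.

   How we do auction augmentation: each auction is sent to the augmentors
   that its potential bidders require, and their responses are merged into
   the auction before it is run.
*/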
00007 
#include "augmentation_loop.h"

#include <algorithm>
#include <iostream>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <vector>

#include <boost/make_shared.hpp>

#include "jml/arch/timers.h"
#include "jml/arch/futex.h"
#include "jml/utils/vector_utils.h"
#include "jml/utils/set_utils.h"
#include "jml/arch/exception_handler.h"
#include "soa/service/zmq_utils.h"
#include "rtbkit/core/agent_configuration/agent_config.h"
00018 
00019 
00020 using namespace std;
00021 using namespace ML;
00022 
00023 
00024 
00025 namespace RTBKIT {
00026 
00027 
00028 /*****************************************************************************/
00029 /* AUGMENTATION LOOP                                                         */
00030 /*****************************************************************************/
00031 
00032 AugmentationLoop::
00033 AugmentationLoop(ServiceBase & parent,
00034                  const std::string & name)
00035     : ServiceBase(name, parent),
00036       allAugmentors(0),
00037       idle_(1),
00038       inbox(65536),
00039       toAugmentors(getZmqContext())
00040 {
00041     updateAllAugmentors();
00042 }
00043 
00044 AugmentationLoop::
00045 AugmentationLoop(std::shared_ptr<ServiceProxies> proxies,
00046                  const std::string & name)
00047     : ServiceBase(name, proxies),
00048       allAugmentors(0),
00049       idle_(1),
00050       inbox(65536),
00051       toAugmentors(getZmqContext())
00052 {
00053     updateAllAugmentors();
00054 }
00055 
00056 AugmentationLoop::
00057 ~AugmentationLoop()
00058 {
00059 }
00060 
00061 void
00062 AugmentationLoop::
00063 init()
00064 {
00065     registerServiceProvider(serviceName(), { "rtbRouterAugmentation" });
00066 
00067     toAugmentors.init(getServices()->config, serviceName() + "/augmentors");
00068 
00069     toAugmentors.clientMessageHandler
00070         = [&] (const std::vector<std::string> & message)
00071         {
00072             //cerr << "got augmentor message " << message << endl;
00073             handleAugmentorMessage(message);
00074         };
00075 
00076     toAugmentors.bindTcp(getServices()->ports->getRange("augmentors"));
00077 
00078     toAugmentors.onConnection = [=] (const std::string & client)
00079         {
00080             cerr << "augmentor " << client << " has connected" << endl;
00081         };
00082 
00083     toAugmentors.onDisconnection = [=] (const std::string & client)
00084         {
00085             cerr << "augmentor " << client << " has disconnected" << endl;
00086         };
00087 
00088     inbox.onEvent = [&] (const std::shared_ptr<Entry> & entry)
00089         {
00090             //cerr << "got event on inbox" << endl;
00091 
00092             Guard guard(lock);
00093             Date now = Date::now();
00094 
00095             //cerr << "got lock on inbox" << endl;
00096 
00097             // TODO: wake up loop if slower...
00098             // TODO: DRY with other function...
00099             augmenting.insert(entry->info->auction->id, entry,
00100                               entry->timeout);
00101 
00102             for (auto it = entry->outstanding.begin(),
00103                      end = entry->outstanding.end();
00104                  it != end;  ++it) {
00105 
00106                 auto & aug = *augmentors[*it];
00107 
00108                 //cerr << "sending to " << *it << " at "
00109                 //     << aug.agentAddr << endl;
00110 
00111                 set<string> agents;
00112                 const auto& bidderGroups = entry->info->potentialGroups;
00113 
00114                 for (auto jt = bidderGroups.begin(), end = bidderGroups.end();
00115                      jt != end; ++jt)
00116                 {
00117                     for (auto kt = jt->begin(), end = jt->end();
00118                          kt != end; ++kt)
00119                     {
00120                         agents.insert(kt->agent);
00121                     }
00122                 }
00123 
00124                 std::ostringstream availableAgentsStr;
00125                 ML::DB::Store_Writer writer(availableAgentsStr);
00126                 writer.save(agents);
00127                         
00128                 // Send the message to the augmentor
00129                 toAugmentors.sendMessage(aug.augmentorAddr,
00130                                          "AUGMENT", "1.0", *it,
00131                                          entry->info->auction->id.toString(),
00132                                          entry->info->auction->requestStrFormat,
00133                                          entry->info->auction->requestStr,
00134                                          availableAgentsStr.str(),
00135                                          Date::now());
00136 
00137                 if (!aug.inFlight.insert
00138                     (make_pair(entry->info->auction->id, now))
00139                     .second) {
00140                     cerr << "warning: double augment for auction "
00141                          << entry->info->auction->id << endl;
00142                 }
00143                 else aug.numInFlight = aug.inFlight.size();
00144             }
00145 
00146             recordLevel(Date::now().secondsSince(now), "requestTimeMs");
00147                     
00148             idle_ = 0;
00149         };
00150 
00151     double lastSleepTime = 0;
00152     addPeriodic("Augmentationloop::dutyCycle", 1.0, [=] (uint64_t) mutable {
00153                 double sleepTime = totalSleepSeconds();
00154                 recordLevel((sleepTime - lastSleepTime) * 1000.0, "sleepTime");
00155                 lastSleepTime = sleepTime;
00156             });
00157 
00158     addSource("AugmentationLoop::inbox", inbox);
00159     addSource("AugmentationLoop::toAugmentors", toAugmentors);
00160     addPeriodic("AugmentationLoop::checkExpiries", 0.977,
00161                 [=] (int) { checkExpiries(); });
00162 }
00163 
00164 void
00165 AugmentationLoop::
00166 start()
00167 {
00168     //toAugmentors.start();
00169     MessageLoop::start();
00170 }
00171 
00172 void
00173 AugmentationLoop::
00174 sleepUntilIdle()
00175 {
00176     while (!idle_)
00177         futex_wait(idle_, 0);
00178 }
00179 
00180 void
00181 AugmentationLoop::
00182 shutdown()
00183 {
00184     MessageLoop::shutdown();
00185     toAugmentors.shutdown();
00186 }
00187 
00188 size_t
00189 AugmentationLoop::
00190 numAugmenting() const
00191 {
00192     // TODO: can we get away without a lock here?
00193     Guard guard(lock);
00194     return augmenting.size();
00195 }
00196 
00197 bool
00198 AugmentationLoop::
00199 currentlyAugmenting(const Id & auctionId) const
00200 {
00201     Guard guard(lock);
00202     return augmenting.count(auctionId);
00203 }
00204 
00205 void
00206 AugmentationLoop::
00207 bindAugmentors(const std::string & uri)
00208 {
00209     try {
00210         toAugmentors.bind(uri.c_str());
00211     } catch (const std::exception & exc) {
00212         throw Exception("error while binding augmentation URI %s: %s",
00213                         uri.c_str(), exc.what());
00214     }
00215 }
00216 
00217 void
00218 AugmentationLoop::
00219 handleAugmentorMessage(const std::vector<std::string> & message)
00220 {
00221     Guard guard(lock);
00222 
00223     Date now = Date::now();
00224 
00225     const std::string & type = message.at(1); 
00226     if (type == "CONFIG") {
00227         doConfig(message);
00228     }
00229     else if (type == "RESPONSE") {
00230         doResponse(message);
00231     }
00232     else throw ML::Exception("error handling unknown "
00233                              "augmentor message of type "
00234                              + type);
00235 }
00236 
00237 void
00238 AugmentationLoop::
00239 checkExpiries()
00240 {
00241     //cerr << "checking expiries" << endl;
00242 
00243     Guard guard(lock);
00244 
00245     Date now = Date::now();
00246 
00247     for (auto it = augmentors.begin(), end = augmentors.end();
00248          it != end;  ++it) {
00249 
00250         AugmentorInfo & aug = *it->second;
00251 
00252         vector<Id> lostAuctions;
00253 
00254         for (auto jt = aug.inFlight.begin(),
00255                  jend = aug.inFlight.end();
00256              jt != jend;  ++jt) {
00257             if (now.secondsSince(jt->second) > 5.0) {
00258                 cerr << "warning: augmentor " << it->first
00259                      << " lost auction " << jt->first
00260                      << endl;
00261 
00262                 string eventName = "augmentor."
00263                     + it->first + ".lostAuction";
00264                 recordEvent(eventName.c_str());
00265 
00266                 lostAuctions.push_back(jt->first);
00267             }
00268         }
00269                 
00270         // Delete all in flight that appear to be lost
00271         for (unsigned i = 0;  i < lostAuctions.size();  ++i)
00272             aug.inFlight.erase(lostAuctions[i]);
00273                 
00274         string eventName = "augmentor." + it->first + ".numInFlight";
00275         recordEvent(eventName.c_str(), ET_LEVEL,
00276                     aug.inFlight.size());
00277     }
00278     
00279 #if 0
00280     vector<string> deadAugmentors;
00281 
00282     for (unsigned i = 0;  i < deadAugmentors.size();  ++i) {
00283         string aug = deadAugmentors[i];
00284         augmentors.erase(aug);
00285                 
00286         string eventName = "augmentor." + aug + ".dead";
00287         recordEvent(eventName.c_str());
00288     }
00289 
00290     if (!deadAugmentors.empty())
00291         updateAllAugmentors();
00292     
00293 #endif
00294 
00295     auto onExpired = [&] (const Id & id,
00296                           const std::shared_ptr<Entry> & entry) -> Date
00297         {
00298             //++numAugmented;
00299             //cerr << "augmented " << ++numAugmented << " bids" << endl;
00300 
00301             for (auto it = entry->outstanding.begin(),
00302                      end = entry->outstanding.end();
00303                  it != end;  ++it) {
00304                 string eventName = "augmentor." + *it
00305                     + ".expiredTooLate";
00306                 recordEvent(eventName.c_str(), ET_COUNT);
00307             }
00308                 
00309             this->augmentationExpired(id, *entry);
00310             return Date();
00311         };
00312 
00313     if (augmenting.earliest <= now) {
00314         //Guard guard(lock);
00315         augmenting.expire(onExpired, now);
00316     }
00317 
00318     if (augmenting.empty() && !idle_) {
00319         idle_ = 1;
00320         futex_wake(idle_);
00321     }
00322 
00323 }
00324 
00325 void
00326 AugmentationLoop::
00327 updateAllAugmentors()
00328 {
00329     for (;;) {
00330         auto_ptr<AllAugmentorInfo> newInfo(new AllAugmentorInfo);
00331 
00332         AllAugmentorInfo * current = allAugmentors;
00333 
00334         for (auto it = augmentors.begin(), end = augmentors.end();
00335              it != end;  ++it) {
00336             AugmentorInfo & aug = *it->second;
00337 
00338             //if (!it->second.configured) continue;
00339             if (!it->second) continue;
00340             if (aug.name == "") continue;
00341             AugmentorInfoEntry entry;
00342             entry.name = aug.name;
00343             entry.info = it->second;
00344             //entry.config = aug.config;
00345             newInfo->push_back(entry);
00346         }
00347 
00348         // Sort they by their name
00349         std::sort(newInfo->begin(), newInfo->end(),
00350                   [] (const AugmentorInfoEntry & entry1,
00351                       const AugmentorInfoEntry & entry2)
00352                   {
00353                       return entry1.name < entry2.name;
00354                   });
00355         
00356         // Add the index
00357         //for (unsigned i = 0;  i < newInfo->size();  ++i) {
00358         //    newInfo->index[(*newInfo.get())[i].name] = i;
00359         //}
00360 
00361         if (ML::cmp_xchg(allAugmentors, current, newInfo.get())) {
00362             newInfo.release();
00363             if (current)
00364                 allAugmentorsGc.defer([=] () { delete current; });
00365             break;
00366         }
00367     }
00368 }
00369 
00370 void
00371 AugmentationLoop::
00372 augment(const std::shared_ptr<AugmentationInfo> & info,
00373         Date timeout,
00374         const OnFinished & onFinished)
00375 {
00376     Date now = Date::now();
00377 
00378     auto entry = std::make_shared<Entry>();
00379     entry->onFinished = onFinished;
00380     entry->info = info;
00381     entry->timeout = timeout;
00382 
00383     // Get a set of all augmentors
00384     std::set<std::string> augmentors;
00385 
00386     // Now go through and find all of the bidders
00387     for (unsigned i = 0;  i < info->potentialGroups.size();  ++i) {
00388         const GroupPotentialBidders & group = info->potentialGroups[i];
00389         for (unsigned j = 0;  j < group.size();  ++j) {
00390             const PotentialBidder & bidder = group[j];
00391             const AgentConfig & config = *bidder.config;
00392             for (unsigned k = 0;  k < config.augmentations.size();  ++k) {
00393                 const std::string & name = config.augmentations[k].name;
00394                 augmentors.insert(name);
00395             }
00396         }
00397     }
00398 
00399     //cerr << "need augmentors " << augmentors << endl;
00400 
00401     // Find which ones are actually available...
00402     GcLock::SharedGuard guard(allAugmentorsGc);
00403     const AllAugmentorInfo * ai = allAugmentors;
00404     
00405     ExcAssert(ai);
00406 
00407     auto it1 = augmentors.begin(), end1 = augmentors.end();
00408     auto it2 = ai->begin(), end2 = ai->end();
00409 
00410     while (it1 != end1 && it2 != end2) {
00411         if (*it1 == it2->name) {
00412             // Augmentor we need to run
00413 
00414             //cerr << "augmenting with " << it2->name << endl;
00415 
00416             recordEvent("augmentation.request");
00417             string eventName = "augmentor." + it2->name + ".request";
00418             recordEvent(eventName.c_str());
00419             
00420             if (it2->info->numInFlight > 3000) {
00421                 string eventName = "augmentor." + it2->name
00422                     + ".skippedTooManyInFlight";
00423                 recordEvent(eventName.c_str());
00424 #if 0
00425             } else if (it2->info->lastHeartbeat.secondsUntil(now) > 2.0) {
00426                 string eventName = "augmentor." + it2->name
00427                     + ".skippedNoHeartbeat";
00428                 recordEvent(eventName.c_str());
00429 #endif
00430             }
00431             else {
00432                 entry->outstanding.insert(*it1);
00433             }
00434 
00435             ++it1;
00436             ++it2;
00437         }
00438         else if (*it1 < it2->name) {
00439             // Augmentor is not available
00440             //cerr << "augmentor " << *it1 << " is not available" << endl;
00441             ++it1;
00442         }
00443         else if (it2->name < *it1) {
00444             // Augmentor is not required
00445             //cerr << "augmentor " << it2->name << " is not required" << endl;
00446             ++it2;
00447         }
00448         else throw ML::Exception("logic error traversing augmentors");
00449     }
00450 
00451 #if 0
00452     while (it1 != end1) {
00453         cerr << "augmentor " << *it1 << " is not available" << endl;
00454         ++it1;
00455     }
00456     
00457     while (it2 != end2) {
00458         cerr << "augmentor " << it2->name << " is not required" << endl;
00459         ++it2;
00460     }
00461 #endif
00462 
00463     if (entry->outstanding.empty()) {
00464         // No augmentors required... run the auction straight away
00465         onFinished(info);
00466     }
00467     else {
00468         //cerr << "putting in inbox" << endl;
00469         inbox.push(entry);
00470 
00471 #if 0 // optimization
00472         // Set up to run the augmentors
00473         Guard guard(lock, boost::try_to_lock_t());
00474         if (guard) {
00475             // Got the lock... put it straight in
00476             augmenting.insert(info->auction->id, entry, timeout);
00477 
00478             for (auto it = entry->outstanding.begin(),
00479                      end = entry->outstanding.end();
00480                  it != end;  ++it) {
00481                 auto & aug = *this->augmentors[*it];
00482 
00483                 if (!aug.inFlight.insert
00484                     (make_pair(info->auction->id, now))
00485                     .second) {
00486                     cerr << "warning: double augment for auction "
00487                          << info->auction->id << endl;
00488                 }
00489                 else aug.numInFlight = aug.inFlight.size();
00490             }
00491 
00492             idle_ = 0;
00493         }
00494         else {
00495             // Couldn't get the lock... put it on the queue
00496             sendMessage(toEndpoint_(), "QUEUE", entry);
00497         }
00498 #endif // optimization
00499 
00500     }
00501 }
00502 
00503 void
00504 AugmentationLoop::
00505 doConfig(const std::vector<std::string> & message)
00506 {
00507     if (message.size() != 4)
00508         throw ML::Exception("config message has wrong size: %zd vs 4",
00509                             message.size());
00510 
00511     const string & augmentorAddr = message[0];
00512     const string & version = message[2];
00513     const string & name = message[3];
00514 
00515     if (version != "1.0")
00516         throw ML::Exception("unknown version for config message");
00517 
00518     //cerr << "configuring augmentor " << name << " on " << connectTo
00519     //     << endl;
00520 
00521     string eventName = "augmentor." + name + ".configured";
00522     recordEvent(eventName.c_str());
00523 
00524     auto newInfo = std::make_shared<AugmentorInfo>();
00525     newInfo->name = name;
00526     newInfo->augmentorAddr = augmentorAddr;
00527 
00528     //cerr << "connecting on " << connectTo << endl;
00529     //info->connection();
00530 
00531     if (augmentors.count(name)) {
00532         // Grab the old version
00533         auto oldInfo = augmentors[name];
00534 
00535         // There was an old entry... wait until nobody is using it
00536 
00537         // First unpublish the entry
00538         updateAllAugmentors();
00539 
00540         // Now wait until nothing else can see it
00541         allAugmentorsGc.deferBarrier();
00542 
00543         augmentors.erase(name);
00544 
00545         //cerr << "  done removing old version" << endl;
00546     }
00547 
00548     augmentors[name] = newInfo;
00549 
00550     updateAllAugmentors();
00551 
00552     toAugmentors.sendMessage(augmentorAddr, "CONFIGOK");
00553 
00554     //cerr << "done updating" << endl;
00555 }
00556 
00557 void
00558 AugmentationLoop::
00559 doResponse(const std::vector<std::string> & message)
00560 {
00561     recordEvent("augmentation.response");
00562     //cerr << "doResponse " << message << endl;
00563     if (message.size() != 7)
00564         throw ML::Exception("response message has wrong size: %zd",
00565                             message.size());
00566     const string & version = message[2];
00567     if (version != "1.0")
00568         throw ML::Exception("unknown response version");
00569     Date startTime = Date::parseSecondsSinceEpoch(message[3]);
00570 
00571     Id id(message[4]);
00572     const std::string & augmentor = message[5];
00573     const std::string & augmentation = message[6];
00574 
00575     ML::Timer timer;
00576 
00577     AugmentationList augmentationList;
00578     if (augmentation != "" && augmentation != "null") {
00579         try {
00580             Json::Value augmentationJson;
00581 
00582             JML_TRACE_EXCEPTIONS(false);
00583             augmentationJson = Json::parse(augmentation);
00584             augmentationList = AugmentationList::fromJson(augmentationJson);
00585         } catch (const std::exception & exc) {
00586             string eventName = "augmentor." + augmentor
00587                 + ".responseParsingExceptions";
00588             recordEvent(eventName.c_str(), ET_COUNT);
00589         }
00590     }
00591 
00592     recordLevel(timer.elapsed_wall(), "responseParseTimeMs");
00593 
00594     {
00595         double timeTakenMs = startTime.secondsUntil(Date::now()) * 1000.0;
00596         string eventName = "augmentor." + augmentor + ".timeTakenMs";
00597         recordEvent(eventName.c_str(), ET_OUTCOME, timeTakenMs);
00598     }
00599 
00600     {
00601         double responseLength = augmentation.size();
00602         string eventName = "augmentor." + augmentor + ".responseLengthBytes";
00603         recordEvent(eventName.c_str(), ET_OUTCOME, responseLength);
00604     }
00605 
00606     string eventName = ML::format("augmentor.%s.%s",
00607                                   augmentor.c_str(),
00608                                   (augmentation == "" || augmentation == "null"
00609                                    ? "nullResponse" : "validResponse"));
00610     recordEvent(eventName.c_str());
00611 
00612     // Modify the augmentor data structures
00613     //Guard guard(lock);
00614 
00615     if (augmentors.count(augmentor)) {
00616         auto & entry = *augmentors[augmentor];
00617         entry.inFlight.erase(id);
00618         entry.numInFlight = entry.inFlight.size();
00619     }
00620 
00621     auto it = augmenting.find(id);
00622     if (it == augmenting.end()) {
00623         recordEvent("augmentation.unknown");
00624         string eventName = "augmentor." + augmentor + ".unknown";
00625         recordEvent(eventName.c_str());
00626         //cerr << "warning: handled response for unknown auction" << endl;
00627         return;
00628     }
00629 
00630     it->second->info->auction->augmentations.mergeWith(augmentationList);
00631 
00632     it->second->outstanding.erase(augmentor);
00633     if (it->second->outstanding.empty()) {
00634         it->second->onFinished(it->second->info);
00635         augmenting.erase(it);
00636     }
00637 }
00638 
00639 void
00640 AugmentationLoop::
00641 augmentationExpired(const Id & id, const Entry & entry)
00642 {
00643     entry.onFinished(entry.info);
00644 }                     
00645 
00646 } // namespace RTBKIT
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator