RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
plugins/exchange/http_auction_handler.cc
00001 /* http_auction_handler.cc
00002    Jeremy Barnes, 14 December 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    Handler for auctions.
00006 */
00007 
00008 #include "http_auction_handler.h"
00009 #include "http_exchange_connector.h"
00010 
00011 #include "jml/arch/exception.h"
00012 #include "jml/arch/format.h"
00013 #include "jml/arch/backtrace.h"
00014 #include "jml/utils/guard.h"
00015 #include "jml/utils/set_utils.h"
00016 #include "jml/utils/vector_utils.h"
00017 #include "jml/arch/timers.h"
00018 #include <set>
00019 
00020 #include <boost/foreach.hpp>
00021 
00022 using namespace std;
00023 using namespace ML;
00024 
00025 namespace RTBKIT {
00026 
00027 
00028 /*****************************************************************************/
00029 /* HTTP AUCTION HANDLER                                                      */
00030 /*****************************************************************************/
00031 
00032 long HttpAuctionHandler::created = 0;
00033 long HttpAuctionHandler::destroyed = 0;
00034 
00035 HttpAuctionHandler::
00036 HttpAuctionHandler()
00037     : hasTimer(false), disconnected(false), servingRequest(false)
00038 {
00039     atomic_add(created, 1);
00040 }
00041 
00042 HttpAuctionHandler::
00043 ~HttpAuctionHandler()
00044 {
00045     if (servingRequest)
00046         ML::atomic_add(endpoint->numServingRequest_, -1);
00047     //cerr << "deleting HttpAuctionHandler at " << this << endl;
00048     //backtrace();
00049     atomic_add(destroyed, 1);
00050     if (hasTimer)
00051         throw Exception("deleted auction handler with timer");
00052 }
00053 
00054 void
00055 HttpAuctionHandler::
00056 onGotTransport()
00057 {
00058     transport().activities.limit(5);
00059     //transport().activities.clear();
00060 
00061     addActivityS("gotTransport");
00062 
00063     //cerr << "bid handler got transport" << endl;
00064 
00065     this->endpoint = dynamic_cast<HttpExchangeConnector *>(get_endpoint());
00066     if (!this->endpoint)
00067         throw Exception("HttpAuctionHandler needs to be owned by an "
00068                         "HttpExchangeConnector");
00069 
00070     HttpConnectionHandler::onGotTransport();
00071 
00072     startReading();
00073 }
00074 
00075 void
00076 HttpAuctionHandler::
00077 handleDisconnect()
00078 {
00079     doEvent("auctionDisconnection");
00080 
00081     disconnected = true;
00082 
00083     closeWhenHandlerFinished();
00084 }
00085 
00086 void
00087 HttpAuctionHandler::
00088 handleTimeout(Date date, size_t cookie)
00089 {
00090     checkMagic();
00091 
00092     std::shared_ptr<Auction> auction_ = auction;
00093     if (auction->tooLate()) return;  // was sent before timeout
00094 
00095 #if 0
00096     cerr << "auction " << auction_->id << " timed out with tooLate "
00097          << auction->tooLate() << " and response "
00098          << auction->getResponseJson().toString() << endl;
00099 #endif
00100 
00101     if (!auction_) {
00102         cerr << "doAuctionTimeout with no auction" << endl;
00103         abort();
00104     }
00105 
00106     if (auction_->finish()) {
00107         doEvent("auctionTimeout");
00108         if (this->endpoint->onTimeout)
00109             this->endpoint->onTimeout(auction, date);
00110     }
00111 }
00112 
00113 void
00114 HttpAuctionHandler::
00115 cancelTimer()
00116 {
00117     if (!hasTimer) return;
00118 
00119     //cerr << "cancelling timer" << endl;
00120 
00121     addActivityS("cancelTimer");
00122     ConnectionHandler::cancelTimer();
00123     hasTimer = false;
00124 }
00125 
00126 void
00127 HttpAuctionHandler::
00128 onDisassociate()
00129 {
00130     //cerr << "onDisassociate" << endl;
00131 
00132     cancelTimer();
00133 
00134     /* We need to make sure that the handler doesn't try to send us
00135        anything. */
00136     if (auction && !auction->tooLate()) {
00137         auction->isZombie = true;
00138         doEvent("disconnectWithActiveAuction");
00139         //transport().activities.dump();
00140         //cerr << "disassociation of HttpAuctionHandler " << this
00141         //     << " when auction not finished"
00142         //     << endl;
00143         //backtrace();
00144         //throw Exception("attempt to disassociate HttpAuctionHandler when not "
00145         //                "too late: error %s", error.c_str());
00146     }
00147 
00148     endpoint->finishedWithHandler(shared_from_this());
00149 }
00150 
00151 void
00152 HttpAuctionHandler::
00153 onCleanup()
00154 {
00155     onDisassociate();
00156 }
00157 
00158 void
00159 HttpAuctionHandler::
00160 doEvent(const char * eventName,
00161         EventType type,
00162         float value,
00163         const char * units)
00164 {
00165     //cerr << eventName << " " << value << endl;
00166     endpoint->recordEvent(eventName, type, value);
00167 }
00168 
00169 void
00170 HttpAuctionHandler::
00171 incNumServingRequest()
00172 {
00173     ML::atomic_add(endpoint->numServingRequest_, 1);
00174 }
00175 
00176 void
00177 HttpAuctionHandler::
00178 handleHttpPayload(const HttpHeader & header,
00179                   const std::string & payload)
00180 {
00181     // Unknown resource?  Handle it...
00182     //cerr << header.verb << " " << header.resource << endl;
00183     //cerr << header << endl;
00184     //cerr << payload << endl;
00185     //cerr << endpoint->auctionVerb << " " << endpoint->auctionResource << endl;
00186 
00187     if (header.resource != endpoint->auctionResource
00188         || header.verb != endpoint->auctionVerb) {
00189         endpoint->handleUnknownRequest(*this, header, payload);
00190         return;
00191     }
00192 
00193     doEvent("auctionReceived");
00194 
00195     incNumServingRequest();
00196     servingRequest = true;
00197 
00198     addActivityS("handleHttpPayload");
00199 
00200     stopReading();
00201 
00202     // First check if we are authorized to bid.  If not we drop the auction
00203     // with prejudice.
00204     Date now = Date::now();
00205 
00206     if (!endpoint->isEnabled(now)) {
00207         doEvent("auctionEarlyDrop.notEnabled");
00208         dropAuction("endpoint not enabled");
00209         return;
00210     }
00211     
00212     double acceptProbability = endpoint->acceptAuctionProbability;
00213 
00214     if (acceptProbability < 1.0
00215         && random() % 1000000 > 1000000 * acceptProbability) {
00216         // early drop...
00217         dropAuction("random early drop");
00218         return;
00219     }
00220 
00221     double timeAvailableMs = getTimeAvailableMs(header, payload);
00222     double networkTimeMs = getRoundTripTimeMs(header);
00223 
00224     doEvent("auctionStartLatencyMs",
00225             ET_OUTCOME,
00226             now.secondsSince(firstData) * 1000.0, "ms");
00227 
00228     doEvent("auctionTimeAvailableMs",
00229             ET_OUTCOME,
00230             timeAvailableMs, "ms");
00231 
00232 
00233     if (timeAvailableMs - networkTimeMs < 5.0) {
00234         // Do an early drop of the bid request without even creating an
00235         // auction
00236 
00237         doEvent("auctionEarlyDrop.timeLeftMs",
00238                 ET_OUTCOME,
00239                 timeAvailableMs, "ms");
00240 
00241         doEvent(ML::format("auctionEarlyDrop.peer.%s",
00242                                transport().getPeerName().c_str()).c_str());
00243 
00244         dropAuction(ML::format("timeleft of %f is too low",
00245                                timeAvailableMs));
00246 
00247         return;
00248     }
00249 
00250     /* This is the callback that the auction will call once finish()
00251        has been called on it.  It will be called exactly once.
00252     */
00253     auto handleAuction = [=] (std::shared_ptr<Auction> auction)
00254         {
00255             //cerr << "HANDLE AUCTION CALLED AFTER "
00256             //<< Date::now().secondsSince(auction->start) * 1000
00257             //<< "ms" << endl;
00258 
00259             if (!auction || auction->isZombie)
00260                 return;  // Was already externally terminated; this is invalid
00261 
00262             if (this->transport().lockedByThisThread())
00263                 this->sendResponse();
00264             else {
00265                 this->transport()
00266                     .doAsync(boost::bind(&HttpAuctionHandler::sendResponse,
00267                                          this),
00268                              "AsyncSendResponse");
00269             }
00270         };
00271     
00272     // We always give ourselves 5ms to bid in, no matter what (let upstream
00273     // deal with it if it's really that much too slow).
00274     Date expiry = firstData.plusSeconds
00275         (max(5.0, (timeAvailableMs - networkTimeMs)) / 1000.0);
00276 
00277     try {
00278         auto bidRequest = parseBidRequest(header, payload);
00279 
00280         if (!bidRequest) {
00281             cerr << "got no bid request" << endl;
00282             // The request was handled; nothing to do
00283             return;
00284         }
00285 
00286         auction.reset(new Auction(endpoint,
00287                                   handleAuction, bidRequest,
00288                                   bidRequest->toJsonStr(),
00289                                   "datacratic",
00290                                   firstData, expiry));
00291 
00292 #if 0
00293         static std::mutex lock;
00294         std::unique_lock<std::mutex> guard(lock);
00295         cerr << "bytes before = " << payload.size() << " after "
00296              << auction->requestStr.size() << " ratio "
00297              << 100.0 * auction->requestStr.size() / payload.size()
00298              << "%" << endl;
00299         string s = bidRequest->serializeToString();
00300         cerr << "serialized bytes before = " << payload.size() << " after "
00301              << s.size() << " ratio "
00302              << 100.0 * s.size() / payload.size() << "%" << endl;
00303 #endif
00304 
00305     } catch (const std::exception & exc) {
00306         // Inject the error message in
00307         sendErrorResponse("Error parsing bid request", exc.what());
00308         return;
00309     }
00310 
00311     doEvent("auctionNetworkLatencyMs",
00312             ET_OUTCOME,
00313             (firstData.secondsSince(auction->request->timestamp)) * 1000.0,
00314             "ms");
00315 
00316     doEvent("auctionTotalStartLatencyMs",
00317             ET_OUTCOME,
00318             (now.secondsSince(auction->request->timestamp)) * 1000.0,
00319             "ms");
00320 
00321     doEvent("auctionStart");
00322 
00323     addActivity("gotAuction %s", auction->id.toString().c_str());
00324     
00325     if (now > expiry) {
00326         doEvent("auctionAlreadyExpired");
00327 
00328         string msg = format("auction started after time already elapsed: "
00329                             "%s vs %s, available time = %.1fms, "
00330                             "firstData = %s",
00331                             now.print(4).c_str(),
00332                             expiry.print(4).c_str(),
00333                             timeAvailableMs,
00334                             firstData.print(4).c_str());
00335         cerr << msg << endl;
00336 
00337         dropAuction("auction already expired");
00338 
00339         return;
00340     }
00341     
00342     addActivity("timeAvailable: %.1fms", timeAvailableMs);
00343     
00344     scheduleTimerAbsolute(expiry, 1);
00345     hasTimer = true;
00346     
00347     addActivity("gotTimer for %s",
00348                 expiry.print(4).c_str());
00349 
00350     auction->doneParsing = Date::now();
00351     endpoint->onNewAuction(auction);
00352 }
00353 
00354 void
00355 HttpAuctionHandler::
00356 sendResponse()
00357 {
00358     checkMagic();
00359 
00360     if (!transport().lockedByThisThread())
00361         throw Exception("sendResponse must be in handler context");
00362 
00363     addActivityS("sendResponse");
00364 
00365     //cerr << "locked by " << bid->lock.get_thread_id() << endl;
00366     //cerr << "my thread " << ACE_OS::thr_self() << endl;
00367 
00368     Date before = Date::now();
00369 
00370     /* Make sure the transport isn't dead. */
00371     transport().checkMagic();
00372 
00373     if (!transport().lockedByThisThread())
00374         throw Exception("transport not locked by this thread");
00375 
00376     if (!auction) {
00377         throw Exception("sending response for cleared auction");
00378     }
00379 
00380     if (!auction->tooLate())
00381         throw Exception("auction is not finished");
00382 
00383     addActivity("sendResponse (lock took %.2fms)",
00384                 Date::now().secondsSince(before) * 1000);
00385     
00386     cancelTimer();
00387 
00388     endpoint->onAuctionDone(auction);
00389 
00390     //cerr << "sendResponse " << this << ": disconnected "
00391     //     << disconnected << endl;
00392 
00393     if (disconnected) {
00394         closeWhenHandlerFinished();
00395         return;
00396     }
00397     
00398     HttpResponse response = getResponse();
00399     
00400     Date startTime = auction->start;
00401     Date beforeSend = Date::now();
00402 
00403     auto onSendFinished = [=] ()
00404         {
00405             //static int n = 0;
00406             //ML::atomic_add(n, 1);
00407             //cerr << "sendFinished canBlock = " << canBlock << " "
00408             //<< n << endl;
00409             this->addActivityS("sendFinished");
00410             double sendTime = Date::now().secondsSince(beforeSend);
00411             if (sendTime > 0.01)
00412                 cerr << "sendTime = " << sendTime << " for "
00413                      << (auction ? auction->id.toString() : "NO AUCTION")
00414                      << endl;
00415 
00416             this->doEvent("auctionResponseSent");
00417             this->doEvent("auctionTotalTimeMs",
00418                           ET_OUTCOME,
00419                           Date::now().secondsSince(this->firstData) * 1000.0,
00420                           "ms");
00421 
00422             if (random() % 1000 == 0) {
00423                 this->transport().closeWhenHandlerFinished();
00424             }
00425             else {
00426                 this->transport().associateWhenHandlerFinished
00427                 (this->makeNewHandlerShared(), "sendFinished");
00428             }
00429         };
00430 
00431     addActivityS("beforeSend");
00432     
00433     double timeTaken = beforeSend.secondsSince(startTime) * 1000;
00434 
00435     response.extraHeaders
00436         .push_back({"X-Processing-Time-Ms", to_string(timeTaken)});
00437 
00438     putResponseOnWire(response, onSendFinished);
00439 }
00440 
00441 void
00442 HttpAuctionHandler::
00443 dropAuction(const std::string & reason)
00444 {
00445     auto onSendFinished = [=] ()
00446         {
00447             if (random() % 1000 == 0) {
00448                 this->transport().closeWhenHandlerFinished();
00449             }
00450             else {
00451                 this->transport().associateWhenHandlerFinished
00452                 (this->makeNewHandlerShared(), "sendFinished");
00453             }
00454         };
00455 
00456     putResponseOnWire(endpoint->getDroppedAuctionResponse(*this, *auction, reason),
00457                       onSendFinished);
00458 }
00459 
00460 void
00461 HttpAuctionHandler::
00462 sendErrorResponse(const std::string & error,
00463                   const std::string & details)
00464 {
00465     putResponseOnWire(endpoint->getErrorResponse(*this, *auction, error + ": " + details));
00466 }
00467 
00468 std::string
00469 HttpAuctionHandler::
00470 status() const
00471 {
00472     if (!hasTransport())
00473         return "newly minted HttpAuctionHandler";
00474 
00475     string result = format("GenericHttpAuctionHandler: %p readState %d hasTimer %d",
00476                            this, readState, hasTimer);
00477     result += "auction: ";
00478     if (auction) result += getResponse().body;
00479     else result += "NULL";
00480     return result;
00481 }
00482 
00483 HttpResponse
00484 HttpAuctionHandler::
00485 getResponse() const
00486 {
00487     return endpoint->getResponse(*this, this->header, *auction);
00488 }
00489 
00490 std::shared_ptr<BidRequest>
00491 HttpAuctionHandler::
00492 parseBidRequest(const HttpHeader & header,
00493                 const std::string & payload)
00494 {
00495     return endpoint->parseBidRequest(*this, header, payload);
00496 }
00497 
00498 double
00499 HttpAuctionHandler::
00500 getTimeAvailableMs(const HttpHeader & header,
00501                    const std::string & payload)
00502 {
00503     return endpoint->getTimeAvailableMs(*this, header, payload);
00504 }
00505 
00506 double
00507 HttpAuctionHandler::
00508 getRoundTripTimeMs(const HttpHeader & header)
00509 {
00510     return endpoint->getRoundTripTimeMs(*this, header);
00511 }
00512 
00513 } // namespace RTBKIT
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator