RTBKit 0.9
Open-source framework to create real-time ad bidding systems.
00001 /* http_auction_handler.cc 00002 Jeremy Barnes, 14 December 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Handler for auctions. 00006 */ 00007 00008 #include "http_auction_handler.h" 00009 #include "http_exchange_connector.h" 00010 00011 #include "jml/arch/exception.h" 00012 #include "jml/arch/format.h" 00013 #include "jml/arch/backtrace.h" 00014 #include "jml/utils/guard.h" 00015 #include "jml/utils/set_utils.h" 00016 #include "jml/utils/vector_utils.h" 00017 #include "jml/arch/timers.h" 00018 #include <set> 00019 00020 #include <boost/foreach.hpp> 00021 00022 using namespace std; 00023 using namespace ML; 00024 00025 namespace RTBKIT { 00026 00027 00028 /*****************************************************************************/ 00029 /* HTTP AUCTION HANDLER */ 00030 /*****************************************************************************/ 00031 00032 long HttpAuctionHandler::created = 0; 00033 long HttpAuctionHandler::destroyed = 0; 00034 00035 HttpAuctionHandler:: 00036 HttpAuctionHandler() 00037 : hasTimer(false), disconnected(false), servingRequest(false) 00038 { 00039 atomic_add(created, 1); 00040 } 00041 00042 HttpAuctionHandler:: 00043 ~HttpAuctionHandler() 00044 { 00045 if (servingRequest) 00046 ML::atomic_add(endpoint->numServingRequest_, -1); 00047 //cerr << "deleting HttpAuctionHandler at " << this << endl; 00048 //backtrace(); 00049 atomic_add(destroyed, 1); 00050 if (hasTimer) 00051 throw Exception("deleted auction handler with timer"); 00052 } 00053 00054 void 00055 HttpAuctionHandler:: 00056 onGotTransport() 00057 { 00058 transport().activities.limit(5); 00059 //transport().activities.clear(); 00060 00061 addActivityS("gotTransport"); 00062 00063 //cerr << "bid handler got transport" << endl; 00064 00065 this->endpoint = dynamic_cast<HttpExchangeConnector *>(get_endpoint()); 00066 if (!this->endpoint) 00067 throw Exception("HttpAuctionHandler needs to be owned by an " 00068 "HttpExchangeConnector"); 00069 
00070 HttpConnectionHandler::onGotTransport(); 00071 00072 startReading(); 00073 } 00074 00075 void 00076 HttpAuctionHandler:: 00077 handleDisconnect() 00078 { 00079 doEvent("auctionDisconnection"); 00080 00081 disconnected = true; 00082 00083 closeWhenHandlerFinished(); 00084 } 00085 00086 void 00087 HttpAuctionHandler:: 00088 handleTimeout(Date date, size_t cookie) 00089 { 00090 checkMagic(); 00091 00092 std::shared_ptr<Auction> auction_ = auction; 00093 if (auction->tooLate()) return; // was sent before timeout 00094 00095 #if 0 00096 cerr << "auction " << auction_->id << " timed out with tooLate " 00097 << auction->tooLate() << " and response " 00098 << auction->getResponseJson().toString() << endl; 00099 #endif 00100 00101 if (!auction_) { 00102 cerr << "doAuctionTimeout with no auction" << endl; 00103 abort(); 00104 } 00105 00106 if (auction_->finish()) { 00107 doEvent("auctionTimeout"); 00108 if (this->endpoint->onTimeout) 00109 this->endpoint->onTimeout(auction, date); 00110 } 00111 } 00112 00113 void 00114 HttpAuctionHandler:: 00115 cancelTimer() 00116 { 00117 if (!hasTimer) return; 00118 00119 //cerr << "cancelling timer" << endl; 00120 00121 addActivityS("cancelTimer"); 00122 ConnectionHandler::cancelTimer(); 00123 hasTimer = false; 00124 } 00125 00126 void 00127 HttpAuctionHandler:: 00128 onDisassociate() 00129 { 00130 //cerr << "onDisassociate" << endl; 00131 00132 cancelTimer(); 00133 00134 /* We need to make sure that the handler doesn't try to send us 00135 anything. 
*/ 00136 if (auction && !auction->tooLate()) { 00137 auction->isZombie = true; 00138 doEvent("disconnectWithActiveAuction"); 00139 //transport().activities.dump(); 00140 //cerr << "disassociation of HttpAuctionHandler " << this 00141 // << " when auction not finished" 00142 // << endl; 00143 //backtrace(); 00144 //throw Exception("attempt to disassociate HttpAuctionHandler when not " 00145 // "too late: error %s", error.c_str()); 00146 } 00147 00148 endpoint->finishedWithHandler(shared_from_this()); 00149 } 00150 00151 void 00152 HttpAuctionHandler:: 00153 onCleanup() 00154 { 00155 onDisassociate(); 00156 } 00157 00158 void 00159 HttpAuctionHandler:: 00160 doEvent(const char * eventName, 00161 EventType type, 00162 float value, 00163 const char * units) 00164 { 00165 //cerr << eventName << " " << value << endl; 00166 endpoint->recordEvent(eventName, type, value); 00167 } 00168 00169 void 00170 HttpAuctionHandler:: 00171 incNumServingRequest() 00172 { 00173 ML::atomic_add(endpoint->numServingRequest_, 1); 00174 } 00175 00176 void 00177 HttpAuctionHandler:: 00178 handleHttpPayload(const HttpHeader & header, 00179 const std::string & payload) 00180 { 00181 // Unknown resource? Handle it... 00182 //cerr << header.verb << " " << header.resource << endl; 00183 //cerr << header << endl; 00184 //cerr << payload << endl; 00185 //cerr << endpoint->auctionVerb << " " << endpoint->auctionResource << endl; 00186 00187 if (header.resource != endpoint->auctionResource 00188 || header.verb != endpoint->auctionVerb) { 00189 endpoint->handleUnknownRequest(*this, header, payload); 00190 return; 00191 } 00192 00193 doEvent("auctionReceived"); 00194 00195 incNumServingRequest(); 00196 servingRequest = true; 00197 00198 addActivityS("handleHttpPayload"); 00199 00200 stopReading(); 00201 00202 // First check if we are authorized to bid. If not we drop the auction 00203 // with prejudice. 
00204 Date now = Date::now(); 00205 00206 if (!endpoint->isEnabled(now)) { 00207 doEvent("auctionEarlyDrop.notEnabled"); 00208 dropAuction("endpoint not enabled"); 00209 return; 00210 } 00211 00212 double acceptProbability = endpoint->acceptAuctionProbability; 00213 00214 if (acceptProbability < 1.0 00215 && random() % 1000000 > 1000000 * acceptProbability) { 00216 // early drop... 00217 dropAuction("random early drop"); 00218 return; 00219 } 00220 00221 double timeAvailableMs = getTimeAvailableMs(header, payload); 00222 double networkTimeMs = getRoundTripTimeMs(header); 00223 00224 doEvent("auctionStartLatencyMs", 00225 ET_OUTCOME, 00226 now.secondsSince(firstData) * 1000.0, "ms"); 00227 00228 doEvent("auctionTimeAvailableMs", 00229 ET_OUTCOME, 00230 timeAvailableMs, "ms"); 00231 00232 00233 if (timeAvailableMs - networkTimeMs < 5.0) { 00234 // Do an early drop of the bid request without even creating an 00235 // auction 00236 00237 doEvent("auctionEarlyDrop.timeLeftMs", 00238 ET_OUTCOME, 00239 timeAvailableMs, "ms"); 00240 00241 doEvent(ML::format("auctionEarlyDrop.peer.%s", 00242 transport().getPeerName().c_str()).c_str()); 00243 00244 dropAuction(ML::format("timeleft of %f is too low", 00245 timeAvailableMs)); 00246 00247 return; 00248 } 00249 00250 /* This is the callback that the auction will call once finish() 00251 has been called on it. It will be called exactly once. 
00252 */ 00253 auto handleAuction = [=] (std::shared_ptr<Auction> auction) 00254 { 00255 //cerr << "HANDLE AUCTION CALLED AFTER " 00256 //<< Date::now().secondsSince(auction->start) * 1000 00257 //<< "ms" << endl; 00258 00259 if (!auction || auction->isZombie) 00260 return; // Was already externally terminated; this is invalid 00261 00262 if (this->transport().lockedByThisThread()) 00263 this->sendResponse(); 00264 else { 00265 this->transport() 00266 .doAsync(boost::bind(&HttpAuctionHandler::sendResponse, 00267 this), 00268 "AsyncSendResponse"); 00269 } 00270 }; 00271 00272 // We always give ourselves 5ms to bid in, no matter what (let upstream 00273 // deal with it if it's really that much too slow). 00274 Date expiry = firstData.plusSeconds 00275 (max(5.0, (timeAvailableMs - networkTimeMs)) / 1000.0); 00276 00277 try { 00278 auto bidRequest = parseBidRequest(header, payload); 00279 00280 if (!bidRequest) { 00281 cerr << "got no bid request" << endl; 00282 // The request was handled; nothing to do 00283 return; 00284 } 00285 00286 auction.reset(new Auction(endpoint, 00287 handleAuction, bidRequest, 00288 bidRequest->toJsonStr(), 00289 "datacratic", 00290 firstData, expiry)); 00291 00292 #if 0 00293 static std::mutex lock; 00294 std::unique_lock<std::mutex> guard(lock); 00295 cerr << "bytes before = " << payload.size() << " after " 00296 << auction->requestStr.size() << " ratio " 00297 << 100.0 * auction->requestStr.size() / payload.size() 00298 << "%" << endl; 00299 string s = bidRequest->serializeToString(); 00300 cerr << "serialized bytes before = " << payload.size() << " after " 00301 << s.size() << " ratio " 00302 << 100.0 * s.size() / payload.size() << "%" << endl; 00303 #endif 00304 00305 } catch (const std::exception & exc) { 00306 // Inject the error message in 00307 sendErrorResponse("Error parsing bid request", exc.what()); 00308 return; 00309 } 00310 00311 doEvent("auctionNetworkLatencyMs", 00312 ET_OUTCOME, 00313 
(firstData.secondsSince(auction->request->timestamp)) * 1000.0, 00314 "ms"); 00315 00316 doEvent("auctionTotalStartLatencyMs", 00317 ET_OUTCOME, 00318 (now.secondsSince(auction->request->timestamp)) * 1000.0, 00319 "ms"); 00320 00321 doEvent("auctionStart"); 00322 00323 addActivity("gotAuction %s", auction->id.toString().c_str()); 00324 00325 if (now > expiry) { 00326 doEvent("auctionAlreadyExpired"); 00327 00328 string msg = format("auction started after time already elapsed: " 00329 "%s vs %s, available time = %.1fms, " 00330 "firstData = %s", 00331 now.print(4).c_str(), 00332 expiry.print(4).c_str(), 00333 timeAvailableMs, 00334 firstData.print(4).c_str()); 00335 cerr << msg << endl; 00336 00337 dropAuction("auction already expired"); 00338 00339 return; 00340 } 00341 00342 addActivity("timeAvailable: %.1fms", timeAvailableMs); 00343 00344 scheduleTimerAbsolute(expiry, 1); 00345 hasTimer = true; 00346 00347 addActivity("gotTimer for %s", 00348 expiry.print(4).c_str()); 00349 00350 auction->doneParsing = Date::now(); 00351 endpoint->onNewAuction(auction); 00352 } 00353 00354 void 00355 HttpAuctionHandler:: 00356 sendResponse() 00357 { 00358 checkMagic(); 00359 00360 if (!transport().lockedByThisThread()) 00361 throw Exception("sendResponse must be in handler context"); 00362 00363 addActivityS("sendResponse"); 00364 00365 //cerr << "locked by " << bid->lock.get_thread_id() << endl; 00366 //cerr << "my thread " << ACE_OS::thr_self() << endl; 00367 00368 Date before = Date::now(); 00369 00370 /* Make sure the transport isn't dead. 
*/ 00371 transport().checkMagic(); 00372 00373 if (!transport().lockedByThisThread()) 00374 throw Exception("transport not locked by this thread"); 00375 00376 if (!auction) { 00377 throw Exception("sending response for cleared auction"); 00378 } 00379 00380 if (!auction->tooLate()) 00381 throw Exception("auction is not finished"); 00382 00383 addActivity("sendResponse (lock took %.2fms)", 00384 Date::now().secondsSince(before) * 1000); 00385 00386 cancelTimer(); 00387 00388 endpoint->onAuctionDone(auction); 00389 00390 //cerr << "sendResponse " << this << ": disconnected " 00391 // << disconnected << endl; 00392 00393 if (disconnected) { 00394 closeWhenHandlerFinished(); 00395 return; 00396 } 00397 00398 HttpResponse response = getResponse(); 00399 00400 Date startTime = auction->start; 00401 Date beforeSend = Date::now(); 00402 00403 auto onSendFinished = [=] () 00404 { 00405 //static int n = 0; 00406 //ML::atomic_add(n, 1); 00407 //cerr << "sendFinished canBlock = " << canBlock << " " 00408 //<< n << endl; 00409 this->addActivityS("sendFinished"); 00410 double sendTime = Date::now().secondsSince(beforeSend); 00411 if (sendTime > 0.01) 00412 cerr << "sendTime = " << sendTime << " for " 00413 << (auction ? 
auction->id.toString() : "NO AUCTION") 00414 << endl; 00415 00416 this->doEvent("auctionResponseSent"); 00417 this->doEvent("auctionTotalTimeMs", 00418 ET_OUTCOME, 00419 Date::now().secondsSince(this->firstData) * 1000.0, 00420 "ms"); 00421 00422 if (random() % 1000 == 0) { 00423 this->transport().closeWhenHandlerFinished(); 00424 } 00425 else { 00426 this->transport().associateWhenHandlerFinished 00427 (this->makeNewHandlerShared(), "sendFinished"); 00428 } 00429 }; 00430 00431 addActivityS("beforeSend"); 00432 00433 double timeTaken = beforeSend.secondsSince(startTime) * 1000; 00434 00435 response.extraHeaders 00436 .push_back({"X-Processing-Time-Ms", to_string(timeTaken)}); 00437 00438 putResponseOnWire(response, onSendFinished); 00439 } 00440 00441 void 00442 HttpAuctionHandler:: 00443 dropAuction(const std::string & reason) 00444 { 00445 auto onSendFinished = [=] () 00446 { 00447 if (random() % 1000 == 0) { 00448 this->transport().closeWhenHandlerFinished(); 00449 } 00450 else { 00451 this->transport().associateWhenHandlerFinished 00452 (this->makeNewHandlerShared(), "sendFinished"); 00453 } 00454 }; 00455 00456 putResponseOnWire(endpoint->getDroppedAuctionResponse(*this, *auction, reason), 00457 onSendFinished); 00458 } 00459 00460 void 00461 HttpAuctionHandler:: 00462 sendErrorResponse(const std::string & error, 00463 const std::string & details) 00464 { 00465 putResponseOnWire(endpoint->getErrorResponse(*this, *auction, error + ": " + details)); 00466 } 00467 00468 std::string 00469 HttpAuctionHandler:: 00470 status() const 00471 { 00472 if (!hasTransport()) 00473 return "newly minted HttpAuctionHandler"; 00474 00475 string result = format("GenericHttpAuctionHandler: %p readState %d hasTimer %d", 00476 this, readState, hasTimer); 00477 result += "auction: "; 00478 if (auction) result += getResponse().body; 00479 else result += "NULL"; 00480 return result; 00481 } 00482 00483 HttpResponse 00484 HttpAuctionHandler:: 00485 getResponse() const 00486 { 00487 
return endpoint->getResponse(*this, this->header, *auction); 00488 } 00489 00490 std::shared_ptr<BidRequest> 00491 HttpAuctionHandler:: 00492 parseBidRequest(const HttpHeader & header, 00493 const std::string & payload) 00494 { 00495 return endpoint->parseBidRequest(*this, header, payload); 00496 } 00497 00498 double 00499 HttpAuctionHandler:: 00500 getTimeAvailableMs(const HttpHeader & header, 00501 const std::string & payload) 00502 { 00503 return endpoint->getTimeAvailableMs(*this, header, payload); 00504 } 00505 00506 double 00507 HttpAuctionHandler:: 00508 getRoundTripTimeMs(const HttpHeader & header) 00509 { 00510 return endpoint->getRoundTripTimeMs(*this, header); 00511 } 00512 00513 } // namespace RTBKIT
1.7.6.1