RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* bidding_agent.cc -*- C++ -*- 00002 RĂ©mi Attab, 14 December 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Implementation details of the router proxy. 00006 */ 00007 00008 #include "rtbkit/plugins/bidding_agent/bidding_agent.h" 00009 #include "rtbkit/core/agent_configuration/agent_config.h" 00010 00011 #include "jml/arch/exception.h" 00012 #include "jml/arch/timers.h" 00013 #include "jml/utils/vector_utils.h" 00014 #include "jml/utils/exc_check.h" 00015 #include "jml/utils/exc_assert.h" 00016 #include "jml/arch/futex.h" 00017 #include "soa/service/zmq_utils.h" 00018 #include "soa/service/process_stats.h" 00019 00020 #include <boost/lexical_cast.hpp> 00021 #include <boost/algorithm/string.hpp> 00022 #include <iostream> 00023 00024 using namespace std; 00025 using namespace Datacratic; 00026 using namespace RTBKIT; 00027 using namespace ML; 00028 00029 namespace RTBKIT { 00030 00031 00032 /******************************************************************************/ 00033 /* OVERLOADED UTILITIES */ 00034 /******************************************************************************/ 00035 00036 inline void 00037 sendMesg( 00038 zmq::socket_t & sock, 00039 const Id & id, 00040 int options = 0) 00041 { 00042 Datacratic::sendMesg(sock, id.toString(), options); 00043 } 00044 00045 static Json::Value 00046 jsonParse(const std::string & str) 00047 { 00048 if (str.empty()) return Json::Value(); 00049 return Json::parse(str); 00050 } 00051 00052 /******************************************************************************/ 00053 /* ROUTER PROXY */ 00054 /******************************************************************************/ 00055 00056 BiddingAgent:: 00057 BiddingAgent(std::shared_ptr<ServiceProxies> proxies, 00058 const std::string & name) 00059 : ServiceBase(name, proxies), 00060 agentName(name + "_" + to_string(getpid())), 00061 toRouters(getZmqContext()), 00062 toPostAuctionServices(getZmqContext()), 00063 toConfigurationAgent(getZmqContext()), 00064 toRouterChannel(65536), 00065 requiresAllCB(true) 00066 { 00067 } 00068 00069 BiddingAgent:: 00070 BiddingAgent(ServiceBase& parent, 00071 const std::string & name) 00072 : ServiceBase(name, parent), 00073 agentName(name + "_" + to_string(getpid())), 00074 toRouters(getZmqContext()), 00075 toPostAuctionServices(getZmqContext()), 00076 toConfigurationAgent(getZmqContext()), 00077 toRouterChannel(65536), 00078 requiresAllCB(true) 00079 { 00080 } 00081 00082 BiddingAgent:: 00083 ~BiddingAgent() 00084 { 00085 shutdown(); 00086 } 00087 00088 void 00089 BiddingAgent:: 00090 init() 00091 { 00092 auto messageHandler = [=] ( 00093 const string & service, const vector<string>& msg) 00094 { 00095 try { 00096 handleRouterMessage(service, msg); 00097 } 00098 catch (const std::exception& ex) { 00099 recordHit("error"); 00100 cerr << "Error handling auction message " << ex.what() << endl; 00101 for (size_t i = 0; i < msg.size(); ++i) 00102 cerr << "\t" << i << ": " << msg[i] << endl; 00103 cerr << endl; 00104 } 00105 }; 00106 00107 toRouters.messageHandler = messageHandler; 00108 00109 toPostAuctionServices.messageHandler = messageHandler; 00110 toConfigurationAgent.init(getServices()->config, agentName); 00111 toConfigurationAgent.connectToServiceClass 00112 ("rtbAgentConfiguration", "agents"); 00113 00114 toConfigurationAgent.connectHandler = [&] (const std::string&) 00115 { 00116 sendConfig(); 00117 }; 00118 00119 toRouters.init(getServices()->config, agentName); 00120 toRouters.connectHandler = [=] (const std::string & connectedTo) 00121 { 00122 std::stringstream ss; 00123 ss << "BiddingAgent is connected to router " 00124 << connectedTo << endl; 00125 cerr << ss.str() ; 00126 toRouters.sendMessage(connectedTo, "CONFIG", agentName); 00127 }; 00128 toRouters.connectAllServiceProviders("rtbRequestRouter", "agents"); 00129 toRouterChannel.onEvent = [=] (const RouterMessage & msg) 00130 { 00131 toRouters.sendMessage(msg.toRouter, msg.type, msg.payload); 00132 }; 00133 toPostAuctionServices.init(getServices()->config, agentName); 00134 toPostAuctionServices.connectHandler = [=] (const std::string & connectedTo) 00135 { 00136 cerr << "BiddingAgent is connected to post auction service " 00137 << connectedTo << endl; 00138 //toPostAuctionServices.sendMessage(connectedTo, "CONFIG", agentName); 00139 }; 00140 toPostAuctionServices.connectAllServiceProviders("rtbPostAuctionService", 00141 "agents"); 00142 00143 addSource("BiddingAgent::toRouters", toRouters); 00144 addSource("BiddingAgent::toPostAuctionServices", toPostAuctionServices); 00145 addSource("BiddingAgent::toConfigurationAgent", toConfigurationAgent); 00146 addSource("BiddingAgent::toRouterChannel", toRouterChannel); 00147 00148 MessageLoop::init(); 00149 } 00150 00151 void 00152 BiddingAgent:: 00153 shutdown() 00154 { 00155 MessageLoop::shutdown(); 00156 00157 toConfigurationAgent.shutdown(); 00158 toRouters.shutdown(); 00159 //toPostAuctionService.shutdown(); 00160 } 00161 00162 void 00163 BiddingAgent:: 00164 handleRouterMessage(const std::string & fromRouter, 00165 const std::vector<std::string> & message) 00166 { 00167 if (message.empty()) { 00168 cerr << "invalid empty message received" << endl; 00169 recordHit("errorEmptyMessage"); 00170 return; 00171 } 00172 recordHit(message[0]); 00173 if (message[0].empty()) { 00174 cerr << "invalid message with empty type received" << endl; 00175 recordHit("errorEmptyMessageType"); 00176 return; 00177 } 00178 00179 bool invalid = false; 00180 00181 switch (message[0][0]) { 00182 case 'A': 00183 if (message[0] == "AUCTION") 00184 handleBidRequest(fromRouter, message, onBidRequest); 00185 else invalid = true; 00186 break; 00187 00188 case 'W': 00189 if (message[0] == "WIN") 00190 handleResult(message, onWin); 00191 else invalid = true; 00192 break; 00193 00194 case 'L': 00195 if (message[0] == "LOSS") 00196 handleResult(message, onLoss); 00197 else invalid = true; 00198 break; 00199 00200 case 'N': 00201 if (message[0] == "NOBUDGET") 00202 handleResult(message, onNoBudget); 00203 else if (message[0] == "NEEDCONFIG") sendConfig(); 00204 else invalid = true; 00205 break; 00206 00207 case 'T': 00208 if (message[0] == "TOOLATE") 00209 handleResult(message, onTooLate); 00210 else invalid = true; 00211 break; 00212 00213 case 'I': 00214 if (message[0] == "INVALID") 00215 handleResult(message, onInvalidBid); 00216 else if (message[0] == "IMPRESSION") 00217 handleDelivery(message, onImpression); 00218 else invalid = true; 00219 break; 00220 00221 case 'D': 00222 if (message[0] == "DROPPEDBID") 00223 handleResult(message, onDroppedBid); 00224 else invalid = true; 00225 break; 00226 00227 case 'G': 00228 if (message[0] == "GOTCONFIG") { /* no-op */ } 00229 else invalid = true; 00230 break; 00231 00232 case 'E': 00233 if (message[0] == "ERROR") 00234 handleError(message, onError); 00235 else invalid = true; 00236 break; 00237 00238 case 'B': 00239 if (message[0] == "BYEBYE") { /*no-op*/ } 00240 else invalid = true; 00241 break; 00242 00243 case 'C': 00244 if (message[0] == "CLICK") 00245 handleDelivery(message, onClick); 00246 else invalid = true; 00247 break; 00248 00249 case 'V': 00250 if (message[0] == "VISIT") 00251 handleDelivery(message, onVisit); 00252 else invalid = true; 00253 break; 00254 00255 case 'P': 00256 if (message[0] == "PING0") { 00257 //cerr << "ping0: message " << message << endl; 00258 00259 // Low-level ping (to measure network/message queue backlog); 00260 // we return straight away 00261 auto message_ = message; 00262 string received = message.at(1); 00263 message_.erase(message_.begin(), message_.begin() + 2); 00264 toRouters.sendMessage(fromRouter, "PONG0", received, Date::now(), message_); 00265 } 00266 else if (message[0] == "PING1") { 00267 // High-level ping (to measure whole stack backlog); 00268 // we pass through to the agent to process so we can measure 00269 // any backlog in the agent itself 00270 handlePing(fromRouter, message, onPing); 00271 } 00272 else invalid = true; 00273 break; 00274 00275 default: 00276 invalid = true; 00277 break; 00278 } 00279 00280 if (invalid) { 00281 recordHit("errorUnknownMessage"); 00282 cerr << "Unknown message: {"; 00283 for_each(message.begin(), message.end(), [&](const string& m) { 00284 cerr << m << ", "; 00285 }); 00286 cerr << "}" << endl; 00287 } 00288 } 00289 00290 namespace { 00291 00295 static string 00296 eventName(const string& name) 00297 { 00298 switch(name[0]) { 00299 case 'C': 00300 if (name == "CLICK") return "clicks"; 00301 break; 00302 00303 case 'D': 00304 if (name == "DROPPEDBID") return "droppedbids"; 00305 break; 00306 00307 case 'E': 00308 if (name == "ERROR") return "errors"; 00309 break; 00310 00311 case 'I': 00312 if (name == "INVALIDBID") return "invalidbids"; 00313 if (name == "IMPRESSION") return "impressions"; 00314 break; 00315 00316 case 'L': 00317 if (name == "LOSS") return "losses"; 00318 break; 00319 00320 case 'N': 00321 if (name == "NOBUDGET") return "nobudgets"; 00322 break; 00323 00324 case 'P': 00325 if (name == "PING1") return "ping"; 00326 break; 00327 00328 case 'T': 00329 if (name == "TOOLATE") return "toolate"; 00330 break; 00331 00332 case 'V': 00333 if (name == "VISIT") return "visits"; 00334 break; 00335 00336 case 'W': 00337 if (name == "WIN") return "wins"; 00338 break; 00339 } 00340 00341 ExcAssert(false); 00342 return "unknown"; 00343 } 00344 00345 } // anonymous namespace 00346 00347 00348 void 00349 BiddingAgent:: 00350 checkMessageSize(const std::vector<std::string>& msg, int expectedSize) 00351 { 00352 if (msg.size() >= expectedSize) 00353 return; 00354 00355 string msgStr = boost::lexical_cast<string>(msg); 00356 throw ML::Exception("Message of wrong size: size=%d, expected=%d, msg=%s", 00357 msg.size(), expectedSize, msgStr.c_str()); 00358 } 00359 00360 void 00361 BiddingAgent:: 00362 handleBidRequest(const std::string & fromRouter, 00363 const std::vector<std::string>& msg, BidRequestCbFn& callback) 00364 { 00365 ExcCheck(!requiresAllCB || callback, "Null callback for " + msg[0]); 00366 if (!callback) return; 00367 00368 checkMessageSize(msg, 8); 00369 00370 double timestamp = boost::lexical_cast<double>(msg[1]); 00371 Id id(msg[2]); 00372 00373 string bidRequestSource = msg[3]; 00374 00375 std::shared_ptr<BidRequest> br( 00376 BidRequest::parse(bidRequestSource, msg[4])); 00377 00378 Json::Value imp = jsonParse(msg[5]); 00379 double timeLeftMs = boost::lexical_cast<double>(msg[6]); 00380 Json::Value augmentations = jsonParse(msg[7]); 00381 00382 Bids bids; 00383 bids.reserve(imp.size()); 00384 00385 for (size_t i = 0; i < imp.size(); ++i) { 00386 Bid bid; 00387 00388 bid.spotIndex = imp[i]["spot"].asInt(); 00389 for (const auto& creative : imp[i]["creatives"]) 00390 bid.availableCreatives.push_back(creative.asInt()); 00391 00392 bids.push_back(bid); 00393 } 00394 00395 00396 recordHit("requests"); 00397 00398 ExcCheck(!requests.count(id), "seen multiple requests with same ID"); 00399 { 00400 lock_guard<mutex> guard (requestsLock); 00401 00402 requests[id].timestamp = Date::now(); 00403 requests[id].fromRouter = fromRouter; 00404 } 00405 00406 callback(timestamp, id, br, bids, timeLeftMs, augmentations); 00407 } 00408 00409 void 00410 BiddingAgent:: 00411 handleResult(const std::vector<std::string>& msg, ResultCbFn& callback) 00412 { 00413 ExcCheck(!requiresAllCB || callback, "Null callback for " + msg[0]); 00414 if (!callback) return; 00415 00416 checkMessageSize(msg, 6); 00417 00418 recordHit(eventName(msg[0])); 00419 BidResult result = BidResult::parse(msg); 00420 00421 if (result.result == BS_WIN) 00422 recordLevel(MicroUSD(result.secondPrice), "winPrice"); 00423 00424 callback(result); 00425 00426 if (result.result == BS_DROPPEDBID) { 00427 lock_guard<mutex> guard (requestsLock); 00428 requests.erase(Id(msg[3])); 00429 } 00430 } 00431 00432 void 00433 BiddingAgent:: 00434 handleError(const std::vector<std::string>& msg, ErrorCbFn& callback) 00435 { 00436 ExcCheck(!requiresAllCB || callback, "Null callback for " + msg[0]); 00437 if (!callback) return; 00438 00439 double timestamp = boost::lexical_cast<double>(msg[1]); 00440 string description = msg[2]; 00441 00442 vector<string> originalMessage; 00443 copy(msg.begin()+2, msg.end(), 00444 back_insert_iterator< vector<string> >(originalMessage)); 00445 00446 callback(timestamp, description, originalMessage); 00447 } 00448 00449 void 00450 BiddingAgent:: 00451 handleDelivery(const std::vector<std::string>& msg, DeliveryCbFn& callback) 00452 { 00453 ExcCheck(!requiresAllCB || callback, "Null callback for " + msg[0]); 00454 if (!callback) return; 00455 00456 checkMessageSize(msg, 12); 00457 00458 DeliveryEvent ev = DeliveryEvent::parse(msg); 00459 recordHit(eventName(ev.event)); 00460 00461 callback(ev); 00462 } 00463 00464 void 00465 BiddingAgent:: 00466 doBid(Id id, const Bids & bids, const Json::Value & jsonMeta) 00467 { 00468 Json::FastWriter jsonWriter; 00469 00470 string response = jsonWriter.write(bids.toJson()); 00471 boost::trim(response); 00472 00473 string meta = jsonWriter.write(jsonMeta); 00474 boost::trim(meta); 00475 00476 Date afterSend = Date::now(); 00477 Date beforeSend; 00478 string fromRouter; 00479 { 00480 lock_guard<mutex> guard (requestsLock); 00481 00482 auto it = requests.find(id); 00483 if (it != requests.end()) { 00484 beforeSend = it->second.timestamp; 00485 fromRouter = it->second.fromRouter; 00486 requests.erase(it); 00487 } 00488 } 00489 if (fromRouter.empty()) return; 00490 00491 recordLevel((afterSend - beforeSend) * 1000.0, "timeTakenMs"); 00492 00493 toRouterChannel.push(RouterMessage( 00494 fromRouter, "BID", { id.toString(), response, meta })); 00495 00497 for (const Bid& bid : bids) { 00498 if (bid.isNullBid()) recordHit("filtered.total"); 00499 else { 00500 recordHit("bids"); 00501 recordLevel(bid.price.value, "bidPrice." + bid.price.getCurrencyStr()); 00502 } 00503 } 00504 } 00505 00506 void 00507 BiddingAgent:: 00508 handlePing(const std::string & fromRouter, 00509 const std::vector<std::string> & msg, 00510 PingCbFn& callback) 00511 { 00512 recordHit(eventName(msg[0])); 00513 00514 Date started = Date::parseSecondsSinceEpoch(msg.at(1)); 00515 vector<string> payload(msg.begin() + 2, msg.end()); 00516 00517 if (callback) 00518 callback(fromRouter, started, payload); 00519 else 00520 doPong(fromRouter, started, Date::now(), payload); 00521 } 00522 00523 void 00524 BiddingAgent:: 00525 doPong(const std::string & fromRouter, Date sent, Date received, 00526 const std::vector<std::string> & payload) 00527 { 00528 //cerr << "doPong with payload " << payload << " sent " << sent 00529 // << " received " << received << endl; 00530 00531 vector<string> message = { 00532 to_string(sent.secondsSinceEpoch()), 00533 to_string(received.secondsSinceEpoch()) 00534 }; 00535 00536 message.insert(message.end(), payload.begin(), payload.end()); 00537 toRouterChannel.push(RouterMessage(fromRouter, "PONG1", message)); 00538 } 00539 00540 void 00541 BiddingAgent:: 00542 doConfig(const AgentConfig& config) 00543 { 00544 doConfigJson(config.toJson()); 00545 } 00546 00547 void 00548 BiddingAgent:: 00549 doConfigJson(Json::Value jsonConfig) 00550 { 00551 Json::FastWriter jsonWriter; 00552 00553 std::string newConfig = jsonWriter.write(jsonConfig); 00554 boost::trim(newConfig); 00555 00556 sendConfig(newConfig); 00557 } 00558 00559 void 00560 BiddingAgent:: 00561 sendConfig(const std::string& newConfig) 00562 { 00563 std::lock_guard<std::mutex> guard(configLock); 00564 00565 if (!newConfig.empty()) config = newConfig; 00566 if (config.empty()) return; 00567 00568 toConfigurationAgent.sendMessage("CONFIG", agentName, config); 00569 } 00570 00571 } // namespace RTBKIT