RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
plugins/bidding_agent/bidding_agent.cc
00001 /* bidding_agent.cc                                                   -*- C++ -*-
00002    RĂ©mi Attab, 14 December 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Implementation details of the router proxy.
00006 */
00007 
00008 #include "rtbkit/plugins/bidding_agent/bidding_agent.h"
00009 #include "rtbkit/core/agent_configuration/agent_config.h"
00010 
00011 #include "jml/arch/exception.h"
00012 #include "jml/arch/timers.h"
00013 #include "jml/utils/vector_utils.h"
00014 #include "jml/utils/exc_check.h"
00015 #include "jml/utils/exc_assert.h"
00016 #include "jml/arch/futex.h"
00017 #include "soa/service/zmq_utils.h"
00018 #include "soa/service/process_stats.h"
00019 
00020 #include <boost/lexical_cast.hpp>
00021 #include <boost/algorithm/string.hpp>
00022 #include <iostream>
00023 
00024 using namespace std;
00025 using namespace Datacratic;
00026 using namespace RTBKIT;
00027 using namespace ML;
00028 
00029 namespace RTBKIT {
00030 
00031 
00032 /******************************************************************************/
00033 /* OVERLOADED UTILITIES                                                       */
00034 /******************************************************************************/
00035 
00036 inline void
00037 sendMesg(
00038         zmq::socket_t & sock,
00039         const Id & id,
00040         int options = 0)
00041 {
00042     Datacratic::sendMesg(sock, id.toString(), options);
00043 }
00044 
00045 static Json::Value
00046 jsonParse(const std::string & str)
00047 {
00048     if (str.empty()) return Json::Value();
00049     return Json::parse(str);
00050 }
00051 
00052 /******************************************************************************/
00053 /* ROUTER PROXY                                                               */
00054 /******************************************************************************/
00055 
00056 BiddingAgent::
00057 BiddingAgent(std::shared_ptr<ServiceProxies> proxies,
00058             const std::string & name)
00059     : ServiceBase(name, proxies),
00060       agentName(name + "_" + to_string(getpid())),
00061       toRouters(getZmqContext()),
00062       toPostAuctionServices(getZmqContext()),
00063       toConfigurationAgent(getZmqContext()),
00064       toRouterChannel(65536),
00065       requiresAllCB(true)
00066 {
00067 }
00068 
00069 BiddingAgent::
00070 BiddingAgent(ServiceBase& parent,
00071             const std::string & name)
00072     : ServiceBase(name, parent),
00073       agentName(name + "_" + to_string(getpid())),
00074       toRouters(getZmqContext()),
00075       toPostAuctionServices(getZmqContext()),
00076       toConfigurationAgent(getZmqContext()),
00077       toRouterChannel(65536),
00078       requiresAllCB(true)
00079 {
00080 }
00081 
00082 BiddingAgent::
00083 ~BiddingAgent()
00084 {
00085     shutdown();
00086 }
00087 
00088 void
00089 BiddingAgent::
00090 init()
00091 {
00092     auto messageHandler = [=] (
00093             const string & service, const vector<string>& msg)
00094         {
00095             try {
00096                 handleRouterMessage(service, msg);
00097             }
00098             catch (const std::exception& ex) {
00099                 recordHit("error");
00100                 cerr << "Error handling auction message " << ex.what() << endl;
00101                 for (size_t i = 0; i < msg.size(); ++i)
00102                     cerr << "\t" << i << ": " << msg[i] << endl;
00103                 cerr << endl;
00104             }
00105         };
00106 
00107     toRouters.messageHandler = messageHandler;
00108 
00109     toPostAuctionServices.messageHandler = messageHandler;
00110     toConfigurationAgent.init(getServices()->config, agentName);
00111     toConfigurationAgent.connectToServiceClass
00112             ("rtbAgentConfiguration", "agents");
00113 
00114     toConfigurationAgent.connectHandler = [&] (const std::string&)
00115         {
00116             sendConfig();
00117         };
00118 
00119     toRouters.init(getServices()->config, agentName);
00120     toRouters.connectHandler = [=] (const std::string & connectedTo)
00121         {
00122             std::stringstream ss;
00123             ss << "BiddingAgent is connected to router "
00124                  << connectedTo << endl;
00125             cerr << ss.str() ;
00126             toRouters.sendMessage(connectedTo, "CONFIG", agentName);
00127         };
00128     toRouters.connectAllServiceProviders("rtbRequestRouter", "agents");
00129     toRouterChannel.onEvent = [=] (const RouterMessage & msg)
00130         {
00131             toRouters.sendMessage(msg.toRouter, msg.type, msg.payload);
00132         };
00133     toPostAuctionServices.init(getServices()->config, agentName);
00134     toPostAuctionServices.connectHandler = [=] (const std::string & connectedTo)
00135         {
00136             cerr << "BiddingAgent is connected to post auction service "
00137                  << connectedTo << endl;
00138             //toPostAuctionServices.sendMessage(connectedTo, "CONFIG", agentName);
00139         };
00140     toPostAuctionServices.connectAllServiceProviders("rtbPostAuctionService",
00141                                                      "agents");
00142 
00143     addSource("BiddingAgent::toRouters", toRouters);
00144     addSource("BiddingAgent::toPostAuctionServices", toPostAuctionServices);
00145     addSource("BiddingAgent::toConfigurationAgent", toConfigurationAgent);
00146     addSource("BiddingAgent::toRouterChannel", toRouterChannel);
00147 
00148     MessageLoop::init();
00149 }
00150 
00151 void
00152 BiddingAgent::
00153 shutdown()
00154 {
00155     MessageLoop::shutdown();
00156 
00157     toConfigurationAgent.shutdown();
00158     toRouters.shutdown();
00159     //toPostAuctionService.shutdown();
00160 }
00161 
00162 void
00163 BiddingAgent::
00164 handleRouterMessage(const std::string & fromRouter,
00165                     const std::vector<std::string> & message)
00166 {
00167     if (message.empty()) {
00168         cerr << "invalid empty message received" << endl;
00169         recordHit("errorEmptyMessage");
00170         return;
00171     }
00172     recordHit(message[0]);
00173     if (message[0].empty()) {
00174         cerr << "invalid message with empty type received" << endl;
00175         recordHit("errorEmptyMessageType");
00176         return;
00177     }
00178 
00179     bool invalid = false;
00180 
00181     switch (message[0][0]) {
00182     case 'A':
00183         if (message[0] == "AUCTION")
00184             handleBidRequest(fromRouter, message, onBidRequest);
00185         else invalid = true;
00186         break;
00187 
00188     case 'W':
00189         if (message[0] == "WIN")
00190             handleResult(message, onWin);
00191         else invalid = true;
00192         break;
00193 
00194     case 'L':
00195         if (message[0] == "LOSS")
00196             handleResult(message, onLoss);
00197         else invalid = true;
00198         break;
00199 
00200     case 'N':
00201         if (message[0] == "NOBUDGET")
00202             handleResult(message, onNoBudget);
00203         else if (message[0] == "NEEDCONFIG") sendConfig();
00204         else invalid = true;
00205         break;
00206 
00207     case 'T':
00208         if (message[0] == "TOOLATE")
00209             handleResult(message, onTooLate);
00210         else invalid = true;
00211         break;
00212 
00213     case 'I':
00214         if (message[0] == "INVALID")
00215             handleResult(message, onInvalidBid);
00216         else if (message[0] == "IMPRESSION")
00217             handleDelivery(message, onImpression);
00218         else invalid = true;
00219         break;
00220 
00221     case 'D':
00222         if (message[0] == "DROPPEDBID")
00223             handleResult(message, onDroppedBid);
00224         else invalid = true;
00225         break;
00226 
00227     case 'G':
00228         if (message[0] == "GOTCONFIG") { /* no-op */ }
00229         else invalid = true;
00230         break;
00231 
00232     case 'E':
00233         if (message[0] == "ERROR")
00234             handleError(message, onError);
00235         else invalid = true;
00236         break;
00237 
00238     case 'B':
00239         if (message[0] == "BYEBYE")   { /*no-op*/ }
00240         else invalid = true;
00241         break;
00242 
00243     case 'C':
00244         if (message[0] == "CLICK")
00245             handleDelivery(message, onClick);
00246         else invalid = true;
00247         break;
00248 
00249     case 'V':
00250         if (message[0] == "VISIT")
00251             handleDelivery(message, onVisit);
00252         else invalid = true;
00253         break;
00254 
00255     case 'P':
00256         if (message[0] == "PING0") {
00257             //cerr << "ping0: message " << message << endl;
00258 
00259             // Low-level ping (to measure network/message queue backlog);
00260             // we return straight away
00261             auto message_ = message;
00262             string received = message.at(1);
00263             message_.erase(message_.begin(), message_.begin() + 2);
00264             toRouters.sendMessage(fromRouter, "PONG0", received, Date::now(), message_);
00265         }
00266         else if (message[0] == "PING1") {
00267             // High-level ping (to measure whole stack backlog);
00268             // we pass through to the agent to process so we can measure
00269             // any backlog in the agent itself
00270             handlePing(fromRouter, message, onPing);
00271         }
00272         else invalid = true;
00273         break;
00274 
00275     default:
00276         invalid = true;
00277         break;
00278     }
00279 
00280     if (invalid) {
00281         recordHit("errorUnknownMessage");
00282         cerr << "Unknown message: {";
00283         for_each(message.begin(), message.end(), [&](const string& m) {
00284                     cerr << m << ", ";
00285                 });
00286         cerr << "}" << endl;
00287     }
00288 }
00289 
00290 namespace {
00291 
00295 static string
00296 eventName(const string& name)
00297 {
00298     switch(name[0]) {
00299     case 'C':
00300         if (name == "CLICK") return "clicks";
00301         break;
00302 
00303     case 'D':
00304         if (name == "DROPPEDBID") return "droppedbids";
00305         break;
00306 
00307     case 'E':
00308         if (name == "ERROR") return "errors";
00309         break;
00310 
00311     case 'I':
00312         if (name == "INVALIDBID") return "invalidbids";
00313         if (name == "IMPRESSION") return "impressions";
00314         break;
00315 
00316     case 'L':
00317         if (name == "LOSS") return "losses";
00318         break;
00319 
00320     case 'N':
00321         if (name == "NOBUDGET") return "nobudgets";
00322         break;
00323 
00324     case 'P':
00325         if (name == "PING1") return "ping";
00326         break;
00327 
00328     case 'T':
00329         if (name == "TOOLATE") return "toolate";
00330         break;
00331 
00332     case 'V':
00333         if (name == "VISIT") return "visits";
00334         break;
00335 
00336     case 'W':
00337         if (name == "WIN") return "wins";
00338         break;
00339     }
00340 
00341     ExcAssert(false);
00342     return "unknown";
00343 }
00344 
00345 } // anonymous namespace
00346 
00347 
00348 void
00349 BiddingAgent::
00350 checkMessageSize(const std::vector<std::string>& msg, int expectedSize)
00351 {
00352     if (msg.size() >= expectedSize)
00353         return;
00354 
00355     string msgStr = boost::lexical_cast<string>(msg);
00356     throw ML::Exception("Message of wrong size: size=%d, expected=%d, msg=%s",
00357             msg.size(), expectedSize, msgStr.c_str());
00358 }
00359 
00360 void
00361 BiddingAgent::
00362 handleBidRequest(const std::string & fromRouter,
00363                  const std::vector<std::string>& msg, BidRequestCbFn& callback)
00364 {
00365     ExcCheck(!requiresAllCB || callback, "Null callback for " + msg[0]);
00366     if (!callback) return;
00367 
00368     checkMessageSize(msg, 8);
00369 
00370     double timestamp = boost::lexical_cast<double>(msg[1]);
00371     Id id(msg[2]);
00372 
00373     string bidRequestSource = msg[3];
00374 
00375     std::shared_ptr<BidRequest> br(
00376             BidRequest::parse(bidRequestSource, msg[4]));
00377 
00378     Json::Value imp = jsonParse(msg[5]);
00379     double timeLeftMs = boost::lexical_cast<double>(msg[6]);
00380     Json::Value augmentations = jsonParse(msg[7]);
00381 
00382     Bids bids;
00383     bids.reserve(imp.size());
00384 
00385     for (size_t i = 0; i < imp.size(); ++i) {
00386         Bid bid;
00387 
00388         bid.spotIndex = imp[i]["spot"].asInt();
00389         for (const auto& creative : imp[i]["creatives"])
00390             bid.availableCreatives.push_back(creative.asInt());
00391 
00392         bids.push_back(bid);
00393     }
00394 
00395 
00396     recordHit("requests");
00397 
00398     ExcCheck(!requests.count(id), "seen multiple requests with same ID");
00399     {
00400         lock_guard<mutex> guard (requestsLock);
00401 
00402         requests[id].timestamp = Date::now();
00403         requests[id].fromRouter = fromRouter;
00404     }
00405 
00406     callback(timestamp, id, br, bids, timeLeftMs, augmentations);
00407 }
00408 
00409 void
00410 BiddingAgent::
00411 handleResult(const std::vector<std::string>& msg, ResultCbFn& callback)
00412 {
00413     ExcCheck(!requiresAllCB || callback, "Null callback for " + msg[0]);
00414     if (!callback) return;
00415 
00416     checkMessageSize(msg, 6);
00417 
00418     recordHit(eventName(msg[0]));
00419     BidResult result = BidResult::parse(msg);
00420 
00421     if (result.result == BS_WIN)
00422         recordLevel(MicroUSD(result.secondPrice), "winPrice");
00423 
00424     callback(result);
00425 
00426     if (result.result == BS_DROPPEDBID) {
00427         lock_guard<mutex> guard (requestsLock);
00428         requests.erase(Id(msg[3]));
00429     }
00430 }
00431 
00432 void
00433 BiddingAgent::
00434 handleError(const std::vector<std::string>& msg, ErrorCbFn& callback)
00435 {
00436     ExcCheck(!requiresAllCB || callback, "Null callback for " + msg[0]);
00437     if (!callback) return;
00438 
00439     double timestamp = boost::lexical_cast<double>(msg[1]);
00440     string description = msg[2];
00441 
00442     vector<string> originalMessage;
00443     copy(msg.begin()+2, msg.end(),
00444             back_insert_iterator< vector<string> >(originalMessage));
00445 
00446     callback(timestamp, description, originalMessage);
00447 }
00448 
00449 void
00450 BiddingAgent::
00451 handleDelivery(const std::vector<std::string>& msg, DeliveryCbFn& callback)
00452 {
00453     ExcCheck(!requiresAllCB || callback, "Null callback for " + msg[0]);
00454     if (!callback) return;
00455 
00456     checkMessageSize(msg, 12);
00457 
00458     DeliveryEvent ev = DeliveryEvent::parse(msg);
00459     recordHit(eventName(ev.event));
00460 
00461     callback(ev);
00462 }
00463 
00464 void
00465 BiddingAgent::
00466 doBid(Id id, const Bids & bids, const Json::Value & jsonMeta)
00467 {
00468     Json::FastWriter jsonWriter;
00469 
00470     string response = jsonWriter.write(bids.toJson());
00471     boost::trim(response);
00472 
00473     string meta = jsonWriter.write(jsonMeta);
00474     boost::trim(meta);
00475 
00476     Date afterSend = Date::now();
00477     Date beforeSend;
00478     string fromRouter;
00479     {
00480         lock_guard<mutex> guard (requestsLock);
00481 
00482         auto it = requests.find(id);
00483         if (it != requests.end()) {
00484             beforeSend = it->second.timestamp;
00485             fromRouter = it->second.fromRouter;
00486             requests.erase(it);
00487         }
00488     }
00489     if (fromRouter.empty()) return;
00490 
00491     recordLevel((afterSend - beforeSend) * 1000.0, "timeTakenMs");
00492 
00493     toRouterChannel.push(RouterMessage(
00494                     fromRouter, "BID", { id.toString(), response, meta }));
00495 
00497     for (const Bid& bid : bids) {
00498         if (bid.isNullBid()) recordHit("filtered.total");
00499         else {
00500             recordHit("bids");
00501             recordLevel(bid.price.value, "bidPrice." + bid.price.getCurrencyStr());
00502         }
00503     }
00504 }
00505 
00506 void
00507 BiddingAgent::
00508 handlePing(const std::string & fromRouter,
00509            const std::vector<std::string> & msg,
00510            PingCbFn& callback)
00511 {
00512     recordHit(eventName(msg[0]));
00513 
00514     Date started = Date::parseSecondsSinceEpoch(msg.at(1));
00515     vector<string> payload(msg.begin() + 2, msg.end());
00516 
00517     if (callback)
00518         callback(fromRouter, started, payload);
00519     else
00520         doPong(fromRouter, started, Date::now(), payload);
00521 }
00522 
00523 void
00524 BiddingAgent::
00525 doPong(const std::string & fromRouter, Date sent, Date received,
00526        const std::vector<std::string> & payload)
00527 {
00528     //cerr << "doPong with payload " << payload << " sent " << sent
00529     //     << " received " << received << endl;
00530 
00531     vector<string> message = {
00532         to_string(sent.secondsSinceEpoch()),
00533         to_string(received.secondsSinceEpoch())
00534     };
00535 
00536     message.insert(message.end(), payload.begin(), payload.end());
00537     toRouterChannel.push(RouterMessage(fromRouter, "PONG1", message));
00538 }
00539 
00540 void
00541 BiddingAgent::
00542 doConfig(const AgentConfig& config)
00543 {
00544     doConfigJson(config.toJson());
00545 }
00546 
00547 void
00548 BiddingAgent::
00549 doConfigJson(Json::Value jsonConfig)
00550 {
00551     Json::FastWriter jsonWriter;
00552 
00553     std::string newConfig = jsonWriter.write(jsonConfig);
00554     boost::trim(newConfig);
00555 
00556     sendConfig(newConfig);
00557 }
00558 
00559 void
00560 BiddingAgent::
00561 sendConfig(const std::string& newConfig)
00562 {
00563     std::lock_guard<std::mutex> guard(configLock);
00564 
00565     if (!newConfig.empty()) config = newConfig;
00566     if (config.empty()) return;
00567 
00568     toConfigurationAgent.sendMessage("CONFIG", agentName, config);
00569 }
00570 
00571 } // namespace RTBKIT
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator