RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* http_exchange_connector.cc 00002 Jeremy Barnes, 31 January 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Auction endpoint class. 00006 */ 00007 00008 #include "http_exchange_connector.h" 00009 #include "http_auction_handler.h" 00010 #include "jml/arch/exception.h" 00011 #include "jml/arch/format.h" 00012 #include "jml/arch/backtrace.h" 00013 #include "jml/utils/guard.h" 00014 #include "jml/utils/set_utils.h" 00015 #include "jml/utils/vector_utils.h" 00016 #include "jml/arch/timers.h" 00017 #include "rtbkit/core/router/router.h" 00018 #include <set> 00019 00020 #include <boost/foreach.hpp> 00021 00022 00023 using namespace std; 00024 using namespace ML; 00025 00026 00027 namespace RTBKIT { 00028 00029 00030 /*****************************************************************************/ 00031 /* HTTP EXCHANGE CONNECTOR */ 00032 /*****************************************************************************/ 00033 00034 HttpExchangeConnector:: 00035 HttpExchangeConnector(const std::string & name, 00036 ServiceBase & parent) 00037 : ExchangeConnector(name, parent), 00038 HttpEndpoint(name) 00039 { 00040 postConstructorInit(); 00041 } 00042 00043 HttpExchangeConnector:: 00044 HttpExchangeConnector(const std::string & name, 00045 std::shared_ptr<ServiceProxies> proxies) 00046 : ExchangeConnector(name, proxies), 00047 HttpEndpoint(name) 00048 { 00049 postConstructorInit(); 00050 } 00051 00052 void 00053 HttpExchangeConnector:: 00054 postConstructorInit() 00055 { 00056 numThreads = 8; 00057 listenPort = 10001; 00058 bindHost = "*"; 00059 performNameLookup = true; 00060 backlog = DEF_BACKLOG; 00061 pingTimeUnknownHostsMs = 20; 00062 00063 numServingRequest_ = 0; 00064 acceptAuctionProbability = 1.0; 00065 00066 // Link up events 00067 onTransportOpen = [=] (TransportBase *) 00068 { 00069 this->recordHit("auctionNewConnection"); 00070 }; 00071 00072 onTransportClose = [=] (TransportBase *) 00073 { 00074 this->recordHit("auctionClosedConnection"); 00075 }; 00076 00077 handlerFactory = [=] () { return new HttpAuctionHandler(); }; 00078 } 00079 00080 HttpExchangeConnector:: 00081 ~HttpExchangeConnector() 00082 { 00083 shutdown(); 00084 } 00085 00086 void 00087 HttpExchangeConnector:: 00088 configure(const Json::Value & parameters) 00089 { 00090 getParam(parameters, numThreads, "numThreads"); 00091 getParam(parameters, listenPort, "listenPort"); 00092 getParam(parameters, bindHost, "bindHost"); 00093 getParam(parameters, performNameLookup, "performNameLookup"); 00094 getParam(parameters, backlog, "connectionBacklog"); 00095 getParam(parameters, auctionResource, "auctionResource"); 00096 getParam(parameters, auctionVerb, "auctionVerb"); 00097 getParam(parameters, pingTimesByHostMs, "pingTimesByHostMs"); 00098 getParam(parameters, pingTimeUnknownHostsMs, "pingTimeUnknownHostsMs"); 00099 } 00100 00101 void 00102 HttpExchangeConnector:: 00103 configureHttp(int numThreads, 00104 const PortRange & listenPort, 00105 const std::string & bindHost, 00106 bool performNameLookup, 00107 int backlog, 00108 const std::string & auctionResource, 00109 const std::string & auctionVerb) 00110 { 00111 this->numThreads = numThreads; 00112 this->listenPort = listenPort; 00113 this->bindHost = bindHost; 00114 this->performNameLookup = performNameLookup; 00115 this->backlog = backlog; 00116 this->auctionResource = auctionResource; 00117 this->auctionVerb = auctionVerb; 00118 } 00119 00120 void 00121 HttpExchangeConnector:: 00122 start() 00123 { 00124 PassiveEndpoint::init(listenPort, bindHost, numThreads, true, 00125 performNameLookup, backlog); 00126 } 00127 00128 void 00129 HttpExchangeConnector:: 00130 shutdown() 00131 { 00132 HttpEndpoint::shutdown(); 00133 ExchangeConnector::shutdown(); 00134 } 00135 00136 std::shared_ptr<ConnectionHandler> 00137 HttpExchangeConnector:: 00138 makeNewHandler() 00139 { 00140 return makeNewHandlerShared(); 00141 } 00142 00143 std::shared_ptr<HttpAuctionHandler> 00144 HttpExchangeConnector:: 00145 makeNewHandlerShared() 00146 { 00147 if (!handlerFactory) 00148 throw ML::Exception("need to initialize handler factory"); 00149 00150 HttpAuctionHandler * handler = handlerFactory(); 00151 std::shared_ptr<HttpAuctionHandler> handlerSp(handler); 00152 { 00153 Guard guard(handlersLock); 00154 handlers.insert(handlerSp); 00155 } 00156 00157 return handlerSp; 00158 } 00159 00160 void 00161 HttpExchangeConnector:: 00162 finishedWithHandler(std::shared_ptr<HttpAuctionHandler> handler) 00163 { 00164 Guard guard(handlersLock); 00165 handlers.erase(handler); 00166 } 00167 00168 Json::Value 00169 HttpExchangeConnector:: 00170 getServiceStatus() const 00171 { 00172 Json::Value result; 00173 00174 result["numConnections"] = numConnections(); 00175 result["activeConnections"] = numServingRequest(); 00176 result["connectionLoadFactor"] 00177 = xdiv<float>(numServingRequest(), 00178 numConnections()); 00179 00180 map<string, int> peerCounts = numConnectionsByHost(); 00181 00182 BOOST_FOREACH(auto cnt, peerCounts) 00183 result["hostConnections"][cnt.first] = cnt.second; 00184 00185 return result; 00186 } 00187 00188 std::shared_ptr<BidRequest> 00189 HttpExchangeConnector:: 00190 parseBidRequest(HttpAuctionHandler & connection, 00191 const HttpHeader & header, 00192 const std::string & payload) 00193 { 00194 throw ML::Exception("need to override HttpExchangeConnector::parseBidRequest"); 00195 } 00196 00197 double 00198 HttpExchangeConnector:: 00199 getTimeAvailableMs(HttpAuctionHandler & connection, 00200 const HttpHeader & header, 00201 const std::string & payload) 00202 { 00203 throw ML::Exception("need to override HttpExchangeConnector::getTimeAvailableMs"); 00204 } 00205 00206 double 00207 HttpExchangeConnector:: 00208 getRoundTripTimeMs(HttpAuctionHandler & connection, 00209 const HttpHeader & header) 00210 { 00211 string peerName = connection.transport().getPeerName(); 00212 00213 auto it = pingTimesByHostMs.find(peerName); 00214 if (it == pingTimesByHostMs.end()) 00215 return pingTimeUnknownHostsMs; 00216 return it->second; 00217 } 00218 00219 HttpResponse 00220 HttpExchangeConnector:: 00221 getResponse(const HttpAuctionHandler & connection, 00222 const HttpHeader & requestHeader, 00223 const Auction & auction) const 00224 { 00225 throw ML::Exception("need to override HttpExchangeConnector::getResponse"); 00226 } 00227 00228 HttpResponse 00229 HttpExchangeConnector:: 00230 getDroppedAuctionResponse(const HttpAuctionHandler & connection, 00231 const Auction & auction, 00232 const std::string & reason) const 00233 { 00234 // Default for when dropped auction == no bid 00235 return getResponse(connection, connection.header, auction); 00236 } 00237 00238 HttpResponse 00239 HttpExchangeConnector:: 00240 getErrorResponse(const HttpAuctionHandler & connection, 00241 const Auction & auction, 00242 const std::string & errorMessage) const 00243 { 00244 // Default for when error == no bid 00245 return getResponse(connection, connection.header, auction); 00246 } 00247 00248 void 00249 HttpExchangeConnector:: 00250 handleUnknownRequest(HttpAuctionHandler & connection, 00251 const HttpHeader & header, 00252 const std::string & payload) const 00253 { 00254 // Deal with the "/ready" request 00255 00256 if (header.resource == "/ready") { 00257 connection.putResponseOnWire(HttpResponse(200, "text/plain", "1")); 00258 return; 00259 } 00260 00261 // Otherwise, it's an error 00262 00263 connection.sendErrorResponse("unknown resource " + header.resource); 00264 } 00265 00266 ExchangeConnector::ExchangeCompatibility 00267 HttpExchangeConnector:: 00268 getCampaignCompatibility(const AgentConfig & config, 00269 bool includeReasons) const 00270 { 00271 return ExchangeConnector 00272 ::getCampaignCompatibility(config, includeReasons); 00273 } 00274 00275 ExchangeConnector::ExchangeCompatibility 00276 HttpExchangeConnector:: 00277 getCreativeCompatibility(const Creative & creative, 00278 bool includeReasons) const 00279 { 00280 return ExchangeConnector 00281 ::getCreativeCompatibility(creative, includeReasons); 00282 } 00283 00284 } // namespace RTBKIT