RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
plugins/exchange/http_exchange_connector.cc
00001 /* http_exchange_connector.cc
00002    Jeremy Barnes, 31 January 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Auction endpoint class.
00006 */
00007 
00008 #include "http_exchange_connector.h"
00009 #include "http_auction_handler.h"
00010 #include "jml/arch/exception.h"
00011 #include "jml/arch/format.h"
00012 #include "jml/arch/backtrace.h"
00013 #include "jml/utils/guard.h"
00014 #include "jml/utils/set_utils.h"
00015 #include "jml/utils/vector_utils.h"
00016 #include "jml/arch/timers.h"
00017 #include "rtbkit/core/router/router.h"
00018 #include <set>
00019 
00020 #include <boost/foreach.hpp>
00021 
00022 
00023 using namespace std;
00024 using namespace ML;
00025 
00026 
00027 namespace RTBKIT {
00028 
00029 
00030 /*****************************************************************************/
00031 /* HTTP EXCHANGE CONNECTOR                                                   */
00032 /*****************************************************************************/
00033 
00034 HttpExchangeConnector::
00035 HttpExchangeConnector(const std::string & name,
00036                       ServiceBase & parent)
00037     : ExchangeConnector(name, parent),
00038       HttpEndpoint(name)
00039 {
00040     postConstructorInit();
00041 }
00042 
00043 HttpExchangeConnector::
00044 HttpExchangeConnector(const std::string & name,
00045                       std::shared_ptr<ServiceProxies> proxies)
00046     : ExchangeConnector(name, proxies),
00047       HttpEndpoint(name)
00048 {
00049     postConstructorInit();
00050 }
00051 
00052 void
00053 HttpExchangeConnector::
00054 postConstructorInit()
00055 {
00056     numThreads = 8;
00057     listenPort = 10001;
00058     bindHost = "*";
00059     performNameLookup = true;
00060     backlog = DEF_BACKLOG;
00061     pingTimeUnknownHostsMs = 20;
00062 
00063     numServingRequest_ = 0;
00064     acceptAuctionProbability = 1.0;
00065 
00066     // Link up events
00067     onTransportOpen = [=] (TransportBase *)
00068         {
00069             this->recordHit("auctionNewConnection");
00070         };
00071 
00072     onTransportClose = [=] (TransportBase *)
00073         {
00074             this->recordHit("auctionClosedConnection");
00075         };
00076 
00077     handlerFactory = [=] () { return new HttpAuctionHandler(); };
00078 }
00079 
00080 HttpExchangeConnector::
00081 ~HttpExchangeConnector()
00082 {
00083     shutdown();
00084 }
00085 
00086 void
00087 HttpExchangeConnector::
00088 configure(const Json::Value & parameters)
00089 {
00090     getParam(parameters, numThreads, "numThreads");
00091     getParam(parameters, listenPort, "listenPort");
00092     getParam(parameters, bindHost, "bindHost");
00093     getParam(parameters, performNameLookup, "performNameLookup");
00094     getParam(parameters, backlog, "connectionBacklog");
00095     getParam(parameters, auctionResource, "auctionResource");
00096     getParam(parameters, auctionVerb, "auctionVerb");
00097     getParam(parameters, pingTimesByHostMs, "pingTimesByHostMs");
00098     getParam(parameters, pingTimeUnknownHostsMs, "pingTimeUnknownHostsMs");
00099 }
00100 
00101 void
00102 HttpExchangeConnector::
00103 configureHttp(int numThreads,
00104               const PortRange & listenPort,
00105               const std::string & bindHost,
00106               bool performNameLookup,
00107               int backlog,
00108               const std::string & auctionResource,
00109               const std::string & auctionVerb)
00110 {
00111     this->numThreads = numThreads;
00112     this->listenPort = listenPort;
00113     this->bindHost = bindHost;
00114     this->performNameLookup = performNameLookup;
00115     this->backlog = backlog;
00116     this->auctionResource = auctionResource;
00117     this->auctionVerb = auctionVerb;
00118 }
00119 
00120 void
00121 HttpExchangeConnector::
00122 start()
00123 {
00124     PassiveEndpoint::init(listenPort, bindHost, numThreads, true,
00125                           performNameLookup, backlog);
00126 }
00127 
00128 void
00129 HttpExchangeConnector::
00130 shutdown()
00131 {
00132     HttpEndpoint::shutdown();
00133     ExchangeConnector::shutdown();
00134 }
00135 
00136 std::shared_ptr<ConnectionHandler>
00137 HttpExchangeConnector::
00138 makeNewHandler()
00139 {
00140     return makeNewHandlerShared();
00141 }
00142 
00143 std::shared_ptr<HttpAuctionHandler>
00144 HttpExchangeConnector::
00145 makeNewHandlerShared()
00146 {
00147     if (!handlerFactory)
00148         throw ML::Exception("need to initialize handler factory");
00149 
00150     HttpAuctionHandler * handler = handlerFactory();
00151     std::shared_ptr<HttpAuctionHandler> handlerSp(handler);
00152     {
00153         Guard guard(handlersLock);
00154         handlers.insert(handlerSp);
00155     }
00156 
00157     return handlerSp;
00158 }
00159 
00160 void
00161 HttpExchangeConnector::
00162 finishedWithHandler(std::shared_ptr<HttpAuctionHandler> handler)
00163 {
00164     Guard guard(handlersLock);
00165     handlers.erase(handler);
00166 }
00167 
00168 Json::Value
00169 HttpExchangeConnector::
00170 getServiceStatus() const
00171 {
00172     Json::Value result;
00173 
00174     result["numConnections"] = numConnections();
00175     result["activeConnections"] = numServingRequest();
00176     result["connectionLoadFactor"]
00177         = xdiv<float>(numServingRequest(),
00178                       numConnections());
00179     
00180     map<string, int> peerCounts = numConnectionsByHost();
00181     
00182     BOOST_FOREACH(auto cnt, peerCounts)
00183         result["hostConnections"][cnt.first] = cnt.second;
00184 
00185     return result;
00186 }
00187 
00188 std::shared_ptr<BidRequest>
00189 HttpExchangeConnector::
00190 parseBidRequest(HttpAuctionHandler & connection,
00191                 const HttpHeader & header,
00192                 const std::string & payload)
00193 {
00194     throw ML::Exception("need to override HttpExchangeConnector::parseBidRequest");
00195 }
00196 
00197 double
00198 HttpExchangeConnector::
00199 getTimeAvailableMs(HttpAuctionHandler & connection,
00200                    const HttpHeader & header,
00201                    const std::string & payload)
00202 {
00203     throw ML::Exception("need to override HttpExchangeConnector::getTimeAvailableMs");
00204 }
00205 
00206 double
00207 HttpExchangeConnector::
00208 getRoundTripTimeMs(HttpAuctionHandler & connection,
00209                    const HttpHeader & header)
00210 {
00211     string peerName = connection.transport().getPeerName();
00212     
00213     auto it = pingTimesByHostMs.find(peerName);
00214     if (it == pingTimesByHostMs.end())
00215         return pingTimeUnknownHostsMs;
00216     return it->second;
00217 }
00218 
00219 HttpResponse
00220 HttpExchangeConnector::
00221 getResponse(const HttpAuctionHandler & connection,
00222             const HttpHeader & requestHeader,
00223             const Auction & auction) const
00224 {
00225     throw ML::Exception("need to override HttpExchangeConnector::getResponse");
00226 }
00227 
00228 HttpResponse
00229 HttpExchangeConnector::
00230 getDroppedAuctionResponse(const HttpAuctionHandler & connection,
00231                           const Auction & auction,
00232                           const std::string & reason) const
00233 {
00234     // Default for when dropped auction == no bid
00235     return getResponse(connection, connection.header, auction);
00236 }
00237 
00238 HttpResponse
00239 HttpExchangeConnector::
00240 getErrorResponse(const HttpAuctionHandler & connection,
00241                  const Auction & auction,
00242                  const std::string & errorMessage) const
00243 {
00244     // Default for when error == no bid
00245     return getResponse(connection, connection.header, auction);
00246 }
00247 
00248 void
00249 HttpExchangeConnector::
00250 handleUnknownRequest(HttpAuctionHandler & connection,
00251                      const HttpHeader & header,
00252                      const std::string & payload) const
00253 {
00254     // Deal with the "/ready" request
00255     
00256     if (header.resource == "/ready") {
00257         connection.putResponseOnWire(HttpResponse(200, "text/plain", "1"));
00258         return;
00259     }
00260 
00261     // Otherwise, it's an error
00262 
00263     connection.sendErrorResponse("unknown resource " + header.resource);
00264 }
00265 
00266 ExchangeConnector::ExchangeCompatibility
00267 HttpExchangeConnector::
00268 getCampaignCompatibility(const AgentConfig & config,
00269                          bool includeReasons) const
00270 {
00271     return ExchangeConnector
00272         ::getCampaignCompatibility(config, includeReasons);
00273 }
00274 
00275 ExchangeConnector::ExchangeCompatibility
00276 HttpExchangeConnector::
00277 getCreativeCompatibility(const Creative & creative,
00278                          bool includeReasons) const
00279 {
00280     return ExchangeConnector
00281         ::getCreativeCompatibility(creative, includeReasons);
00282 }
00283 
00284 } // namespace RTBKIT
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator