RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/http_named_endpoint.h
00001 /* http_named_endpoint.h                                           -*- C++ -*-
00002    Jeremy Barnes, 9 November 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 */
00005 
00006 #pragma once
00007 
00008 #include "soa/service/http_endpoint.h"
00009 #include "jml/utils/vector_utils.h"
00010 #include "named_endpoint.h"
00011 #include "http_rest_proxy.h"
00012 #include <boost/make_shared.hpp>
00013 
00014 
00015 namespace Datacratic {
00016 
00017 
00018 /*****************************************************************************/
00019 /* HTTP NAMED ENDPOINT                                                       */
00020 /*****************************************************************************/
00021 
00024 struct HttpNamedEndpoint : public NamedEndpoint, public HttpEndpoint {
00025 
00026     HttpNamedEndpoint()
00027         : HttpEndpoint("HttpNamedEndpoint")
00028     {
00029     }
00030 
00031     void init(std::shared_ptr<ConfigurationService> config,
00032               const std::string & endpointName)
00033     {
00034         NamedEndpoint::init(config, endpointName);
00035     }
00036 
00044     std::string
00045     bindTcpAddress(const std::string & address)
00046     {
00047         using namespace std;
00048         auto pos = address.find(':');
00049         if (pos == string::npos) {
00050             // No port specification; take any port
00051             return bindTcp(PortRange(12000, 12999), address);
00052         }
00053         string hostPart(address, 0, pos);
00054         string portPart(address, pos + 1);
00055 
00056         if (portPart.empty())
00057             throw ML::Exception("invalid port " + portPart + " in address "
00058                                 + address);
00059 
00060         if (portPart[portPart.size() - 1] == '+') {
00061             unsigned port = boost::lexical_cast<unsigned>(string(portPart, 0, portPart.size() - 1));
00062             if(port < 65536) {
00063                 unsigned last = port + 999;
00064                 return bindTcp(PortRange(port, last), hostPart);
00065             }
00066 
00067             throw ML::Exception("invalid port " + port);
00068         }
00069 
00070         return bindTcp(boost::lexical_cast<int>(portPart), hostPart);
00071     }
00072 
00078     std::string
00079     bindTcpFixed(std::string host, int port)
00080     {
00081         return bindTcp(port, host);
00082     }
00083 
00089     std::string
00090     bindTcp(PortRange const & portRange, std::string host = "")
00091     {
00092         using namespace std;
00093 
00094         // TODO: generalize this...
00095         if (host == "" || host == "*")
00096             host = "0.0.0.0";
00097 
00098         // TODO: really scan ports
00099         int port = HttpEndpoint::listen(portRange, host, false /* name lookup */);
00100 
00101         cerr << "bound tcp for http port " << port << endl;
00102 
00103         auto getUri = [&] (const std::string & host)
00104             {
00105                 return "http://" + host + ":" + to_string(port);
00106             };
00107 
00108         Json::Value config;
00109 
00110         auto addEntry = [&] (const std::string & addr,
00111                              const std::string & hostScope,
00112                              const std::string & uri)
00113             {
00114                 Json::Value & entry = config[config.size()];
00115                 entry["httpUri"] = uri;
00116 
00117                 Json::Value & transports = entry["transports"];
00118                 transports[0]["name"] = "tcp";
00119                 transports[0]["addr"] = addr;
00120                 transports[0]["hostScope"] = hostScope;
00121                 transports[0]["port"] = port;
00122                 transports[1]["name"] = "http";
00123                 transports[1]["uri"] = uri;
00124             };
00125 
00126         if (host == "0.0.0.0") {
00127             auto interfaces = getInterfaces({AF_INET});
00128             for (unsigned i = 0;  i < interfaces.size();  ++i) {
00129                 addEntry(interfaces[i].addr,
00130                          interfaces[i].hostScope,
00131                          getUri(interfaces[i].addr));
00132             }
00133             publishAddress("tcp", config);
00134             return getUri(host);
00135         }
00136         else {
00137             string host2 = addrToIp(host);
00138             string uri = getUri(host2);
00139             // TODO: compute host scope
00140             addEntry(host2, "*", uri);
00141             publishAddress("tcp", config);
00142             return uri;
00143         }
00144     }
00145 
00146     struct RestConnectionHandler: public HttpConnectionHandler {
00147         RestConnectionHandler(HttpNamedEndpoint * endpoint)
00148             : endpoint(endpoint)
00149         {
00150         }
00151 
00152         HttpNamedEndpoint * endpoint;
00153 
00154         virtual void
00155         handleHttpPayload(const HttpHeader & header,
00156                           const std::string & payload)
00157         {
00158             try {
00159                 endpoint->onRequest(this, header, payload);
00160             }
00161             catch(const std::exception& ex) {
00162                 Json::Value response;
00163                 response["error"] =
00164                     "exception processing request "
00165                     + header.verb + " " + header.resource;
00166 
00167                 response["exception"] = ex.what();
00168                 sendErrorResponse(400, response);
00169             }
00170             catch(...) {
00171                 Json::Value response;
00172                 response["error"] =
00173                     "exception processing request "
00174                     + header.verb + " " + header.resource;
00175 
00176                 sendErrorResponse(400, response);
00177             }
00178         }
00179 
00180         void sendErrorResponse(int code, const std::string & error)
00181         {
00182             Json::Value val;
00183             val["error"] = error;
00184             sendErrorResponse(code, val);
00185         }
00186 
00187         void sendErrorResponse(int code, const Json::Value & error)
00188         {
00189             std::string encodedError = error.toString();
00190             send(ML::format("HTTP/1.1 %d Pants are on fire\r\n"
00191                             "Content-Type: application/json\r\n"
00192                             "Access-Control-Allow-Origin: *\r\n"
00193                             "Content-Length: %zd\r\n"
00194                             "\r\n"
00195                             "%s",
00196                             code,
00197                             encodedError.length(),
00198                             encodedError.c_str()),
00199                  NEXT_CLOSE);
00200         }
00201 
00202         void sendResponse(int code,
00203                           const Json::Value & response,
00204                           const std::string & contentType = "application/json")
00205         {
00206             std::string body = response.toStyledString();
00207             return sendResponse(code, body, contentType);
00208         }
00209 
00210         void sendResponse(int code,
00211                           const std::string & body,
00212                           const std::string & contentType)
00213         {
00214             auto onSendFinished = [=] {
00215                 this->transport().associateWhenHandlerFinished
00216                 (std::make_shared<RestConnectionHandler>(endpoint),
00217                  "sendResponse");
00218             };
00219 
00220             send(ML::format("HTTP/1.1 %d %s\r\n"
00221                             "Content-Type: %s\r\n"
00222                             "Access-Control-Allow-Origin: *\r\n"
00223                             "Content-Length: %zd\r\n"
00224                             "Connection: Keep-Alive\r\n"
00225                             "\r\n"
00226                             "%s",
00227                             code,
00228                             getResponseReasonPhrase(code).c_str(),
00229                             contentType.c_str(),
00230                             body.length(),
00231                             body.c_str()),
00232                  NEXT_CONTINUE,
00233                  onSendFinished);
00234         }
00235 
00236     };
00237 
00238     typedef std::function<void (RestConnectionHandler * connection,
00239                                 const HttpHeader & header,
00240                                 const std::string & payload)> OnRequest;
00241 
00242     OnRequest onRequest;
00243 
00244 
00245     virtual std::shared_ptr<ConnectionHandler>
00246     makeNewHandler()
00247     {
00248         return std::make_shared<RestConnectionHandler>(this);
00249     }
00250 };
00251 
00252 
00253 /*****************************************************************************/
00254 /* HTTP NAMED REST PROXY                                                     */
00255 /*****************************************************************************/
00256 
00259 struct HttpNamedRestProxy: public HttpRestProxy {
00260 
00261     HttpNamedRestProxy()
00262     {
00263     }
00264 
00265     void init(std::shared_ptr<ConfigurationService> config)
00266     {
00267         this->config = config;
00268     }
00269 
00270     bool connectToServiceClass(const std::string & serviceClass,
00271                                const std::string & endpointName)
00272     {
00273         this->serviceClass = serviceClass;
00274         this->endpointName = endpointName;
00275 
00276         std::vector<std::string> children
00277             = config->getChildren("serviceClass/" + serviceClass);
00278 
00279         for (auto c : children) {
00280             std::string key = "serviceClass/" + serviceClass + "/" + c;
00281             //cerr << "getting " << key << endl;
00282             Json::Value value = config->getJson(key);
00283             std::string name = value["serviceName"].asString();
00284             std::string path = value["servicePath"].asString();
00285 
00286             //cerr << "name = " << name << " path = " << path << endl;
00287             if (connect(path + "/" + endpointName))
00288                 break;
00289         }
00290 
00291         return connected;
00292     }
00293 
00294     bool connect(const std::string & endpointName)
00295     {
00296         using namespace std;
00297 
00298         auto onChange = std::bind(&HttpNamedRestProxy::onConfigChange, this,
00299                                   std::placeholders::_1,
00300                                   std::placeholders::_2,
00301                                   std::placeholders::_3);
00302 
00303         connected = false;
00304 
00305         // 2.  Iterate over all of the connection possibilities until we
00306         //     find one that works.
00307         auto onConnection = [&] (const std::string & key,
00308                                  const Json::Value & epConfig) -> bool
00309             {
00310                 if (connected)
00311                     return false;
00312                 //cerr << "epConfig for " << key << " is " << epConfig
00313                 //<< endl;
00314                 
00315                 for (auto & entry: epConfig) {
00316 
00317                     //cerr << "entry is " << entry << endl;
00318 
00319                     if (!entry.isMember("httpUri"))
00320                         return true;
00321 
00322                     string uri = entry["httpUri"].asString();
00323 
00324                     cerr << "uri = " << uri << endl;
00325 
00326                     auto hs = entry["transports"][0]["hostScope"];
00327                     if (!hs)
00328                         continue;
00329 
00330                     // TODO: allow localhost connections on localhost
00331                     string hostScope = hs.asString();
00332                     if (hs != "*") {
00333                         utsname name;
00334                         if (uname(&name))
00335                             throw ML::Exception(errno, "uname");
00336                         if (hostScope != name.nodename)
00337                             continue;  // wrong host scope
00338                     }
00339 
00340                     serviceUri = uri;
00341 
00342                     cerr << "connected to " << uri << endl;
00343                     connected = true;
00344 
00345                     // Continue the connection in the onConfigChange function
00346                     onConfigChange(ConfigurationService::VALUE_CHANGED,
00347                                    key,
00348                                    epConfig);
00349                     return false;
00350                 }
00351 
00352                 return false;
00353             };
00354 
00355         config->forEachEntry(onConnection, endpointName);
00356         return connected;
00357     }
00358 
00360     bool onConfigChange(ConfigurationService::ChangeType change,
00361                         const std::string & key,
00362                         const Json::Value & newValue)
00363     {
00364         using namespace std;
00365 
00366         cerr << "config for " << key << " has changed" << endl;
00367 
00368 #if 0
00369         // 3.  Find an appropriate entry to connect to
00370         for (auto & entries: newValue) {
00371             // TODO: connect
00372             cerr << "got entries " << entries << endl;
00373         }
00374 #endif
00375 
00376         return true;
00377     }
00378 
00379 
00380 private:
00381     std::shared_ptr<ConfigurationService> config;
00382 
00383     bool connected;
00384     std::string serviceClass;
00385     std::string endpointName;
00386 };
00387 
00388 } // namespace Datacratic
00389 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator