RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* http_named_endpoint.h -*- C++ -*- 00002 Jeremy Barnes, 9 November 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 */ 00005 00006 #pragma once 00007 00008 #include "soa/service/http_endpoint.h" 00009 #include "jml/utils/vector_utils.h" 00010 #include "named_endpoint.h" 00011 #include "http_rest_proxy.h" 00012 #include <boost/make_shared.hpp> 00013 00014 00015 namespace Datacratic { 00016 00017 00018 /*****************************************************************************/ 00019 /* HTTP NAMED ENDPOINT */ 00020 /*****************************************************************************/ 00021 00024 struct HttpNamedEndpoint : public NamedEndpoint, public HttpEndpoint { 00025 00026 HttpNamedEndpoint() 00027 : HttpEndpoint("HttpNamedEndpoint") 00028 { 00029 } 00030 00031 void init(std::shared_ptr<ConfigurationService> config, 00032 const std::string & endpointName) 00033 { 00034 NamedEndpoint::init(config, endpointName); 00035 } 00036 00044 std::string 00045 bindTcpAddress(const std::string & address) 00046 { 00047 using namespace std; 00048 auto pos = address.find(':'); 00049 if (pos == string::npos) { 00050 // No port specification; take any port 00051 return bindTcp(PortRange(12000, 12999), address); 00052 } 00053 string hostPart(address, 0, pos); 00054 string portPart(address, pos + 1); 00055 00056 if (portPart.empty()) 00057 throw ML::Exception("invalid port " + portPart + " in address " 00058 + address); 00059 00060 if (portPart[portPart.size() - 1] == '+') { 00061 unsigned port = boost::lexical_cast<unsigned>(string(portPart, 0, portPart.size() - 1)); 00062 if(port < 65536) { 00063 unsigned last = port + 999; 00064 return bindTcp(PortRange(port, last), hostPart); 00065 } 00066 00067 throw ML::Exception("invalid port " + port); 00068 } 00069 00070 return bindTcp(boost::lexical_cast<int>(portPart), hostPart); 00071 } 00072 00078 std::string 00079 bindTcpFixed(std::string host, int port) 00080 { 00081 return bindTcp(port, host); 00082 } 00083 00089 std::string 00090 bindTcp(PortRange const & portRange, std::string host = "") 00091 { 00092 using namespace std; 00093 00094 // TODO: generalize this... 00095 if (host == "" || host == "*") 00096 host = "0.0.0.0"; 00097 00098 // TODO: really scan ports 00099 int port = HttpEndpoint::listen(portRange, host, false /* name lookup */); 00100 00101 cerr << "bound tcp for http port " << port << endl; 00102 00103 auto getUri = [&] (const std::string & host) 00104 { 00105 return "http://" + host + ":" + to_string(port); 00106 }; 00107 00108 Json::Value config; 00109 00110 auto addEntry = [&] (const std::string & addr, 00111 const std::string & hostScope, 00112 const std::string & uri) 00113 { 00114 Json::Value & entry = config[config.size()]; 00115 entry["httpUri"] = uri; 00116 00117 Json::Value & transports = entry["transports"]; 00118 transports[0]["name"] = "tcp"; 00119 transports[0]["addr"] = addr; 00120 transports[0]["hostScope"] = hostScope; 00121 transports[0]["port"] = port; 00122 transports[1]["name"] = "http"; 00123 transports[1]["uri"] = uri; 00124 }; 00125 00126 if (host == "0.0.0.0") { 00127 auto interfaces = getInterfaces({AF_INET}); 00128 for (unsigned i = 0; i < interfaces.size(); ++i) { 00129 addEntry(interfaces[i].addr, 00130 interfaces[i].hostScope, 00131 getUri(interfaces[i].addr)); 00132 } 00133 publishAddress("tcp", config); 00134 return getUri(host); 00135 } 00136 else { 00137 string host2 = addrToIp(host); 00138 string uri = getUri(host2); 00139 // TODO: compute host scope 00140 addEntry(host2, "*", uri); 00141 publishAddress("tcp", config); 00142 return uri; 00143 } 00144 } 00145 00146 struct RestConnectionHandler: public HttpConnectionHandler { 00147 RestConnectionHandler(HttpNamedEndpoint * endpoint) 00148 : endpoint(endpoint) 00149 { 00150 } 00151 00152 HttpNamedEndpoint * endpoint; 00153 00154 virtual void 00155 handleHttpPayload(const HttpHeader & header, 00156 const std::string & payload) 00157 { 00158 try { 00159 endpoint->onRequest(this, header, payload); 00160 } 00161 catch(const std::exception& ex) { 00162 Json::Value response; 00163 response["error"] = 00164 "exception processing request " 00165 + header.verb + " " + header.resource; 00166 00167 response["exception"] = ex.what(); 00168 sendErrorResponse(400, response); 00169 } 00170 catch(...) { 00171 Json::Value response; 00172 response["error"] = 00173 "exception processing request " 00174 + header.verb + " " + header.resource; 00175 00176 sendErrorResponse(400, response); 00177 } 00178 } 00179 00180 void sendErrorResponse(int code, const std::string & error) 00181 { 00182 Json::Value val; 00183 val["error"] = error; 00184 sendErrorResponse(code, val); 00185 } 00186 00187 void sendErrorResponse(int code, const Json::Value & error) 00188 { 00189 std::string encodedError = error.toString(); 00190 send(ML::format("HTTP/1.1 %d Pants are on fire\r\n" 00191 "Content-Type: application/json\r\n" 00192 "Access-Control-Allow-Origin: *\r\n" 00193 "Content-Length: %zd\r\n" 00194 "\r\n" 00195 "%s", 00196 code, 00197 encodedError.length(), 00198 encodedError.c_str()), 00199 NEXT_CLOSE); 00200 } 00201 00202 void sendResponse(int code, 00203 const Json::Value & response, 00204 const std::string & contentType = "application/json") 00205 { 00206 std::string body = response.toStyledString(); 00207 return sendResponse(code, body, contentType); 00208 } 00209 00210 void sendResponse(int code, 00211 const std::string & body, 00212 const std::string & contentType) 00213 { 00214 auto onSendFinished = [=] { 00215 this->transport().associateWhenHandlerFinished 00216 (std::make_shared<RestConnectionHandler>(endpoint), 00217 "sendResponse"); 00218 }; 00219 00220 send(ML::format("HTTP/1.1 %d %s\r\n" 00221 "Content-Type: %s\r\n" 00222 "Access-Control-Allow-Origin: *\r\n" 00223 "Content-Length: %zd\r\n" 00224 "Connection: Keep-Alive\r\n" 00225 "\r\n" 00226 "%s", 00227 code, 00228 getResponseReasonPhrase(code).c_str(), 00229 contentType.c_str(), 00230 body.length(), 00231 body.c_str()), 00232 NEXT_CONTINUE, 00233 onSendFinished); 00234 } 00235 00236 }; 00237 00238 typedef std::function<void (RestConnectionHandler * connection, 00239 const HttpHeader & header, 00240 const std::string & payload)> OnRequest; 00241 00242 OnRequest onRequest; 00243 00244 00245 virtual std::shared_ptr<ConnectionHandler> 00246 makeNewHandler() 00247 { 00248 return std::make_shared<RestConnectionHandler>(this); 00249 } 00250 }; 00251 00252 00253 /*****************************************************************************/ 00254 /* HTTP NAMED REST PROXY */ 00255 /*****************************************************************************/ 00256 00259 struct HttpNamedRestProxy: public HttpRestProxy { 00260 00261 HttpNamedRestProxy() 00262 { 00263 } 00264 00265 void init(std::shared_ptr<ConfigurationService> config) 00266 { 00267 this->config = config; 00268 } 00269 00270 bool connectToServiceClass(const std::string & serviceClass, 00271 const std::string & endpointName) 00272 { 00273 this->serviceClass = serviceClass; 00274 this->endpointName = endpointName; 00275 00276 std::vector<std::string> children 00277 = config->getChildren("serviceClass/" + serviceClass); 00278 00279 for (auto c : children) { 00280 std::string key = "serviceClass/" + serviceClass + "/" + c; 00281 //cerr << "getting " << key << endl; 00282 Json::Value value = config->getJson(key); 00283 std::string name = value["serviceName"].asString(); 00284 std::string path = value["servicePath"].asString(); 00285 00286 //cerr << "name = " << name << " path = " << path << endl; 00287 if (connect(path + "/" + endpointName)) 00288 break; 00289 } 00290 00291 return connected; 00292 } 00293 00294 bool connect(const std::string & endpointName) 00295 { 00296 using namespace std; 00297 00298 auto onChange = std::bind(&HttpNamedRestProxy::onConfigChange, this, 00299 std::placeholders::_1, 00300 std::placeholders::_2, 00301 std::placeholders::_3); 00302 00303 connected = false; 00304 00305 // 2. Iterate over all of the connection possibilities until we 00306 // find one that works. 00307 auto onConnection = [&] (const std::string & key, 00308 const Json::Value & epConfig) -> bool 00309 { 00310 if (connected) 00311 return false; 00312 //cerr << "epConfig for " << key << " is " << epConfig 00313 //<< endl; 00314 00315 for (auto & entry: epConfig) { 00316 00317 //cerr << "entry is " << entry << endl; 00318 00319 if (!entry.isMember("httpUri")) 00320 return true; 00321 00322 string uri = entry["httpUri"].asString(); 00323 00324 cerr << "uri = " << uri << endl; 00325 00326 auto hs = entry["transports"][0]["hostScope"]; 00327 if (!hs) 00328 continue; 00329 00330 // TODO: allow localhost connections on localhost 00331 string hostScope = hs.asString(); 00332 if (hs != "*") { 00333 utsname name; 00334 if (uname(&name)) 00335 throw ML::Exception(errno, "uname"); 00336 if (hostScope != name.nodename) 00337 continue; // wrong host scope 00338 } 00339 00340 serviceUri = uri; 00341 00342 cerr << "connected to " << uri << endl; 00343 connected = true; 00344 00345 // Continue the connection in the onConfigChange function 00346 onConfigChange(ConfigurationService::VALUE_CHANGED, 00347 key, 00348 epConfig); 00349 return false; 00350 } 00351 00352 return false; 00353 }; 00354 00355 config->forEachEntry(onConnection, endpointName); 00356 return connected; 00357 } 00358 00360 bool onConfigChange(ConfigurationService::ChangeType change, 00361 const std::string & key, 00362 const Json::Value & newValue) 00363 { 00364 using namespace std; 00365 00366 cerr << "config for " << key << " has changed" << endl; 00367 00368 #if 0 00369 // 3. Find an appropriate entry to connect to 00370 for (auto & entries: newValue) { 00371 // TODO: connect 00372 cerr << "got entries " << entries << endl; 00373 } 00374 #endif 00375 00376 return true; 00377 } 00378 00379 00380 private: 00381 std::shared_ptr<ConfigurationService> config; 00382 00383 bool connected; 00384 std::string serviceClass; 00385 std::string endpointName; 00386 }; 00387 00388 } // namespace Datacratic 00389