![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* json_service_endpoint.h -*- C++ -*- 00002 Jeremy Barnes, 9 November 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 */ 00006 00007 #ifndef __service__zmq_json_endpoint_h__ 00008 #define __service__zmq_json_endpoint_h__ 00009 00010 #include "zmq_endpoint.h" 00011 #include "jml/utils/vector_utils.h" 00012 #include "http_named_endpoint.h" 00013 #include "city.h" 00014 00015 00016 namespace Datacratic { 00017 00018 00019 /*****************************************************************************/ 00020 /* REST REQUEST */ 00021 /*****************************************************************************/ 00022 00023 struct RestRequest { 00024 RestRequest() 00025 { 00026 } 00027 00028 RestRequest(const HttpHeader & header, 00029 const std::string & payload) 00030 : verb(header.verb), 00031 resource(header.resource), 00032 params(header.queryParams), 00033 payload(payload) 00034 { 00035 } 00036 00037 RestRequest(const std::string & verb, 00038 const std::string & resource, 00039 const RestParams & params, 00040 const std::string & payload) 00041 : verb(verb), resource(resource), params(params), payload(payload) 00042 { 00043 } 00044 00045 std::string verb; 00046 std::string resource; 00047 RestParams params; 00048 std::string payload; 00049 }; 00050 00051 std::ostream & operator << (std::ostream & stream, const RestRequest & request); 00052 00053 00054 /*****************************************************************************/ 00055 /* REST SERVICE ENDPOINT */ 00056 /*****************************************************************************/ 00057 00068 struct RestServiceEndpoint: public MessageLoop { 00069 00074 RestServiceEndpoint(std::shared_ptr<zmq::context_t> context) 00075 : zmqEndpoint(context) 00076 { 00077 } 00078 00079 virtual ~RestServiceEndpoint() 00080 { 00081 shutdown(); 00082 } 00083 00084 void shutdown() 00085 { 00086 // 1. Shut down the http endpoint, since it needs our threads to 00087 // complete its shutdown 00088 httpEndpoint.shutdown(); 00089 00090 // 2. Shut down the message loop 00091 MessageLoop::shutdown(); 00092 00093 // 3. Shut down the zmq endpoint now we know that the message loop is not using 00094 // it. 00095 zmqEndpoint.shutdown(); 00096 } 00097 00102 struct ConnectionId { 00104 ConnectionId(const std::string & zmqAddress, 00105 const std::string & requestId, 00106 RestServiceEndpoint * endpoint) 00107 : itl(new Itl(zmqAddress, requestId, endpoint)) 00108 { 00109 } 00110 00112 ConnectionId(HttpNamedEndpoint::RestConnectionHandler * http, 00113 const std::string & requestId, 00114 RestServiceEndpoint * endpoint) 00115 : itl(new Itl(http, requestId, endpoint)) 00116 { 00117 } 00118 00119 struct Itl { 00120 Itl(HttpNamedEndpoint::RestConnectionHandler * http, 00121 const std::string & requestId, 00122 RestServiceEndpoint * endpoint) 00123 : requestId(requestId), 00124 http(http), 00125 endpoint(endpoint), 00126 responseSent(false), 00127 startDate(Date::now()) 00128 { 00129 } 00130 00131 Itl(const std::string & zmqAddress, 00132 const std::string & requestId, 00133 RestServiceEndpoint * endpoint) 00134 : zmqAddress(zmqAddress), 00135 requestId(requestId), 00136 http(0), 00137 endpoint(endpoint), 00138 responseSent(false), 00139 startDate(Date::now()) 00140 { 00141 } 00142 00143 ~Itl() 00144 { 00145 if (!responseSent) 00146 throw ML::Exception("no response sent on connection"); 00147 } 00148 00149 std::string zmqAddress; 00150 std::string requestId; 00151 HttpNamedEndpoint::RestConnectionHandler * http; 00152 RestServiceEndpoint * endpoint; 00153 bool responseSent; 00154 Date startDate; 00155 }; 00156 00157 std::shared_ptr<Itl> itl; 00158 00159 void sendResponse(int responseCode, 00160 const char * response, 00161 const std::string & contentType) const 00162 { 00163 return sendResponse(responseCode, std::string(response), 00164 contentType); 00165 } 00166 00168 void sendResponse(int responseCode, 00169 const std::string & response, 00170 const std::string & contentType) const 00171 { 00172 if (itl->responseSent) 00173 throw ML::Exception("response already sent"); 00174 00175 if (itl->endpoint->logResponse) 00176 itl->endpoint->logResponse(*this, responseCode, response, 00177 contentType); 00178 00179 if (itl->http) 00180 itl->http->sendResponse(responseCode, response, contentType); 00181 else { 00182 std::vector<std::string> message; 00183 message.push_back(itl->zmqAddress); 00184 message.push_back(itl->requestId); 00185 message.push_back(std::to_string(responseCode)); 00186 message.push_back(response); 00187 00188 //std::cerr << "sending response to " << itl->requestId 00189 // << std::endl; 00190 itl->endpoint->zmqEndpoint.sendMessage(message); 00191 } 00192 00193 itl->responseSent = true; 00194 } 00195 00197 void sendResponse(int responseCode, 00198 const Json::Value & response, 00199 const std::string & contentType = "application/json") const 00200 { 00201 using namespace std; 00202 //cerr << "sent response " << responseCode << " " << response 00203 // << endl; 00204 00205 if (itl->responseSent) 00206 throw ML::Exception("response already sent"); 00207 00208 if (itl->endpoint->logResponse) 00209 itl->endpoint->logResponse(*this, responseCode, response.toString(), 00210 contentType); 00211 00212 if (itl->http) 00213 itl->http->sendResponse(responseCode, response, contentType); 00214 else { 00215 std::vector<std::string> message; 00216 message.push_back(itl->zmqAddress); 00217 message.push_back(itl->requestId); 00218 message.push_back(std::to_string(responseCode)); 00219 message.push_back(response.toString()); 00220 itl->endpoint->zmqEndpoint.sendMessage(message); 00221 } 00222 00223 itl->responseSent = true; 00224 } 00225 00226 void sendResponse(int responseCode) const 00227 { 00228 return sendResponse(responseCode, "", ""); 00229 } 00230 00232 void sendErrorResponse(int responseCode, 00233 const std::string & error, 00234 const std::string & contentType) const 00235 { 00236 using namespace std; 00237 cerr << "sent error response " << responseCode << " " << error 00238 << endl; 00239 00240 if (itl->responseSent) 00241 throw ML::Exception("response already sent"); 00242 00243 00244 if (itl->endpoint->logResponse) 00245 itl->endpoint->logResponse(*this, responseCode, error, 00246 contentType); 00247 00248 if (itl->http) 00249 itl->http->sendResponse(responseCode, error); 00250 else { 00251 std::vector<std::string> message; 00252 message.push_back(itl->zmqAddress); 00253 message.push_back(itl->requestId); 00254 message.push_back(std::to_string(responseCode)); 00255 message.push_back(error); 00256 itl->endpoint->zmqEndpoint.sendMessage(message); 00257 } 00258 00259 itl->responseSent = true; 00260 } 00261 00262 void sendErrorResponse(int responseCode, const char * error, 00263 const std::string & contentType) const 00264 { 00265 sendErrorResponse(responseCode, std::string(error), "application/json"); 00266 } 00267 00268 void sendErrorResponse(int responseCode, const Json::Value & error) const 00269 { 00270 using namespace std; 00271 cerr << "sent error response " << responseCode << " " << error 00272 << endl; 00273 00274 if (itl->responseSent) 00275 throw ML::Exception("response already sent"); 00276 00277 if (itl->endpoint->logResponse) 00278 itl->endpoint->logResponse(*this, responseCode, error.toString(), 00279 "application/json"); 00280 00281 if (itl->http) 00282 itl->http->sendResponse(responseCode, error); 00283 else { 00284 std::vector<std::string> message; 00285 message.push_back(itl->zmqAddress); 00286 message.push_back(itl->requestId); 00287 message.push_back(std::to_string(responseCode)); 00288 message.push_back(error.toString()); 00289 itl->endpoint->zmqEndpoint.sendMessage(message); 00290 } 00291 00292 itl->responseSent = true; 00293 } 00294 }; 00295 00296 void init(std::shared_ptr<ConfigurationService> config, 00297 const std::string & endpointName, 00298 double maxAddedLatency = 0.005, 00299 int numThreads = 1) 00300 { 00301 MessageLoop::init(numThreads, maxAddedLatency); 00302 zmqEndpoint.init(config, ZMQ_XREP, endpointName + "/zeromq"); 00303 httpEndpoint.init(config, endpointName + "/http"); 00304 00305 auto zmqHandler = [=] (std::vector<std::string> && message) 00306 { 00307 using namespace std; 00308 00309 if (message.size() < 6) { 00310 cerr << "ignored message with invalid number of members:" 00311 << message.size() 00312 << endl; 00313 return; 00314 } 00315 //cerr << "got REST message at " << this << " " << message << endl; 00316 this->doHandleRequest(ConnectionId(message.at(0), 00317 message.at(1), 00318 this), 00319 RestRequest(message.at(2), 00320 message.at(3), 00321 RestParams::fromBinary(message.at(4)), 00322 message.at(5))); 00323 }; 00324 00325 zmqEndpoint.messageHandler = zmqHandler; 00326 00327 httpEndpoint.onRequest 00328 = [=] (HttpNamedEndpoint::RestConnectionHandler * connection, 00329 const HttpHeader & header, 00330 const std::string & payload) 00331 { 00332 std::string requestId = this->getHttpRequestId(); 00333 this->doHandleRequest(ConnectionId(connection, requestId, this), 00334 RestRequest(header, payload)); 00335 }; 00336 00337 addSource("RestServiceEndpoint::zmqEndpoint", zmqEndpoint); 00338 addSource("RestServiceEndpoint::httpEndpoint", httpEndpoint); 00339 00340 } 00341 00345 std::pair<std::string, std::string> 00346 bindTcp(PortRange const & zmqRange = PortRange(), PortRange const & httpRange = PortRange(), std::string host = "") 00347 { 00348 std::string httpAddr = httpEndpoint.bindTcp(httpRange, host); 00349 std::string zmqAddr = zmqEndpoint.bindTcp(zmqRange, host); 00350 return std::make_pair(zmqAddr, httpAddr); 00351 } 00352 00358 std::string bindFixedHttpAddress(std::string host, int port) 00359 { 00360 return httpEndpoint.bindTcpFixed(host, port); 00361 } 00362 00363 std::string bindFixedHttpAddress(std::string address) 00364 { 00365 return httpEndpoint.bindTcpAddress(address); 00366 } 00367 00369 typedef std::function<void (ConnectionId connection, 00370 RestRequest request)> OnHandleRequest; 00371 00372 OnHandleRequest onHandleRequest; 00373 00377 virtual void handleRequest(const ConnectionId & connection, 00378 const RestRequest & request) const 00379 { 00380 using namespace std; 00381 00382 //cerr << "got request " << request << endl; 00383 if (onHandleRequest) { 00384 onHandleRequest(connection, request); 00385 } 00386 else { 00387 throw ML::Exception("need to override handleRequest or assign to " 00388 "onHandleRequest"); 00389 } 00390 } 00391 00392 ZmqNamedEndpoint zmqEndpoint; 00393 HttpNamedEndpoint httpEndpoint; 00394 00395 std::function<void (const ConnectionId & conn, const RestRequest & req) > logRequest; 00396 std::function<void (const ConnectionId & conn, 00397 int code, 00398 const std::string & resp, 00399 const std::string & contentType) > logResponse; 00400 00401 void doHandleRequest(const ConnectionId & connection, 00402 const RestRequest & request) 00403 { 00404 if (logRequest) 00405 logRequest(connection, request); 00406 00407 handleRequest(connection, request); 00408 } 00409 00410 // Create a random request ID for an HTTP request 00411 std::string getHttpRequestId() const 00412 { 00413 std::string s = Date::now().print(9) + ML::format("%d", random()); 00414 uint64_t jobId = CityHash64(s.c_str(), s.size()); 00415 return ML::format("%016llx", jobId); 00416 } 00417 00418 }; 00419 00420 } // namespace Datacratic 00421 00422 #endif /* __service__zmq_json_endpoint_h__ */
1.7.6.1