RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/rest_service_endpoint.h
00001 /* rest_service_endpoint.h                                         -*- C++ -*-
00002    Jeremy Barnes, 9 November 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005 */
00006 
00007 #ifndef __service__zmq_json_endpoint_h__
00008 #define __service__zmq_json_endpoint_h__
00009 
00010 #include "zmq_endpoint.h"
00011 #include "jml/utils/vector_utils.h"
00012 #include "http_named_endpoint.h"
00013 #include "city.h"
00014 
00015 
00016 namespace Datacratic {
00017 
00018 
00019 /*****************************************************************************/
00020 /* REST REQUEST                                                              */
00021 /*****************************************************************************/
00022 
00023 struct RestRequest {
00024     RestRequest()
00025     {
00026     }
00027 
00028     RestRequest(const HttpHeader & header,
00029                 const std::string & payload)
00030         : verb(header.verb),
00031           resource(header.resource),
00032           params(header.queryParams),
00033           payload(payload)
00034     {
00035     }
00036 
00037     RestRequest(const std::string & verb,
00038                 const std::string & resource,
00039                 const RestParams & params,
00040                 const std::string & payload)
00041         : verb(verb), resource(resource), params(params), payload(payload)
00042     {
00043     }
00044 
00045     std::string verb;
00046     std::string resource;
00047     RestParams params;
00048     std::string payload;
00049 };
00050 
00051 std::ostream & operator << (std::ostream & stream, const RestRequest & request);
00052 
00053 
00054 /*****************************************************************************/
00055 /* REST SERVICE ENDPOINT                                                     */
00056 /*****************************************************************************/
00057 
00068 struct RestServiceEndpoint: public MessageLoop {
00069 
00074     RestServiceEndpoint(std::shared_ptr<zmq::context_t> context)
00075         : zmqEndpoint(context)
00076     {
00077     }
00078 
00079     virtual ~RestServiceEndpoint()
00080     {
00081         shutdown();
00082     }
00083 
00084     void shutdown()
00085     {
00086         // 1.  Shut down the http endpoint, since it needs our threads to
00087         //     complete its shutdown
00088         httpEndpoint.shutdown();
00089 
00090         // 2.  Shut down the message loop
00091         MessageLoop::shutdown();
00092 
00093         // 3.  Shut down the zmq endpoint now we know that the message loop is not using
00094         //     it.
00095         zmqEndpoint.shutdown();
00096     }
00097 
00102     struct ConnectionId {
00104         ConnectionId(const std::string & zmqAddress,
00105                      const std::string & requestId,
00106                      RestServiceEndpoint * endpoint)
00107             : itl(new Itl(zmqAddress, requestId, endpoint))
00108         {
00109         }
00110 
00112         ConnectionId(HttpNamedEndpoint::RestConnectionHandler * http,
00113                      const std::string & requestId,
00114                      RestServiceEndpoint * endpoint)
00115             : itl(new Itl(http, requestId, endpoint))
00116         {
00117         }
00118 
00119         struct Itl {
00120             Itl(HttpNamedEndpoint::RestConnectionHandler * http,
00121                 const std::string & requestId,
00122                 RestServiceEndpoint * endpoint)
00123                 : requestId(requestId),
00124                   http(http),
00125                   endpoint(endpoint),
00126                   responseSent(false),
00127                   startDate(Date::now())
00128             {
00129             }
00130 
00131             Itl(const std::string & zmqAddress,
00132                 const std::string & requestId,
00133                 RestServiceEndpoint * endpoint)
00134                 : zmqAddress(zmqAddress),
00135                   requestId(requestId),
00136                   http(0),
00137                   endpoint(endpoint),
00138                   responseSent(false),
00139                   startDate(Date::now())
00140             {
00141             }
00142 
00143             ~Itl()
00144             {
00145                 if (!responseSent)
00146                     throw ML::Exception("no response sent on connection");
00147             }
00148 
00149             std::string zmqAddress;
00150             std::string requestId;
00151             HttpNamedEndpoint::RestConnectionHandler * http;
00152             RestServiceEndpoint * endpoint;
00153             bool responseSent;
00154             Date startDate;
00155         };
00156 
00157         std::shared_ptr<Itl> itl;
00158 
00159         void sendResponse(int responseCode,
00160                           const char * response,
00161                           const std::string & contentType) const
00162         {
00163             return sendResponse(responseCode, std::string(response),
00164                                 contentType);
00165         }
00166 
00168         void sendResponse(int responseCode,
00169                           const std::string & response,
00170                           const std::string & contentType) const
00171         {
00172             if (itl->responseSent)
00173                 throw ML::Exception("response already sent");
00174 
00175             if (itl->endpoint->logResponse)
00176                 itl->endpoint->logResponse(*this, responseCode, response,
00177                                       contentType);
00178 
00179             if (itl->http)
00180                 itl->http->sendResponse(responseCode, response, contentType);
00181             else {
00182                 std::vector<std::string> message;
00183                 message.push_back(itl->zmqAddress);
00184                 message.push_back(itl->requestId);
00185                 message.push_back(std::to_string(responseCode));
00186                 message.push_back(response);
00187 
00188                 //std::cerr << "sending response to " << itl->requestId
00189                 //          << std::endl;
00190                 itl->endpoint->zmqEndpoint.sendMessage(message);
00191             }
00192 
00193             itl->responseSent = true;
00194         }
00195 
00197         void sendResponse(int responseCode,
00198                           const Json::Value & response,
00199                           const std::string & contentType = "application/json") const
00200         {
00201             using namespace std;
00202             //cerr << "sent response " << responseCode << " " << response
00203             //     << endl;
00204 
00205             if (itl->responseSent)
00206                 throw ML::Exception("response already sent");
00207 
00208             if (itl->endpoint->logResponse)
00209                 itl->endpoint->logResponse(*this, responseCode, response.toString(),
00210                                       contentType);
00211 
00212             if (itl->http)
00213                 itl->http->sendResponse(responseCode, response, contentType);
00214             else {
00215                 std::vector<std::string> message;
00216                 message.push_back(itl->zmqAddress);
00217                 message.push_back(itl->requestId);
00218                 message.push_back(std::to_string(responseCode));
00219                 message.push_back(response.toString());
00220                 itl->endpoint->zmqEndpoint.sendMessage(message);
00221             }
00222 
00223             itl->responseSent = true;
00224         }
00225 
00226         void sendResponse(int responseCode) const
00227         {
00228             return sendResponse(responseCode, "", "");
00229         }
00230 
00232         void sendErrorResponse(int responseCode,
00233                                const std::string & error,
00234                                const std::string & contentType) const
00235         {
00236             using namespace std;
00237             cerr << "sent error response " << responseCode << " " << error
00238                  << endl;
00239 
00240             if (itl->responseSent)
00241                 throw ML::Exception("response already sent");
00242 
00243 
00244             if (itl->endpoint->logResponse)
00245                 itl->endpoint->logResponse(*this, responseCode, error,
00246                                       contentType);
00247             
00248             if (itl->http)
00249                 itl->http->sendResponse(responseCode, error);
00250             else {
00251                 std::vector<std::string> message;
00252                 message.push_back(itl->zmqAddress);
00253                 message.push_back(itl->requestId);
00254                 message.push_back(std::to_string(responseCode));
00255                 message.push_back(error);
00256                 itl->endpoint->zmqEndpoint.sendMessage(message);
00257             }
00258 
00259             itl->responseSent = true;
00260         }
00261 
00262         void sendErrorResponse(int responseCode, const char * error,
00263                                const std::string & contentType) const
00264         {
00265             sendErrorResponse(responseCode, std::string(error), "application/json");
00266         }
00267 
00268         void sendErrorResponse(int responseCode, const Json::Value & error) const
00269         {
00270             using namespace std;
00271             cerr << "sent error response " << responseCode << " " << error
00272                  << endl;
00273 
00274             if (itl->responseSent)
00275                 throw ML::Exception("response already sent");
00276 
00277             if (itl->endpoint->logResponse)
00278                 itl->endpoint->logResponse(*this, responseCode, error.toString(),
00279                                            "application/json");
00280 
00281             if (itl->http)
00282                 itl->http->sendResponse(responseCode, error);
00283             else {
00284                 std::vector<std::string> message;
00285                 message.push_back(itl->zmqAddress);
00286                 message.push_back(itl->requestId);
00287                 message.push_back(std::to_string(responseCode));
00288                 message.push_back(error.toString());
00289                 itl->endpoint->zmqEndpoint.sendMessage(message);
00290             }
00291 
00292             itl->responseSent = true;
00293         }
00294     };
00295 
00296     void init(std::shared_ptr<ConfigurationService> config,
00297               const std::string & endpointName,
00298               double maxAddedLatency = 0.005,
00299               int numThreads = 1)
00300     {
00301         MessageLoop::init(numThreads, maxAddedLatency);
00302         zmqEndpoint.init(config, ZMQ_XREP, endpointName + "/zeromq");
00303         httpEndpoint.init(config, endpointName + "/http");
00304 
00305         auto zmqHandler = [=] (std::vector<std::string> && message)
00306             {
00307                 using namespace std;
00308 
00309                 if (message.size() < 6) {
00310                     cerr << "ignored message with invalid number of members:"
00311                          << message.size()
00312                          << endl;
00313                     return;
00314                 }
00315                 //cerr << "got REST message at " << this << " " << message << endl;
00316                 this->doHandleRequest(ConnectionId(message.at(0),
00317                                                    message.at(1),
00318                                                    this),
00319                                       RestRequest(message.at(2),
00320                                                   message.at(3),
00321                                                   RestParams::fromBinary(message.at(4)),
00322                                                   message.at(5)));
00323             };
00324         
00325         zmqEndpoint.messageHandler = zmqHandler;
00326         
00327         httpEndpoint.onRequest
00328             = [=] (HttpNamedEndpoint::RestConnectionHandler * connection,
00329                    const HttpHeader & header,
00330                    const std::string & payload)
00331             {
00332                 std::string requestId = this->getHttpRequestId();
00333                 this->doHandleRequest(ConnectionId(connection, requestId, this),
00334                                       RestRequest(header, payload));
00335             };
00336         
00337         addSource("RestServiceEndpoint::zmqEndpoint", zmqEndpoint);
00338         addSource("RestServiceEndpoint::httpEndpoint", httpEndpoint);
00339 
00340     }
00341 
00345     std::pair<std::string, std::string>
00346     bindTcp(PortRange const & zmqRange = PortRange(), PortRange const & httpRange = PortRange(), std::string host = "")
00347     {
00348         std::string httpAddr = httpEndpoint.bindTcp(httpRange, host);
00349         std::string zmqAddr = zmqEndpoint.bindTcp(zmqRange, host);
00350         return std::make_pair(zmqAddr, httpAddr);
00351     }
00352 
00358     std::string bindFixedHttpAddress(std::string host, int port)
00359     {
00360         return httpEndpoint.bindTcpFixed(host, port);
00361     }
00362 
00363     std::string bindFixedHttpAddress(std::string address)
00364     {
00365         return httpEndpoint.bindTcpAddress(address);
00366     }
00367 
00369     typedef std::function<void (ConnectionId connection,
00370                                 RestRequest request)> OnHandleRequest;
00371 
00372     OnHandleRequest onHandleRequest;
00373 
00377     virtual void handleRequest(const ConnectionId & connection,
00378                                const RestRequest & request) const
00379     {
00380         using namespace std;
00381 
00382         //cerr << "got request " << request << endl;
00383         if (onHandleRequest) {
00384             onHandleRequest(connection, request);
00385         }
00386         else {
00387             throw ML::Exception("need to override handleRequest or assign to "
00388                                 "onHandleRequest");
00389         }
00390     }
00391 
00392     ZmqNamedEndpoint zmqEndpoint;
00393     HttpNamedEndpoint httpEndpoint;
00394 
00395     std::function<void (const ConnectionId & conn, const RestRequest & req) > logRequest;
00396     std::function<void (const ConnectionId & conn,
00397                         int code,
00398                         const std::string & resp,
00399                         const std::string & contentType) > logResponse;
00400 
00401     void doHandleRequest(const ConnectionId & connection,
00402                          const RestRequest & request)
00403     {
00404         if (logRequest)
00405             logRequest(connection, request);
00406 
00407         handleRequest(connection, request);
00408     }
00409     
00410     // Create a random request ID for an HTTP request
00411     std::string getHttpRequestId() const
00412     {
00413         std::string s = Date::now().print(9) + ML::format("%d", random());
00414         uint64_t jobId = CityHash64(s.c_str(), s.size());
00415         return ML::format("%016llx", jobId);
00416     }
00417     
00418 };
00419 
00420 } // namespace Datacratic
00421 
00422 #endif /* __service__zmq_json_endpoint_h__ */
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator