RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/rest_proxy.h
00001 /* rest_proxy.h                                                    -*- C++ -*-
00002    Jeremy Banres, 14 November 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005 */
00006 
00007 #pragma once
00008 
00009 #include "soa/service/zmq_endpoint.h"
00010 #include "soa/service/typed_message_channel.h"
00011 #include "soa/service/rest_service_endpoint.h"
00012 
00013 namespace Datacratic {
00014 
00015 
00016 /*****************************************************************************/
00017 /* HELPER FUNCTIONS                                                          */
00018 /*****************************************************************************/
00019 
00022 template<typename Result>
00023 void decodeRestResponseJson(const std::string & functionName,
00024                             std::exception_ptr exc,
00025                             int resultCode,
00026                             const std::string & body,
00027                             std::function<void (std::exception_ptr,
00028                                                 Result &&)> onDone)
00029 {
00030     Result result;
00031 
00032     try {
00033         if (exc) {
00034             onDone(exc, std::move(result));
00035             return;
00036         }
00037         else if (resultCode < 200 || resultCode >= 300) {
00038             onDone(std::make_exception_ptr
00039                    (ML::Exception("%s REST request failed: %d: %s",
00040                                   functionName.c_str(),
00041                                   resultCode,
00042                                   body.c_str())),
00043                    std::move(result));
00044             return;
00045         }
00046         else {
00047             onDone(nullptr, std::move(static_cast<Result>(Result::fromJson(Json::parse(body)))));
00048         }
00049     } catch (...) {
00050         onDone(std::current_exception(), std::move(result));
00051     }
00052 }
00053 
00054 template<typename Result>
00055 std::function<void (std::exception_ptr, int, std::string)>
00056 makeRestResponseJsonDecoder(std::string functionName,
00057                             std::function<void (std::exception_ptr,
00058                                                 Result &&)> onDone)
00059 {
00060     return [=] (std::exception_ptr exc,
00061                 int resultCode,
00062                 std::string body)
00063         {
00064             if (!onDone)
00065                 return;
00066             decodeRestResponseJson<Result>(functionName,
00067                                            exc, resultCode, body,
00068                                            onDone);
00069         };
00070 }
00071 
00072 
00073 /*****************************************************************************/
00074 /* REST PROXY                                                                */
00075 /*****************************************************************************/
00076 
00081 struct RestProxy: public MessageLoop {
00082 
00083     RestProxy();
00084 
00085     RestProxy(const std::shared_ptr<zmq::context_t> & context);
00086     
00087     ~RestProxy();
00088 
00089     void sleepUntilIdle();
00090 
00091     void shutdown();
00092 
00094     void init(std::shared_ptr<ConfigurationService> config,
00095               const std::string & serviceName);
00096     
00098     void initServiceClass(std::shared_ptr<ConfigurationService> config,
00099                           const std::string & serviceClass,
00100                           const std::string & endpointName);
00101     
00102     typedef std::function<void (std::exception_ptr,
00103                                 int responseCode, const std::string &)> OnDone;
00104 
00108     void push(const RestRequest & request, const OnDone & onDone);
00109 
00111     void push(const OnDone & onDone,
00112               const std::string & method,
00113               const std::string & resource,
00114               const RestParams & params = RestParams(),
00115               const std::string & payload = "");
00116 
00117     size_t numMessagesOutstanding() const
00118     {
00119         return numMessagesOutstanding_;
00120     }
00121 
00122 protected:
00123     std::string serviceName_;
00124 
00125     struct Operation {
00126         RestRequest request;
00127         OnDone onDone;
00128     };
00129 
00130     TypedMessageSink<Operation> operationQueue;
00131     ZmqNamedProxy connection;
00132 
00133     std::map<uint64_t, OnDone> outstanding;
00134     int numMessagesOutstanding_;  // atomic so can be read with no lock
00135     uint64_t currentOpId;
00136 
00137     void handleOperation(const Operation & op);
00138     void handleZmqResponse(const std::vector<std::string> & message);
00139 };
00140 
00141 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator