![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* rest_proxy.h -*- C++ -*- 00002 Jeremy Banres, 14 November 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 */ 00006 00007 #pragma once 00008 00009 #include "soa/service/zmq_endpoint.h" 00010 #include "soa/service/typed_message_channel.h" 00011 #include "soa/service/rest_service_endpoint.h" 00012 00013 namespace Datacratic { 00014 00015 00016 /*****************************************************************************/ 00017 /* HELPER FUNCTIONS */ 00018 /*****************************************************************************/ 00019 00022 template<typename Result> 00023 void decodeRestResponseJson(const std::string & functionName, 00024 std::exception_ptr exc, 00025 int resultCode, 00026 const std::string & body, 00027 std::function<void (std::exception_ptr, 00028 Result &&)> onDone) 00029 { 00030 Result result; 00031 00032 try { 00033 if (exc) { 00034 onDone(exc, std::move(result)); 00035 return; 00036 } 00037 else if (resultCode < 200 || resultCode >= 300) { 00038 onDone(std::make_exception_ptr 00039 (ML::Exception("%s REST request failed: %d: %s", 00040 functionName.c_str(), 00041 resultCode, 00042 body.c_str())), 00043 std::move(result)); 00044 return; 00045 } 00046 else { 00047 onDone(nullptr, std::move(static_cast<Result>(Result::fromJson(Json::parse(body))))); 00048 } 00049 } catch (...) { 00050 onDone(std::current_exception(), std::move(result)); 00051 } 00052 } 00053 00054 template<typename Result> 00055 std::function<void (std::exception_ptr, int, std::string)> 00056 makeRestResponseJsonDecoder(std::string functionName, 00057 std::function<void (std::exception_ptr, 00058 Result &&)> onDone) 00059 { 00060 return [=] (std::exception_ptr exc, 00061 int resultCode, 00062 std::string body) 00063 { 00064 if (!onDone) 00065 return; 00066 decodeRestResponseJson<Result>(functionName, 00067 exc, resultCode, body, 00068 onDone); 00069 }; 00070 } 00071 00072 00073 /*****************************************************************************/ 00074 /* REST PROXY */ 00075 /*****************************************************************************/ 00076 00081 struct RestProxy: public MessageLoop { 00082 00083 RestProxy(); 00084 00085 RestProxy(const std::shared_ptr<zmq::context_t> & context); 00086 00087 ~RestProxy(); 00088 00089 void sleepUntilIdle(); 00090 00091 void shutdown(); 00092 00094 void init(std::shared_ptr<ConfigurationService> config, 00095 const std::string & serviceName); 00096 00098 void initServiceClass(std::shared_ptr<ConfigurationService> config, 00099 const std::string & serviceClass, 00100 const std::string & endpointName); 00101 00102 typedef std::function<void (std::exception_ptr, 00103 int responseCode, const std::string &)> OnDone; 00104 00108 void push(const RestRequest & request, const OnDone & onDone); 00109 00111 void push(const OnDone & onDone, 00112 const std::string & method, 00113 const std::string & resource, 00114 const RestParams & params = RestParams(), 00115 const std::string & payload = ""); 00116 00117 size_t numMessagesOutstanding() const 00118 { 00119 return numMessagesOutstanding_; 00120 } 00121 00122 protected: 00123 std::string serviceName_; 00124 00125 struct Operation { 00126 RestRequest request; 00127 OnDone onDone; 00128 }; 00129 00130 TypedMessageSink<Operation> operationQueue; 00131 ZmqNamedProxy connection; 00132 00133 std::map<uint64_t, OnDone> outstanding; 00134 int numMessagesOutstanding_; // atomic so can be read with no lock 00135 uint64_t currentOpId; 00136 00137 void handleOperation(const Operation & op); 00138 void handleZmqResponse(const std::vector<std::string> & message); 00139 }; 00140 00141 } // namespace Datacratic