RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* rest_proxy.cc 00002 Jeremy Barnes, 14 November 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 */ 00006 00007 #include "rest_proxy.h" 00008 00009 using namespace std; 00010 using namespace ML; 00011 00012 namespace Datacratic { 00013 00014 /*****************************************************************************/ 00015 /* REST PROXY */ 00016 /*****************************************************************************/ 00017 00018 RestProxy:: 00019 RestProxy() 00020 : operationQueue(1024), 00021 numMessagesOutstanding_(0), 00022 currentOpId(1) 00023 { 00024 // What to do when we get a new entry in the queue? 00025 operationQueue.onEvent = std::bind(&RestProxy::handleOperation, 00026 this, std::placeholders::_1); 00027 00028 } 00029 00030 RestProxy:: 00031 RestProxy(const std::shared_ptr<zmq::context_t> & context) 00032 : operationQueue(1024), 00033 connection(context), 00034 numMessagesOutstanding_(0), 00035 currentOpId(1) 00036 { 00037 // What to do when we get a new entry in the queue? 00038 operationQueue.onEvent = std::bind(&RestProxy::handleOperation, 00039 this, std::placeholders::_1); 00040 00041 } 00042 00043 RestProxy:: 00044 ~RestProxy() 00045 { 00046 shutdown(); 00047 } 00048 00049 void 00050 RestProxy:: 00051 sleepUntilIdle() 00052 { 00053 for (;;) { 00054 int o = numMessagesOutstanding_; 00055 //cerr << "numMessagesOustanding = " << o << endl; 00056 if (!o) 00057 return; 00058 ML::futex_wait(numMessagesOutstanding_, o, 0.01); 00059 } 00060 } 00061 00062 void 00063 RestProxy:: 00064 shutdown() 00065 { 00066 // Stop processing messages 00067 MessageLoop::shutdown(); 00068 00069 connection.shutdown(); 00070 } 00071 00072 void 00073 RestProxy:: 00074 init(std::shared_ptr<ConfigurationService> config, 00075 const std::string & serviceName) 00076 { 00077 serviceName_ = serviceName; 00078 00079 connection.init(config, ZMQ_XREQ); 00080 connection.connect(serviceName + "/zeromq"); 00081 00082 addSource("RestProxy::operationQueue", operationQueue); 00083 00084 // What to do when we get something back from zeromq? 00085 addSource("RestProxy::handleZmqResponse", 00086 std::make_shared<ZmqEventSource> 00087 (connection.socket(), 00088 std::bind(&RestProxy::handleZmqResponse, 00089 this, 00090 std::placeholders::_1))); 00091 } 00092 00093 void 00094 RestProxy:: 00095 initServiceClass(std::shared_ptr<ConfigurationService> config, 00096 const std::string & serviceClass, 00097 const std::string & serviceEndpoint) 00098 { 00099 connection.init(config, ZMQ_XREQ); 00100 connection.connectToServiceClass(serviceClass, serviceEndpoint); 00101 00102 addSource("RestProxy::operationQueue", operationQueue); 00103 00104 // What to do when we get something back from zeromq? 00105 addSource("RestProxy::handleZmqResponse", 00106 std::make_shared<ZmqEventSource> 00107 (connection.socket(), 00108 std::bind(&RestProxy::handleZmqResponse, 00109 this, 00110 std::placeholders::_1))); 00111 } 00112 00113 void 00114 RestProxy:: 00115 push(const RestRequest & request, const OnDone & onDone) 00116 { 00117 Operation op; 00118 op.request = request; 00119 op.onDone = onDone; 00120 if (operationQueue.tryPush(std::move(op))) 00121 ML::atomic_inc(numMessagesOutstanding_); 00122 else 00123 throw ML::Exception("queue is full"); 00124 } 00125 00126 void 00127 RestProxy:: 00128 push(const OnDone & onDone, 00129 const std::string & method, 00130 const std::string & resource, 00131 const RestParams & params, 00132 const std::string & payload) 00133 { 00134 RestRequest request(method, resource, params, payload); 00135 push(request, onDone); 00136 } 00137 00138 void 00139 RestProxy:: 00140 handleOperation(const Operation & op) 00141 { 00142 // Gets called when someone calls our API to make something happen; 00143 // this is run by the main worker thread to actually do the work. 00144 // It forwards the request off to the master banker. 00145 uint64_t opId = 0; 00146 if (op.onDone) 00147 opId = currentOpId++; 00148 00149 //cerr << "sending with payload " << op.request.payload 00150 // << " and response id " << opId << endl; 00151 00152 if (trySendMessage(connection.socket(), 00153 std::to_string(opId), 00154 op.request.verb, 00155 op.request.resource, 00156 op.request.params.toBinary(), 00157 op.request.payload)) { 00158 if (opId) 00159 outstanding[opId] = op.onDone; 00160 else { 00161 int no = __sync_add_and_fetch(&numMessagesOutstanding_, -1); 00162 if (no == 0) 00163 futex_wake(numMessagesOutstanding_); 00164 } 00165 } 00166 else { 00167 if (op.onDone) { 00168 string exc_msg = ("connection to '" + serviceName_ 00169 + "' is unavailable"); 00170 op.onDone(make_exception_ptr<ML::Exception>(exc_msg), 0, ""); 00171 } 00172 int no = __sync_add_and_fetch(&numMessagesOutstanding_, -1); 00173 if (no == 0) 00174 futex_wake(numMessagesOutstanding_); 00175 } 00176 } 00177 00178 void 00179 RestProxy:: 00180 handleZmqResponse(const std::vector<std::string> & message) 00181 { 00182 // Gets called when we get a response back from the master banker in 00183 // response to one of our calls. 00184 00185 // We call the callback associated with this code. 00186 00187 //cerr << "response is " << message << endl; 00188 00189 uint64_t opId = boost::lexical_cast<uint64_t>(message.at(0)); 00190 int responseCode = boost::lexical_cast<int>(message.at(1)); 00191 std::string body = message.at(2); 00192 00193 ExcAssert(opId); 00194 00195 auto it = outstanding.find(opId); 00196 if (it == outstanding.end()) { 00197 cerr << "unknown op ID " << endl; 00198 return; 00199 } 00200 try { 00201 if (responseCode >= 200 && responseCode < 300) 00202 it->second(nullptr, responseCode, body); 00203 else 00204 it->second(std::make_exception_ptr(ML::Exception(body)), 00205 responseCode, ""); 00206 } catch (const std::exception & exc) { 00207 cerr << "warning: exception handling banker result: " 00208 << exc.what() << endl; 00209 } catch (...) { 00210 cerr << "warning: unknown exception handling banker result" 00211 << endl; 00212 } 00213 00214 outstanding.erase(it); 00215 00216 ML::atomic_dec(numMessagesOutstanding_); 00217 } 00218 00219 } // namespace Datacratic