RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/rest_proxy.cc
00001 /* rest_proxy.cc
00002    Jeremy Barnes, 14 November 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005 */
00006 
00007 #include "rest_proxy.h"
00008 
00009 using namespace std;
00010 using namespace ML;
00011 
00012 namespace Datacratic {
00013 
00014 /*****************************************************************************/
00015 /* REST PROXY                                                                */
00016 /*****************************************************************************/
00017 
00018 RestProxy::
00019 RestProxy()
00020     : operationQueue(1024),
00021       numMessagesOutstanding_(0),
00022       currentOpId(1)
00023 {
00024     // What to do when we get a new entry in the queue?
00025     operationQueue.onEvent = std::bind(&RestProxy::handleOperation,
00026                                        this, std::placeholders::_1);
00027     
00028 }
00029 
00030 RestProxy::
00031 RestProxy(const std::shared_ptr<zmq::context_t> & context)
00032     : operationQueue(1024),
00033       connection(context),
00034       numMessagesOutstanding_(0),
00035       currentOpId(1)
00036 {
00037     // What to do when we get a new entry in the queue?
00038     operationQueue.onEvent = std::bind(&RestProxy::handleOperation,
00039                                        this, std::placeholders::_1);
00040     
00041 }
00042 
00043 RestProxy::
00044 ~RestProxy()
00045 {
00046     shutdown();
00047 }
00048 
00049 void
00050 RestProxy::
00051 sleepUntilIdle()
00052 {
00053     for (;;) {
00054         int o = numMessagesOutstanding_;
00055         //cerr << "numMessagesOustanding = " << o << endl;
00056         if (!o)
00057             return;
00058         ML::futex_wait(numMessagesOutstanding_, o, 0.01);
00059     }
00060 }
00061 
00062 void
00063 RestProxy::
00064 shutdown()
00065 {
00066     // Stop processing messages
00067     MessageLoop::shutdown();
00068 
00069     connection.shutdown();
00070 }
00071 
00072 void
00073 RestProxy::
00074 init(std::shared_ptr<ConfigurationService> config,
00075      const std::string & serviceName)
00076 {
00077     serviceName_ = serviceName;
00078 
00079     connection.init(config, ZMQ_XREQ);
00080     connection.connect(serviceName + "/zeromq");
00081     
00082     addSource("RestProxy::operationQueue", operationQueue);
00083 
00084     // What to do when we get something back from zeromq?
00085     addSource("RestProxy::handleZmqResponse",
00086               std::make_shared<ZmqEventSource>
00087               (connection.socket(),
00088                std::bind(&RestProxy::handleZmqResponse,
00089                          this,
00090                          std::placeholders::_1)));
00091 }
00092 
00093 void
00094 RestProxy::
00095 initServiceClass(std::shared_ptr<ConfigurationService> config,
00096                  const std::string & serviceClass,
00097                  const std::string & serviceEndpoint)
00098 {
00099     connection.init(config, ZMQ_XREQ);
00100     connection.connectToServiceClass(serviceClass, serviceEndpoint);
00101     
00102     addSource("RestProxy::operationQueue", operationQueue);
00103 
00104     // What to do when we get something back from zeromq?
00105     addSource("RestProxy::handleZmqResponse",
00106               std::make_shared<ZmqEventSource>
00107               (connection.socket(),
00108                std::bind(&RestProxy::handleZmqResponse,
00109                          this,
00110                          std::placeholders::_1)));
00111 }
00112 
00113 void
00114 RestProxy::
00115 push(const RestRequest & request, const OnDone & onDone)
00116 {
00117     Operation op;
00118     op.request = request;
00119     op.onDone = onDone;
00120     if (operationQueue.tryPush(std::move(op)))
00121         ML::atomic_inc(numMessagesOutstanding_);
00122     else
00123         throw ML::Exception("queue is full");
00124 }
00125 
00126 void
00127 RestProxy::
00128 push(const OnDone & onDone,
00129      const std::string & method,
00130      const std::string & resource,
00131      const RestParams & params,
00132      const std::string & payload)
00133 {
00134     RestRequest request(method, resource, params, payload);
00135     push(request, onDone);
00136 }
00137 
00138 void
00139 RestProxy::
00140 handleOperation(const Operation & op)
00141 {
00142     // Gets called when someone calls our API to make something happen;
00143     // this is run by the main worker thread to actually do the work.
00144     // It forwards the request off to the master banker.
00145     uint64_t opId = 0;
00146     if (op.onDone)
00147         opId = currentOpId++;
00148 
00149     //cerr << "sending with payload " << op.request.payload
00150     //     << " and response id " << opId << endl;
00151 
00152     if (trySendMessage(connection.socket(),
00153                        std::to_string(opId),
00154                        op.request.verb,
00155                        op.request.resource,
00156                        op.request.params.toBinary(),
00157                        op.request.payload)) {
00158         if (opId)
00159             outstanding[opId] = op.onDone;
00160         else {
00161             int no = __sync_add_and_fetch(&numMessagesOutstanding_, -1);
00162             if (no == 0)
00163                 futex_wake(numMessagesOutstanding_);
00164         }
00165     }
00166     else {
00167         if (op.onDone) {
00168             string exc_msg = ("connection to '" + serviceName_
00169                               + "' is unavailable");
00170             op.onDone(make_exception_ptr<ML::Exception>(exc_msg), 0, "");
00171         }
00172         int no = __sync_add_and_fetch(&numMessagesOutstanding_, -1);
00173         if (no == 0)
00174             futex_wake(numMessagesOutstanding_);
00175     }
00176 }
00177 
00178 void
00179 RestProxy::
00180 handleZmqResponse(const std::vector<std::string> & message)
00181 {
00182     // Gets called when we get a response back from the master banker in
00183     // response to one of our calls.
00184 
00185     // We call the callback associated with this code.
00186 
00187     //cerr << "response is " << message << endl;
00188 
00189     uint64_t opId = boost::lexical_cast<uint64_t>(message.at(0));
00190     int responseCode = boost::lexical_cast<int>(message.at(1));
00191     std::string body = message.at(2);
00192 
00193     ExcAssert(opId);
00194 
00195     auto it = outstanding.find(opId);
00196     if (it == outstanding.end()) {
00197         cerr << "unknown op ID " << endl;
00198         return;
00199     }
00200     try {
00201         if (responseCode >= 200 && responseCode < 300) 
00202             it->second(nullptr, responseCode, body);
00203         else
00204             it->second(std::make_exception_ptr(ML::Exception(body)),
00205                        responseCode, "");
00206     } catch (const std::exception & exc) {
00207         cerr << "warning: exception handling banker result: "
00208              << exc.what() << endl;
00209     } catch (...) {
00210         cerr << "warning: unknown exception handling banker result"
00211              << endl;
00212     }
00213 
00214     outstanding.erase(it);
00215     
00216     ML::atomic_dec(numMessagesOutstanding_);
00217 }
00218 
00219 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator