RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
plugins/augmentor/augmentor_base.cc
00001 /* augmentor_base.cc
00002 
00003    Jeremy Barnes, 4 March 2012
00004    Copyright (c) 2012 Datacratic.  All rights reserved.
00005 
00006    Object that handles doing augmented bid requests.
00007 */
00008 
00009 #include "augmentor_base.h"
00010 #include "soa/service/zmq_utils.h"
00011 #include "jml/arch/timers.h"
00012 #include "jml/utils/vector_utils.h"
00013 #include "jml/arch/futex.h"
00014 
00015 
00016 using namespace std;
00017 using namespace ML;
00018 
00019 
00020 namespace RTBKIT {
00021 
00022 
00023 /*****************************************************************************/
00024 /* AUGMENTOR                                                                 */
00025 /*****************************************************************************/
00026 
00027 // Determined via a very scientific method: 2^16 should be enough... right?
00028 enum { QueueSize = 65536 };
00029 
00030 Augmentor::
00031 Augmentor(const std::string & augmentorName,
00032           const std::string & serviceName,
00033           std::shared_ptr<ServiceProxies> proxies)
00034     : ServiceBase(serviceName, proxies),
00035       augmentorName(augmentorName),
00036       toRouters(getZmqContext()),
00037       responseQueue(QueueSize)
00038 {
00039 }
00040 
00041 Augmentor::
00042 Augmentor(const std::string & augmentorName,
00043           const std::string & serviceName,
00044           ServiceBase& parent)
00045     : ServiceBase(serviceName, parent),
00046       augmentorName(augmentorName),
00047       toRouters(getZmqContext()),
00048       responseQueue(QueueSize)
00049 {
00050 }
00051 
00052 Augmentor::
00053 ~Augmentor()
00054 {
00055     shutdown();
00056 }
00057 
00058 void
00059 Augmentor::
00060 init()
00061 {
00062     responseQueue.onEvent = [=] (const Response& resp)
00063         {
00064             const AugmentationRequest& request = resp.first;
00065             const AugmentationList& response = resp.second;
00066 
00067             toRouters.sendMessage(
00068                     request.router,
00069                     "RESPONSE",
00070                     "1.0",
00071                     request.startTime,
00072                     request.id.toString(),
00073                     request.augmentor,
00074                     chomp(response.toJson().toString()));
00075 
00076             recordHit("messages.RESPONSE");
00077         };
00078 
00079     addSource("Augmentor::responseQueue", responseQueue);
00080 
00081     toRouters.init(getServices()->config, serviceName());
00082 
00083     toRouters.connectHandler = [=] (const std::string & newRouter)
00084         {
00085             toRouters.sendMessage(newRouter,
00086                                   "CONFIG",
00087                                   "1.0",
00088                                   augmentorName);
00089 
00090             recordHit("messages.CONFIG");
00091         };
00092 
00093     toRouters.disconnectHandler = [=] (const std::string & oldRouter)
00094         {
00095             cerr << "disconnected from router " << oldRouter << endl;
00096         };
00097 
00098     toRouters.messageHandler = [=] (const std::string & router,
00099                                     const std::vector<std::string> & message)
00100         {
00101             handleRouterMessage(router, message);
00102         };
00103 
00104 
00105     toRouters.connectAllServiceProviders("rtbRouterAugmentation", "augmentors");
00106 
00107     addSource("Augmentor::toRouters", toRouters);
00108 
00109 
00110     double lastSleepTime = 0;
00111     addPeriodic("Augmentor::dutyCycle", 1.0, [=] (uint64_t) mutable {
00112                 double sleepTime = totalSleepSeconds();
00113                 recordLevel((sleepTime - lastSleepTime) * 1000.0, "sleepTime");
00114                 lastSleepTime = sleepTime;
00115             });
00116 }
00117 
00118 void
00119 Augmentor::
00120 start()
00121 {
00122     MessageLoop::start();
00123 }
00124 
00125 void
00126 Augmentor::
00127 shutdown()
00128 {
00129     MessageLoop::shutdown();
00130     toRouters.shutdown();
00131 }
00132 
00133 void
00134 Augmentor::
00135 configureAndWait()
00136 {
00137 #if 0
00138     sendMessage(toRouter,
00139                 "CONFIG",
00140                 "1.0",
00141                 this->serviceName());
00142 #endif
00143 
00144     throw ML::Exception("configureAndWait not re-implemented");
00145 }
00146 
00147 void
00148 Augmentor::
00149 respond(const AugmentationRequest & request, const AugmentationList & response)
00150 {
00151     if (responseQueue.tryPush(make_pair(request, response)))
00152         return;
00153 
00154     cerr << "Dropping augmentation response: response queue is full" << endl;
00155 }
00156 
00157 void
00158 Augmentor::
00159 handleRouterMessage(const std::string & router,
00160                     const std::vector<std::string> & message)
00161 {
00162    try {
00163         const std::string & type = message.at(0);
00164         recordHit("messages." + type);
00165 
00166         //cerr << "got augmentor message of type " << type << endl;
00167         if (type == "CONFIGOK") {
00168 #if 0
00169             if (!awaitingConfig.empty()) {
00170                 for (unsigned i = 0;  i < awaitingConfig.size();  ++i)
00171                     sendMessage(control, awaitingConfig[i],
00172                                 "CONFIGOK");
00173             }
00174             awaitingConfig.clear();
00175 #endif
00176         }
00177         else if (type == "AUGMENT") {
00178             const string & version = message.at(1);
00179 
00180             if (version != "1.0")
00181                 throw ML::Exception("unexpected version in augment");
00182 
00183             AugmentationRequest request;
00184             request.router = router;
00185             request.timeAvailableMs = 0.05;
00186             request.augmentor = message.at(2);
00187             request.id = Id(message.at(3));
00188 
00189             const string & bidRequestSource = message.at(4);
00190             const string & bidRequestStr = message.at(5);
00191 
00192             istringstream agentsStr(message.at(6));
00193             ML::DB::Store_Reader reader(agentsStr);
00194             reader.load(request.agents);
00195 
00196             const string & startTimeStr = message.at(7);
00197             request.startTime
00198                 = Date::fromSecondsSinceEpoch(strtod(startTimeStr.c_str(), 0));
00199 
00200             if (onRequest) {
00201                 request.bidRequest.reset(
00202                         BidRequest::parse(bidRequestSource, bidRequestStr));
00203 
00204                 onRequest(request);
00205             }
00206             else respond(request, AugmentationList());
00207         }
00208         else throw ML::Exception("unknown router message");
00209 
00210     } catch (const std::exception & exc) {
00211         cerr << "error handling augmentor message " << message
00212              << ": " << exc.what() << endl;
00213     }
00214 }
00215 
00216 
00217 /*****************************************************************************/
00218 /* MULTI THREADED AUGMENTOR                                                   */
00219 /*****************************************************************************/
00220 
00221 MultiThreadedAugmentor::
00222 MultiThreadedAugmentor(const std::string & augmentorName,
00223                        const std::string & serviceName,
00224                        std::shared_ptr<ServiceProxies> proxies)
00225     : Augmentor(augmentorName, serviceName, proxies),
00226       numWithInfo(0),
00227       ringBuffer(102400)
00228 {
00229     Augmentor::onRequest
00230         = boost::bind(&MultiThreadedAugmentor::pushRequest, this, _1);
00231     numThreadsCreated = 0;
00232 }
00233 
00234 MultiThreadedAugmentor::
00235 MultiThreadedAugmentor(const std::string & augmentorName,
00236                        const std::string & serviceName,
00237                        ServiceBase& parent)
00238     : Augmentor(augmentorName, serviceName, parent),
00239       numWithInfo(0),
00240       ringBuffer(102400)
00241 {
00242     Augmentor::onRequest
00243         = boost::bind(&MultiThreadedAugmentor::pushRequest, this, _1);
00244     numThreadsCreated = 0;
00245 }
00246 
00247 MultiThreadedAugmentor::
00248 ~MultiThreadedAugmentor()
00249 {
00250     shutdown();
00251 }
00252 
00253 void
00254 MultiThreadedAugmentor::
00255 init(int numThreads)
00256 {
00257     if (numThreadsCreated)
00258         throw ML::Exception("double init of augmentor");
00259 
00260     Augmentor::init();
00261 
00262     shutdown_ = false;
00263 
00264     for (unsigned i = 0;  i < numThreads;  ++i)
00265         workers.create_thread([=] () { this->runWorker(); });
00266 
00267     numThreadsCreated = numThreads;
00268 }
00269 
00270 void
00271 MultiThreadedAugmentor::
00272 shutdown()
00273 {
00274     shutdown_ = true;
00275 
00276     ML::memory_barrier();
00277 
00278     for (unsigned i = 0;  i < numThreadsCreated;  ++i)
00279         pushRequest(AugmentationRequest());
00280 
00281     workers.join_all();
00282 
00283     numThreadsCreated = 0;
00284 
00285     Augmentor::shutdown();
00286 }
00287 
00288 void
00289 MultiThreadedAugmentor::
00290 runWorker()
00291 {
00292     while (!shutdown_) {
00293         try {
00294             auto req = ringBuffer.pop();
00295             if (shutdown_)
00296                 return;
00297             doRequestImpl(req);
00298         } catch (const std::exception & exc) {
00299             std::cerr << "exception handling aug request: "
00300                       << exc.what() << std::endl;
00301         }
00302     }
00303 }
00304 
00305 void
00306 MultiThreadedAugmentor::
00307 pushRequest(const AugmentationRequest & request)
00308 {
00309     ringBuffer.push(request);
00310 }
00311 
00312 } // namespace RTBKIT
00313 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator