RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
plugins/augmentor/augmentor_base.h
00001 /* augmentor_base.h                                                  -*- C++ -*-
00002    Jeremy Barnes, 3 March 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    Base class for bid request augmentors.
00006 */
00007 
00008 #ifndef __rtb__augmentor_base_h__
00009 #define __rtb__augmentor_base_h__
00010 
00011 #include "soa/service/zmq.hpp"
00012 #include "soa/types/id.h"
00013 #include "rtbkit/common/auction.h"
00014 #include "rtbkit/common/augmentation.h"
00015 #include "soa/service/service_base.h"
00016 #include "soa/service/zmq_utils.h"
00017 #include "soa/service/socket_per_thread.h"
00018 #include "soa/service/typed_message_channel.h"
00019 #include "jml/arch/futex.h"
00020 #include "jml/utils/ring_buffer.h"
00021 #include "soa/service/zmq_endpoint.h"
00022 
00023 #include <boost/make_shared.hpp>
00024 #include <boost/function.hpp>
00025 #include <boost/thread.hpp>
00026 
00027 
00028 namespace RTBKIT {
00029 
00030 /******************************************************************************/
00031 /* AUGMENTATION REQUEST                                                       */
00032 /******************************************************************************/
00033 
00040 struct AugmentationRequest
00041 {
00042     std::string augmentor;                    // Name of the augmentor
00043     std::string router;                       // Router to respond to
00044     Id id;                                    // Auction id
00045     std::shared_ptr<BidRequest> bidRequest;   // Bid request to augment
00046     std::vector<std::string> agents;          // Agents availble to bid
00047     double timeAvailableMs;                   // Time to respond
00048     Date startTime;                           // Start of the latency timer
00049 };
00050 
00051 
00052 /*****************************************************************************/
00053 /* AUGMENTOR BASE                                                            */
00054 /*****************************************************************************/
00055 
00060 struct Augmentor : public ServiceBase, public MessageLoop {
00061 
00062     Augmentor(const std::string & augmentorName,
00063                   const std::string & serviceName,
00064                   std::shared_ptr<ServiceProxies> proxies);
00065 
00066     Augmentor(const std::string & augmentorName,
00067                   const std::string & serviceName,
00068                   ServiceBase & parent);
00069 
00070     ~Augmentor();
00071 
00072     void init();
00073     void start();
00074     void shutdown();
00075 
00079     void configureAndWait();
00080 
00082     typedef boost::function<void (const AugmentationRequest &)> OnRequest;
00083 
00085     OnRequest onRequest;
00086 
00088     void respond(const AugmentationRequest & request,
00089                  const AugmentationList & response);
00090 
00091 private:
00092     std::string augmentorName; // This can differ from the servicenName!
00093 
00094     ZmqMultipleNamedClientBusProxy toRouters;
00095 
00096     typedef std::pair<AugmentationRequest, AugmentationList> Response;
00097     TypedMessageSink<Response> responseQueue;
00098 
00099     void handleRouterMessage(const std::string & router,
00100                              const std::vector<std::string> & message);
00101 };
00102 
00103 
00104 /*****************************************************************************/
00105 /* MULTI THREADED AUGMENTOR                                                   */
00106 /*****************************************************************************/
00107 
00110 struct MultiThreadedAugmentor : public Augmentor {
00111 
00112     MultiThreadedAugmentor(const std::string & augmentorName,
00113                                const std::string & serviceName,
00114                                std::shared_ptr<ServiceProxies> proxies);
00115 
00116     MultiThreadedAugmentor(const std::string & augmentorName,
00117                                const std::string & serviceName,
00118                                ServiceBase & parent);
00119 
00120     ~MultiThreadedAugmentor();
00121 
00122     void init(int numThreads);
00123     void shutdown();
00124 
00125 protected:
00126 
00127     virtual void doRequestImpl(const AugmentationRequest &) = 0;
00128 
00129 private:
00130     uint64_t numWithInfo;
00131 
00132     ML::RingBufferSWMR<AugmentationRequest> ringBuffer;
00133 
00134     boost::thread_group workers;
00135 
00136     void runWorker();
00137 
00138     boost::thread_group workerThreads;
00139 
00140     int numThreadsCreated;
00141 
00142     volatile bool shutdown_;
00143 
00144     void pushRequest(const AugmentationRequest &);
00145 };
00146 
00147 
00148 /******************************************************************************/
00149 /* SYNC AUGMENTOR BASE                                                        */
00150 /******************************************************************************/
00151 
00158 struct SyncAugmentor : public MultiThreadedAugmentor
00159 {
00160 
00161     SyncAugmentor(const std::string & augmentorName,
00162                       const std::string& serviceName,
00163                       std::shared_ptr<ServiceProxies> proxies)
00164             : MultiThreadedAugmentor(augmentorName, serviceName, proxies)
00165     {
00166         doRequest = boost::bind(&SyncAugmentor::onRequest, this, _1);
00167     }
00168 
00169     SyncAugmentor(const std::string & augmentorName,
00170                       const std::string & serviceName,
00171                       ServiceBase& parent)
00172         : MultiThreadedAugmentor(augmentorName, serviceName, parent)
00173     {
00174         doRequest = boost::bind(&SyncAugmentor::onRequest, this, _1);
00175     }
00176 
00177     boost::function<AugmentationList(const AugmentationRequest &) > doRequest;
00178 
00179     virtual AugmentationList
00180     onRequest(const AugmentationRequest &)
00181     {
00182         throw ML::Exception("onRequest or doRequest must be overridden");
00183     }
00184 
00185 protected:
00186 
00187     virtual void
00188     doRequestImpl(const AugmentationRequest & request)
00189     {
00190         AugmentationList response = doRequest(request);
00191         respond(request, response);
00192     }
00193 };
00194 
00195 
00196 /******************************************************************************/
00197 /* ASYNC AUGMENTOR BASE                                                       */
00198 /******************************************************************************/
00199 
00206 struct AsyncAugmentor : public MultiThreadedAugmentor
00207 {
00208 
00209     AsyncAugmentor(const std::string & augmentorName,
00210                        const std::string & serviceName,
00211                        std::shared_ptr<ServiceProxies> proxies) :
00212         MultiThreadedAugmentor(augmentorName, serviceName, proxies)
00213     {
00214         doRequest = boost::bind(&AsyncAugmentor::onRequest, this, _1, _2);
00215     }
00216 
00217     AsyncAugmentor(const std::string & augmentorName,
00218                        const std::string & serviceName,
00219                        ServiceBase& parent) :
00220         MultiThreadedAugmentor(augmentorName, serviceName, parent)
00221     {
00222         doRequest = boost::bind(&AsyncAugmentor::onRequest, this, _1, _2);
00223     }
00224 
00225     typedef std::function<void (const AugmentationList &)> SendResponseCB;
00226 
00227     boost::function<void (const AugmentationRequest &, SendResponseCB) >
00228         doRequest;
00229 
00230     virtual void
00231     onRequest(const AugmentationRequest & request, SendResponseCB sendResponse)
00232     {
00233         throw ML::Exception("onRequest or doRequest must be overridden");
00234     };
00235 
00236 protected:
00237 
00238     virtual void
00239     doRequestImpl(const AugmentationRequest & request)
00240     {
00241         auto sendResponse = [=](const AugmentationList & response) {
00242             respond(request, response);
00243         };
00244         doRequest(request, sendResponse);
00245     }
00246 };
00247 
00248 } // namespace RTBKIT
00249 
00250 
00251 #endif /* __rtb__augmentor_base_h__ */
00252 
00253 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator