![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* augmentor_base.h -*- C++ -*- 00002 Jeremy Barnes, 3 March 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Base class for bid request augmentors. 00006 */ 00007 00008 #ifndef __rtb__augmentor_base_h__ 00009 #define __rtb__augmentor_base_h__ 00010 00011 #include "soa/service/zmq.hpp" 00012 #include "soa/types/id.h" 00013 #include "rtbkit/common/auction.h" 00014 #include "rtbkit/common/augmentation.h" 00015 #include "soa/service/service_base.h" 00016 #include "soa/service/zmq_utils.h" 00017 #include "soa/service/socket_per_thread.h" 00018 #include "soa/service/typed_message_channel.h" 00019 #include "jml/arch/futex.h" 00020 #include "jml/utils/ring_buffer.h" 00021 #include "soa/service/zmq_endpoint.h" 00022 00023 #include <boost/make_shared.hpp> 00024 #include <boost/function.hpp> 00025 #include <boost/thread.hpp> 00026 00027 00028 namespace RTBKIT { 00029 00030 /******************************************************************************/ 00031 /* AUGMENTATION REQUEST */ 00032 /******************************************************************************/ 00033 00040 struct AugmentationRequest 00041 { 00042 std::string augmentor; // Name of the augmentor 00043 std::string router; // Router to respond to 00044 Id id; // Auction id 00045 std::shared_ptr<BidRequest> bidRequest; // Bid request to augment 00046 std::vector<std::string> agents; // Agents availble to bid 00047 double timeAvailableMs; // Time to respond 00048 Date startTime; // Start of the latency timer 00049 }; 00050 00051 00052 /*****************************************************************************/ 00053 /* AUGMENTOR BASE */ 00054 /*****************************************************************************/ 00055 00060 struct Augmentor : public ServiceBase, public MessageLoop { 00061 00062 Augmentor(const std::string & augmentorName, 00063 const std::string & serviceName, 00064 std::shared_ptr<ServiceProxies> proxies); 00065 00066 Augmentor(const std::string & augmentorName, 00067 const std::string & serviceName, 00068 ServiceBase & parent); 00069 00070 ~Augmentor(); 00071 00072 void init(); 00073 void start(); 00074 void shutdown(); 00075 00079 void configureAndWait(); 00080 00082 typedef boost::function<void (const AugmentationRequest &)> OnRequest; 00083 00085 OnRequest onRequest; 00086 00088 void respond(const AugmentationRequest & request, 00089 const AugmentationList & response); 00090 00091 private: 00092 std::string augmentorName; // This can differ from the servicenName! 00093 00094 ZmqMultipleNamedClientBusProxy toRouters; 00095 00096 typedef std::pair<AugmentationRequest, AugmentationList> Response; 00097 TypedMessageSink<Response> responseQueue; 00098 00099 void handleRouterMessage(const std::string & router, 00100 const std::vector<std::string> & message); 00101 }; 00102 00103 00104 /*****************************************************************************/ 00105 /* MULTI THREADED AUGMENTOR */ 00106 /*****************************************************************************/ 00107 00110 struct MultiThreadedAugmentor : public Augmentor { 00111 00112 MultiThreadedAugmentor(const std::string & augmentorName, 00113 const std::string & serviceName, 00114 std::shared_ptr<ServiceProxies> proxies); 00115 00116 MultiThreadedAugmentor(const std::string & augmentorName, 00117 const std::string & serviceName, 00118 ServiceBase & parent); 00119 00120 ~MultiThreadedAugmentor(); 00121 00122 void init(int numThreads); 00123 void shutdown(); 00124 00125 protected: 00126 00127 virtual void doRequestImpl(const AugmentationRequest &) = 0; 00128 00129 private: 00130 uint64_t numWithInfo; 00131 00132 ML::RingBufferSWMR<AugmentationRequest> ringBuffer; 00133 00134 boost::thread_group workers; 00135 00136 void runWorker(); 00137 00138 boost::thread_group workerThreads; 00139 00140 int numThreadsCreated; 00141 00142 volatile bool shutdown_; 00143 00144 void pushRequest(const AugmentationRequest &); 00145 }; 00146 00147 00148 /******************************************************************************/ 00149 /* SYNC AUGMENTOR BASE */ 00150 /******************************************************************************/ 00151 00158 struct SyncAugmentor : public MultiThreadedAugmentor 00159 { 00160 00161 SyncAugmentor(const std::string & augmentorName, 00162 const std::string& serviceName, 00163 std::shared_ptr<ServiceProxies> proxies) 00164 : MultiThreadedAugmentor(augmentorName, serviceName, proxies) 00165 { 00166 doRequest = boost::bind(&SyncAugmentor::onRequest, this, _1); 00167 } 00168 00169 SyncAugmentor(const std::string & augmentorName, 00170 const std::string & serviceName, 00171 ServiceBase& parent) 00172 : MultiThreadedAugmentor(augmentorName, serviceName, parent) 00173 { 00174 doRequest = boost::bind(&SyncAugmentor::onRequest, this, _1); 00175 } 00176 00177 boost::function<AugmentationList(const AugmentationRequest &) > doRequest; 00178 00179 virtual AugmentationList 00180 onRequest(const AugmentationRequest &) 00181 { 00182 throw ML::Exception("onRequest or doRequest must be overridden"); 00183 } 00184 00185 protected: 00186 00187 virtual void 00188 doRequestImpl(const AugmentationRequest & request) 00189 { 00190 AugmentationList response = doRequest(request); 00191 respond(request, response); 00192 } 00193 }; 00194 00195 00196 /******************************************************************************/ 00197 /* ASYNC AUGMENTOR BASE */ 00198 /******************************************************************************/ 00199 00206 struct AsyncAugmentor : public MultiThreadedAugmentor 00207 { 00208 00209 AsyncAugmentor(const std::string & augmentorName, 00210 const std::string & serviceName, 00211 std::shared_ptr<ServiceProxies> proxies) : 00212 MultiThreadedAugmentor(augmentorName, serviceName, proxies) 00213 { 00214 doRequest = boost::bind(&AsyncAugmentor::onRequest, this, _1, _2); 00215 } 00216 00217 AsyncAugmentor(const std::string & augmentorName, 00218 const std::string & serviceName, 00219 ServiceBase& parent) : 00220 MultiThreadedAugmentor(augmentorName, serviceName, parent) 00221 { 00222 doRequest = boost::bind(&AsyncAugmentor::onRequest, this, _1, _2); 00223 } 00224 00225 typedef std::function<void (const AugmentationList &)> SendResponseCB; 00226 00227 boost::function<void (const AugmentationRequest &, SendResponseCB) > 00228 doRequest; 00229 00230 virtual void 00231 onRequest(const AugmentationRequest & request, SendResponseCB sendResponse) 00232 { 00233 throw ML::Exception("onRequest or doRequest must be overridden"); 00234 }; 00235 00236 protected: 00237 00238 virtual void 00239 doRequestImpl(const AugmentationRequest & request) 00240 { 00241 auto sendResponse = [=](const AugmentationList & response) { 00242 respond(request, response); 00243 }; 00244 doRequest(request, sendResponse); 00245 } 00246 }; 00247 00248 } // namespace RTBKIT 00249 00250 00251 #endif /* __rtb__augmentor_base_h__ */ 00252 00253
1.7.6.1