RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* augmentor_base.cc 00002 00003 Jeremy Barnes, 4 March 2012 00004 Copyright (c) 2012 Datacratic. All rights reserved. 00005 00006 Object that handles doing augmented bid requests. 00007 */ 00008 00009 #include "augmentor_base.h" 00010 #include "soa/service/zmq_utils.h" 00011 #include "jml/arch/timers.h" 00012 #include "jml/utils/vector_utils.h" 00013 #include "jml/arch/futex.h" 00014 00015 00016 using namespace std; 00017 using namespace ML; 00018 00019 00020 namespace RTBKIT { 00021 00022 00023 /*****************************************************************************/ 00024 /* AUGMENTOR */ 00025 /*****************************************************************************/ 00026 00027 // Determined via a very scientific method: 2^16 should be enough... right? 00028 enum { QueueSize = 65536 }; 00029 00030 Augmentor:: 00031 Augmentor(const std::string & augmentorName, 00032 const std::string & serviceName, 00033 std::shared_ptr<ServiceProxies> proxies) 00034 : ServiceBase(serviceName, proxies), 00035 augmentorName(augmentorName), 00036 toRouters(getZmqContext()), 00037 responseQueue(QueueSize) 00038 { 00039 } 00040 00041 Augmentor:: 00042 Augmentor(const std::string & augmentorName, 00043 const std::string & serviceName, 00044 ServiceBase& parent) 00045 : ServiceBase(serviceName, parent), 00046 augmentorName(augmentorName), 00047 toRouters(getZmqContext()), 00048 responseQueue(QueueSize) 00049 { 00050 } 00051 00052 Augmentor:: 00053 ~Augmentor() 00054 { 00055 shutdown(); 00056 } 00057 00058 void 00059 Augmentor:: 00060 init() 00061 { 00062 responseQueue.onEvent = [=] (const Response& resp) 00063 { 00064 const AugmentationRequest& request = resp.first; 00065 const AugmentationList& response = resp.second; 00066 00067 toRouters.sendMessage( 00068 request.router, 00069 "RESPONSE", 00070 "1.0", 00071 request.startTime, 00072 request.id.toString(), 00073 request.augmentor, 00074 chomp(response.toJson().toString())); 00075 00076 recordHit("messages.RESPONSE"); 00077 }; 00078 00079 addSource("Augmentor::responseQueue", responseQueue); 00080 00081 toRouters.init(getServices()->config, serviceName()); 00082 00083 toRouters.connectHandler = [=] (const std::string & newRouter) 00084 { 00085 toRouters.sendMessage(newRouter, 00086 "CONFIG", 00087 "1.0", 00088 augmentorName); 00089 00090 recordHit("messages.CONFIG"); 00091 }; 00092 00093 toRouters.disconnectHandler = [=] (const std::string & oldRouter) 00094 { 00095 cerr << "disconnected from router " << oldRouter << endl; 00096 }; 00097 00098 toRouters.messageHandler = [=] (const std::string & router, 00099 const std::vector<std::string> & message) 00100 { 00101 handleRouterMessage(router, message); 00102 }; 00103 00104 00105 toRouters.connectAllServiceProviders("rtbRouterAugmentation", "augmentors"); 00106 00107 addSource("Augmentor::toRouters", toRouters); 00108 00109 00110 double lastSleepTime = 0; 00111 addPeriodic("Augmentor::dutyCycle", 1.0, [=] (uint64_t) mutable { 00112 double sleepTime = totalSleepSeconds(); 00113 recordLevel((sleepTime - lastSleepTime) * 1000.0, "sleepTime"); 00114 lastSleepTime = sleepTime; 00115 }); 00116 } 00117 00118 void 00119 Augmentor:: 00120 start() 00121 { 00122 MessageLoop::start(); 00123 } 00124 00125 void 00126 Augmentor:: 00127 shutdown() 00128 { 00129 MessageLoop::shutdown(); 00130 toRouters.shutdown(); 00131 } 00132 00133 void 00134 Augmentor:: 00135 configureAndWait() 00136 { 00137 #if 0 00138 sendMessage(toRouter, 00139 "CONFIG", 00140 "1.0", 00141 this->serviceName()); 00142 #endif 00143 00144 throw ML::Exception("configureAndWait not re-implemented"); 00145 } 00146 00147 void 00148 Augmentor:: 00149 respond(const AugmentationRequest & request, const AugmentationList & response) 00150 { 00151 if (responseQueue.tryPush(make_pair(request, response))) 00152 return; 00153 00154 cerr << "Dropping augmentation response: response queue is full" << endl; 00155 } 00156 00157 void 00158 Augmentor:: 00159 handleRouterMessage(const std::string & router, 00160 const std::vector<std::string> & message) 00161 { 00162 try { 00163 const std::string & type = message.at(0); 00164 recordHit("messages." + type); 00165 00166 //cerr << "got augmentor message of type " << type << endl; 00167 if (type == "CONFIGOK") { 00168 #if 0 00169 if (!awaitingConfig.empty()) { 00170 for (unsigned i = 0; i < awaitingConfig.size(); ++i) 00171 sendMessage(control, awaitingConfig[i], 00172 "CONFIGOK"); 00173 } 00174 awaitingConfig.clear(); 00175 #endif 00176 } 00177 else if (type == "AUGMENT") { 00178 const string & version = message.at(1); 00179 00180 if (version != "1.0") 00181 throw ML::Exception("unexpected version in augment"); 00182 00183 AugmentationRequest request; 00184 request.router = router; 00185 request.timeAvailableMs = 0.05; 00186 request.augmentor = message.at(2); 00187 request.id = Id(message.at(3)); 00188 00189 const string & bidRequestSource = message.at(4); 00190 const string & bidRequestStr = message.at(5); 00191 00192 istringstream agentsStr(message.at(6)); 00193 ML::DB::Store_Reader reader(agentsStr); 00194 reader.load(request.agents); 00195 00196 const string & startTimeStr = message.at(7); 00197 request.startTime 00198 = Date::fromSecondsSinceEpoch(strtod(startTimeStr.c_str(), 0)); 00199 00200 if (onRequest) { 00201 request.bidRequest.reset( 00202 BidRequest::parse(bidRequestSource, bidRequestStr)); 00203 00204 onRequest(request); 00205 } 00206 else respond(request, AugmentationList()); 00207 } 00208 else throw ML::Exception("unknown router message"); 00209 00210 } catch (const std::exception & exc) { 00211 cerr << "error handling augmentor message " << message 00212 << ": " << exc.what() << endl; 00213 } 00214 } 00215 00216 00217 /*****************************************************************************/ 00218 /* MULTI THREADED AUGMENTOR */ 00219 /*****************************************************************************/ 00220 00221 MultiThreadedAugmentor:: 00222 MultiThreadedAugmentor(const std::string & augmentorName, 00223 const std::string & serviceName, 00224 std::shared_ptr<ServiceProxies> proxies) 00225 : Augmentor(augmentorName, serviceName, proxies), 00226 numWithInfo(0), 00227 ringBuffer(102400) 00228 { 00229 Augmentor::onRequest 00230 = boost::bind(&MultiThreadedAugmentor::pushRequest, this, _1); 00231 numThreadsCreated = 0; 00232 } 00233 00234 MultiThreadedAugmentor:: 00235 MultiThreadedAugmentor(const std::string & augmentorName, 00236 const std::string & serviceName, 00237 ServiceBase& parent) 00238 : Augmentor(augmentorName, serviceName, parent), 00239 numWithInfo(0), 00240 ringBuffer(102400) 00241 { 00242 Augmentor::onRequest 00243 = boost::bind(&MultiThreadedAugmentor::pushRequest, this, _1); 00244 numThreadsCreated = 0; 00245 } 00246 00247 MultiThreadedAugmentor:: 00248 ~MultiThreadedAugmentor() 00249 { 00250 shutdown(); 00251 } 00252 00253 void 00254 MultiThreadedAugmentor:: 00255 init(int numThreads) 00256 { 00257 if (numThreadsCreated) 00258 throw ML::Exception("double init of augmentor"); 00259 00260 Augmentor::init(); 00261 00262 shutdown_ = false; 00263 00264 for (unsigned i = 0; i < numThreads; ++i) 00265 workers.create_thread([=] () { this->runWorker(); }); 00266 00267 numThreadsCreated = numThreads; 00268 } 00269 00270 void 00271 MultiThreadedAugmentor:: 00272 shutdown() 00273 { 00274 shutdown_ = true; 00275 00276 ML::memory_barrier(); 00277 00278 for (unsigned i = 0; i < numThreadsCreated; ++i) 00279 pushRequest(AugmentationRequest()); 00280 00281 workers.join_all(); 00282 00283 numThreadsCreated = 0; 00284 00285 Augmentor::shutdown(); 00286 } 00287 00288 void 00289 MultiThreadedAugmentor:: 00290 runWorker() 00291 { 00292 while (!shutdown_) { 00293 try { 00294 auto req = ringBuffer.pop(); 00295 if (shutdown_) 00296 return; 00297 doRequestImpl(req); 00298 } catch (const std::exception & exc) { 00299 std::cerr << "exception handling aug request: " 00300 << exc.what() << std::endl; 00301 } 00302 } 00303 } 00304 00305 void 00306 MultiThreadedAugmentor:: 00307 pushRequest(const AugmentationRequest & request) 00308 { 00309 ringBuffer.push(request); 00310 } 00311 00312 } // namespace RTBKIT 00313