RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* auction.cc 00002 Jeremy Barnes, 6 April 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Auction implementation. 00006 */ 00007 00008 #include "rtbkit/common/auction.h" 00009 #include "jml/arch/exception.h" 00010 #include "jml/arch/format.h" 00011 #include "jml/arch/backtrace.h" 00012 #include "jml/utils/parse_context.h" 00013 #include "jml/utils/less.h" 00014 #include "jml/utils/string_functions.h" 00015 #include "jml/utils/json_parsing.h" 00016 #include "jml/db/persistent.h" 00017 00018 #include "ace/Acceptor.h" 00019 #include <ace/Timer_Heap_T.h> 00020 #include <ace/Synch.h> 00021 #include <ace/Timer_Queue_Adapters.h> 00022 #include "ace/SOCK_Acceptor.h" 00023 #include <ace/High_Res_Timer.h> 00024 #include <ace/Dev_Poll_Reactor.h> 00025 #include <set> 00026 00027 using namespace std; 00028 using namespace ML; 00029 00030 namespace RTBKIT { 00031 00032 /*****************************************************************************/ 00033 /* AUCTION */ 00034 /*****************************************************************************/ 00035 00036 Json::Value 00037 Auction::Price:: 00038 toJson() const 00039 { 00040 Json::Value result; 00041 result["maxPrice"] = maxPrice.toJson(); 00042 result["priority"] = priority; 00043 return result; 00044 } 00045 00046 std::string 00047 Auction::Price:: 00048 toJsonStr() const 00049 { 00050 return format("{\"maxPrice\":%d,\"priority\":%f}", maxPrice, priority); 00051 } 00052 00053 Auction::Price 00054 Auction::Price:: 00055 fromJson(const Json::Value& json) 00056 { 00057 Price result; 00058 00059 result.maxPrice = Amount::fromJson(json["maxPrice"]); 00060 if (json.isMember("priority")) 00061 result.priority = json["priority"].asDouble(); 00062 00063 return result; 00064 } 00065 00066 00067 std::string 00068 Auction::Response:: 00069 print(WinLoss wl) 00070 { 00071 switch (wl) { 00072 case PENDING: return "PENDING"; 00073 case WIN: return "WIN"; 00074 case LOSS: return "LOSS"; 00075 case TOOLATE: return "TOOLATE"; 00076 case INVALID: return "INVALID"; 00077 default: 00078 throw Exception("invalid WinLoss value"); 00079 } 00080 } 00081 00082 Json::Value 00083 Auction::Response:: 00084 toJson() const 00085 { 00086 Json::Value result; 00087 00088 result["price"] = price.toJson(); 00089 result["test"] = test; 00090 result["tagId"] = tagId; 00091 result["bidData"] = bidData; 00092 00093 result["agent"] = agent; 00094 result["account"] = account.toJson(); 00095 result["meta"] = meta; 00096 00097 result["creativeId"] = creativeId; 00098 result["creativeName"] = creativeName; 00099 00100 result["localStatus"] = print(localStatus); 00101 00102 return result; 00103 } 00104 00105 bool 00106 Auction::Response:: 00107 valid() const 00108 { 00109 if (price.maxPrice.value <= 0 00110 || account.empty() 00111 || agent == "" 00112 || tagId == -1) 00113 return false; 00114 return true; 00115 } 00116 00117 void 00118 Auction::Response:: 00119 serialize(DB::Store_Writer & store) const 00120 { 00121 int version = 5; 00122 store << version << price.maxPrice << price.priority 00123 << tagId << account 00124 << test << agent << bidData << meta << creativeId 00125 << creativeName << (int)localStatus << visitChannels; 00126 } 00127 00128 void 00129 Auction::Response:: 00130 reconstitute(DB::Store_Reader & store) 00131 { 00132 int version, localStatusi; 00133 store >> version; 00134 if (version == 1) { 00135 string campaign, strategy; 00136 string tag, click_url; 00137 store >> price.maxPrice >> price.priority 00138 >> tag >> click_url >> tagId >> strategy >> campaign 00139 >> test >> agent >> bidData >> meta >> creativeId 00140 >> creativeName >> localStatusi; 00141 account = { campaign, strategy }; 00142 } 00143 else if (version == 2) { 00144 string campaign, strategy; 00145 int maxPriceUSDMicrosCPM; 00146 store >> maxPriceUSDMicrosCPM >> price.priority 00147 >> tagId >> strategy >> campaign 00148 >> test >> agent >> bidData >> meta >> creativeId 00149 >> creativeName >> localStatusi; 00150 price.maxPrice = MicroUSD_CPM(maxPriceUSDMicrosCPM); 00151 account = { campaign, strategy }; 00152 } 00153 else if (version == 3) { 00154 string campaign, strategy; 00155 store >> price.maxPrice >> price.priority 00156 >> tagId >> strategy >> campaign 00157 >> test >> agent >> bidData >> meta >> creativeId 00158 >> creativeName >> localStatusi; 00159 account = { campaign, strategy }; 00160 } 00161 else if (version == 4) { 00162 store >> price.maxPrice >> price.priority 00163 >> tagId >> account 00164 >> test >> agent >> bidData >> meta >> creativeId 00165 >> creativeName >> localStatusi; 00166 } 00167 else if (version == 5) { 00168 store >> price.maxPrice >> price.priority 00169 >> tagId >> account 00170 >> test >> agent >> bidData >> meta >> creativeId 00171 >> creativeName >> localStatusi >> visitChannels; 00172 } 00173 else throw ML::Exception("reconstituting wrong version"); 00174 localStatus = (WinLoss)localStatusi; 00175 } 00176 00177 Auction:: 00178 Auction() 00179 : isZombie(false), exchangeConnector(nullptr), data(new Data()) 00180 { 00181 } 00182 00183 Auction:: 00184 Auction(ExchangeConnector * exchangeConnector, 00185 HandleAuction handleAuction, 00186 std::shared_ptr<BidRequest> request, 00187 const std::string & requestStr, 00188 const std::string & requestStrFormat, 00189 Date start, 00190 Date expiry) 00191 : isZombie(false), start(start), expiry(expiry), 00192 request(request), 00193 requestStr(requestStr), 00194 requestStrFormat(requestStrFormat), 00195 exchangeConnector(exchangeConnector), 00196 handleAuction(handleAuction), 00197 data(new Data(numSpots())) 00198 { 00199 ML::atomic_add(created, 1); 00200 00201 this->id = request->auctionId; 00202 this->requestSerialized = request->serializeToString(); 00203 } 00204 00205 Auction:: 00206 ~Auction() 00207 { 00208 // Clean up the chain of data pointers 00209 Data * d = data; 00210 while (d) { 00211 Data * d2 = d->oldData; 00212 delete d; 00213 d = d2; 00214 } 00215 00216 ML::atomic_add(destroyed, 1); 00217 } 00218 00219 long long Auction::created = 0; 00220 long long Auction::destroyed = 0; 00221 00222 double 00223 Auction:: 00224 timeAvailable(Date now) const 00225 { 00226 return now.secondsUntil(expiry); 00227 } 00228 00229 double 00230 Auction:: 00231 timeUsed(Date now) const 00232 { 00233 return start.secondsUntil(now); 00234 } 00235 00236 Auction::WinLoss 00237 Auction:: 00238 setResponse(int spotNum, Response newResponse) 00239 { 00240 Data * current = this->data; 00241 00242 if (spotNum < 0 || spotNum >= current->responses.size()) 00243 throw ML::Exception("invalid spot number in response"); 00244 00245 if (newResponse.price.maxPrice.isNegative() 00246 || newResponse.agent == "" 00247 || newResponse.tagId == -1) 00248 return INVALID; 00249 00250 if (current->tooLate) 00251 return TOOLATE; 00252 00253 00254 WinLoss result; 00255 00256 auto_ptr<Data> newData(new Data()); 00257 00258 for (;;) { 00259 if (current->tooLate) 00260 return TOOLATE; 00261 00262 if (current->hasValidResponse(spotNum) 00263 && newResponse.price.priority 00264 <= current->winningResponse(spotNum).price.priority) 00265 return LOSS; 00266 00267 *newData = *current; 00268 00269 bool hasExisting = current->hasValidResponse(spotNum); 00270 00271 result = newResponse.localStatus = PENDING; 00272 newData->responses[spotNum].push_back(newResponse); 00273 00274 if (hasExisting) { 00275 newData->responses[spotNum][0].localStatus = LOSS; 00276 std::swap(newData->responses[spotNum].front(), 00277 newData->responses[spotNum].back()); 00278 } 00279 00280 newData->oldData = current; 00281 00282 if (!ML::cmp_xchg(this->data, current, newData.get())) continue; 00283 newData.release(); 00284 return result; 00285 } 00286 } 00287 00288 const std::vector<std::vector<Auction::Response> > & 00289 Auction:: 00290 getResponses() const 00291 { 00292 const Data * current = this->data; 00293 return current->responses; 00294 } 00295 00296 void 00297 Auction:: 00298 addDataSources(const std::set<std::string> & sources) 00299 { 00300 if (sources.empty()) return; 00301 00302 Data * current = this->data; 00303 unique_ptr<Data> newData(new Data); 00304 00305 for (;;) { 00306 00307 std::set<std::string> newSources = current->dataSources; 00308 newSources.insert(sources.begin(), sources.end()); 00309 00310 // Nothing new was added, just bail. 00311 if (newSources.size() == current->dataSources.size()) return; 00312 00313 if (!newData) newData.reset(new Data); 00314 *newData = *current; 00315 std::swap(newData->dataSources, newSources); 00316 00317 if (!ML::cmp_xchg(this->data, current, newData.get())) continue; 00318 newData.release(); 00319 return; 00320 } 00321 } 00322 00323 const std::set<std::string> & 00324 Auction:: 00325 getDataSources() const 00326 { 00327 const Data * current = this->data; 00328 return current->dataSources; 00329 } 00330 00331 bool 00332 Auction:: 00333 finish() 00334 { 00335 Data * current = this->data; 00336 00337 for (;;) { 00338 if (current->tooLate) 00339 return false; 00340 00341 auto_ptr<Data> newData(new Data(*current)); 00342 00343 for (unsigned spotNum = 0; spotNum < numSpots(); ++spotNum) { 00344 if (newData->hasValidResponse(spotNum)) 00345 newData->responses[spotNum][0].localStatus = WIN; 00346 00347 for (unsigned i = 1; i < newData->responses[spotNum].size(); ++i) 00348 newData->responses[spotNum][i].localStatus = LOSS; 00349 } 00350 00351 newData->oldData = current; 00352 newData->tooLate = true; 00353 00354 if (!ML::cmp_xchg(this->data, current, newData.get())) continue; 00355 newData.release(); 00356 break; 00357 } 00358 00359 handleAuction(shared_from_this()); 00360 00361 return true; 00362 } 00363 00364 bool 00365 Auction:: 00366 setError(const std::string & error, const std::string & details) 00367 { 00368 Data * current = this->data; 00369 00370 for (;;) { 00371 if (current->tooLate) 00372 return false; 00373 00374 auto_ptr<Data> newData(new Data(*current)); 00375 00376 newData->error = error; 00377 newData->details = details; 00378 00379 for (unsigned spotNum = 0; spotNum < numSpots(); ++spotNum) { 00380 for (unsigned i = 0; i < newData->responses[spotNum].size(); ++i) { 00381 newData->responses[spotNum][i].localStatus = LOSS; 00382 } 00383 } 00384 newData->oldData = current; 00385 newData->tooLate = true; 00386 00387 if (!ML::cmp_xchg(this->data, current, newData.get())) continue; 00388 newData.release(); 00389 break; 00390 } 00391 00392 handleAuction(shared_from_this()); 00393 00394 return true; 00395 } 00396 00397 bool 00398 Auction:: 00399 tooLate() 00400 { 00401 return data->tooLate; 00402 } 00403 00404 std::string 00405 Auction:: 00406 status() const 00407 { 00408 Data * current = this->data; 00409 00410 string result = ML::format("Auction: %d imp", (int)numSpots()); 00411 if (current->tooLate) result += " tooLate"; 00412 if (!current->error.empty()) result += " error: " + current->error; 00413 00414 result += " ["; 00415 for (int spotNum = 0; spotNum < numSpots(); ++spotNum) { 00416 if (spotNum != 0) result += "; "; 00417 if (current->hasValidResponse(spotNum)) 00418 result += " winner: " 00419 + current->winningResponse(spotNum).toJson().toString(); 00420 } 00421 result += "]"; 00422 00423 return result; 00424 } 00425 00426 Json::Value 00427 Auction:: 00428 getResponseJson(int spotNum) const 00429 { 00430 Json::Value result; 00431 00432 Data * current = this->data; 00433 00434 if (!current->error.empty()) { 00435 result["error"] = current->error; 00436 result["details"] = current->details; 00437 return result; 00438 } 00439 00440 for (unsigned spotNum = 0; spotNum < numSpots(); ++spotNum) { 00441 if (current->hasValidResponse(spotNum)) 00442 result[spotNum] = current->winningResponse(spotNum).toJson(); 00443 } 00444 return result; 00445 } 00446 00447 const Auction::Price Auction::NONE; 00448 00449 } // namespace RTBKIT 00450