// RTBKit 0.9
// Open-source framework to create real-time ad bidding systems.
00001 /* bid_aggregator_test.cc 00002 Jeremy Barnes, 31 January 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Class to aggregate up bidding data and allow basic querying and feature 00006 vector generation. 00007 00008 Time: 00009 - Filter by time period 00010 - Generate time series over the time period up to the minute granularity 00011 00012 - Per-minute rollups / time resolution 00013 00014 - Data segmented on 00015 - campaign 00016 - strategy 00017 - exchange 00018 - hour (implicitly) 00019 00020 - Primary attributes (indexed): 00021 - exchange 00022 - format 00023 - creative 00024 - domain 00025 - hour 00026 - minute 00027 - weekday 00028 - url 00029 - country 00030 - state 00031 - city 00032 00033 - Secondary attributes (non-indexed): 00034 - data profile 00035 - browser 00036 - OS 00037 - Anything else in the bid request, bid, click or impression... 00038 - timezone 00039 00040 - Primary metrics 00041 - bid price (mean, median, min, max, std) 00042 - win price (mean, median, min, max, std) 00043 - surplus (mean, median, min, max, std) 00044 - wins 00045 - losses 00046 - impressions (ie, 1 for each row) 00047 - clicks 00048 - eventually, anything that can be expressed as a behaviour 00049 00050 - Aggregators 00051 - sum 00052 - count 00053 - average 00054 - weighted average (x, y) 00055 - ratio (x / y) 00056 00057 Operations: 00058 - PivotTable 00059 - Create a pivot table over the given attributes with the given filter 00060 - TimeSeries 00061 - Create a time series over the given attributes (date + metrics) 00062 - Range 00063 - Dictionary 00064 - Return the possible values and ranges of the given attributes and their overall count (used for ui generation) 00065 - ReturnData 00066 - Return the full bid requests for the given data 00067 00068 Data sizing 00069 - 120,000 MATCHEDWINs are 160MB uncompressed and 28MB gzip compressed 00070 - Assume with attribute memoization and cleanup it's 60GB 00071 - That is $100 per day, ie a 
small campaign 00072 - Assume 50% wins and 50% losses, so 320MB overall or 60MB compressed 00073 - 30 days = 1.8GB compressed to scan or 10GB uncompressed 00074 - At $1 CPM, 1GB for each $1000 spent 00075 00076 Index format 00077 - sorted heap data structure 00078 - segment on campaign, strategy, hour 00079 - sort by primary attributes with low entropy ones first 00080 - categorical attributes 00081 - first store index of key -> offset 00082 - then store the data itself: either next level or leaf data 00083 00084 - Leaf nodes associated with index: metrics + bid request offset 00085 00086 00087 zcat MATCHEDALL-2012-02-17.log.gz | awk '{ counts[$1] += 1; } END { for (i in counts) print i, counts[i]; }' 00088 MATCHEDWIN 646806 00089 MATCHEDIMPRESSION 597248 00090 MATCHEDCLICK 8 00091 MATCHEDLOSS 2124058 00092 jeremy@ag1:~/projects/platform$ ls -l MATCHEDALL-2012-02-17.log.gz 00093 -rw-r--r-- 1 jeremy jeremy 983665573 2012-02-18 14:04 MATCHEDALL-2012-02-17.log.gz 00094 */ 00095 00096 00097 00098 #define BOOST_TEST_MAIN 00099 #define BOOST_TEST_DYN_LINK 00100 00101 #include <boost/test/unit_test.hpp> 00102 #include "soa/logger/logger.h" 00103 #include "soa/logger/log_message_splitter.h" 00104 #include "rtbkit/common/bid_request.h" 00105 #include "jml/utils/smart_ptr_utils.h" 00106 #include "jml/utils/string_functions.h" 00107 #include "jml/utils/lightweight_hash.h" 00108 #include "jml/utils/json_parsing.h" 00109 #include <unordered_map> 00110 #include "jml/arch/bitops.h" 00111 #include "jml/arch/bit_range_ops.h" 00112 #include "soa/types/id.h" 00113 #include <boost/tuple/tuple.hpp> 00114 00115 #include <boost/iostreams/filtering_stream.hpp> 00116 #include <boost/iostreams/filter/bzip2.hpp> 00117 #include <boost/iostreams/filter/gzip.hpp> 00118 #include <boost/iostreams/device/file.hpp> 00119 #include <boost/iostreams/device/file_descriptor.hpp> 00120 #include <boost/make_shared.hpp> 00121 00122 00123 using namespace std; 00124 using namespace Datacratic; 00125 using 
namespace ML; 00126 00127 00128 struct StringMap : public ValueMap { 00129 std::unordered_map<string, int> toInt; 00130 std::vector<std::string> toString; 00131 00132 int operator [] (const std::string & str) 00133 { 00134 auto res = toInt.insert(make_pair(str, toInt.size())); 00135 if (res.second) 00136 toString.push_back(str); 00137 return res.first->second; 00138 } 00139 00140 const std::string & operator [] (int i) const 00141 { 00142 if (i >= 0 && i < toString.size()) 00143 return toString[i]; 00144 throw ML::Exception("no stringmap"); 00145 } 00146 00150 virtual int parse(const std::string & str) 00151 { 00152 return operator [] (str); 00153 } 00154 00158 virtual std::string print(int val) const 00159 { 00160 return operator [] (val); 00161 } 00162 00164 virtual size_t size() const 00165 { 00166 return toInt.size(); 00167 } 00168 00170 virtual bool has(const std::string & str) const 00171 { 00172 return toInt.count(str); 00173 } 00174 00178 virtual std::string identifier() const 00179 { 00180 return id; 00181 } 00182 00183 std::string id; 00184 }; 00185 00186 struct FileKey { 00187 string exchange; 00188 string strategy; 00189 string campaign; 00190 00191 bool operator == (const FileKey & other) const 00192 { 00193 return exchange == other.exchange 00194 && strategy == other.strategy 00195 && campaign == other.campaign; 00196 } 00197 00198 bool operator != (const FileKey & other) const 00199 { 00200 return ! 
operator == (other); 00201 } 00202 00203 bool operator < (const FileKey & other) const 00204 { 00205 return ML::less_all(exchange, other.exchange, 00206 strategy, other.strategy, 00207 campaign, other.campaign); 00208 } 00209 00210 std::string print() const 00211 { 00212 return campaign + "|" + strategy + "|" + exchange; 00213 } 00214 }; 00215 00216 inline std::ostream & operator << (std::ostream & stream, const FileKey & key) 00217 { 00218 return stream << key.print(); 00219 } 00220 00221 using namespace boost::iostreams; 00222 00223 struct SchemaInferrer { 00224 00225 std::shared_ptr<const TypeHandler> currentHandler; 00226 std::vector<std::shared_ptr<const TypeHandler> > oldHandlers; 00227 std::vector<std::string> taxonomy; 00228 00229 SchemaInferrer() 00230 : entries(0), bytesIn(0), bytesOut(0) 00231 { 00232 buffer.reset(new CompressedBuffer()); 00233 } 00234 00235 ~SchemaInferrer() 00236 { 00237 } 00238 00239 std::string stats() 00240 { 00241 return ML::format("%8zd vals, %2zd br, %8zd in, %8zd out, %8zd cmp, %5.2f%% r1, %5.2f%% r2", 00242 entries, taxonomy.size(), 00243 bytesIn, bytesOut, bytesCompressed(), 00244 100.0 * bytesOut / bytesIn, 00245 100.0 * bytesCompressed() / bytesIn); 00246 } 00247 00248 struct CompressedBuffer { 00249 CompressedBuffer() 00250 { 00251 filter.push(gzip_compressor(6)); 00252 filter.push(sink); 00253 } 00254 00255 ~CompressedBuffer() 00256 { 00257 filter.reset(); 00258 } 00259 00260 void close() 00261 { 00262 if (filter) 00263 boost::iostreams::close(filter); 00264 } 00265 00266 void flush() 00267 { 00268 boost::iostreams::flush(filter); 00269 } 00270 00271 size_t bytesOut() const 00272 { 00273 return sink.str().size(); 00274 } 00275 00276 ostringstream sink; 00277 filtering_ostream filter; 00278 }; 00279 00280 std::shared_ptr<CompressedBuffer> buffer; 00281 00282 size_t entries; 00283 size_t bytesIn; 00284 size_t bytesOut; 00285 size_t bytesCompressed() const { return buffer->bytesOut(); } 00286 00287 void close() 00288 { 00289 
buffer->close(); 00290 } 00291 00292 void flush() 00293 { 00294 buffer->flush(); 00295 } 00296 00297 Value accept(const std::string & field, ValueManager * owner, 00298 HandlerContext & hContext) 00299 { 00300 ML::Parse_Context context(field, field.data(), field.size()); 00301 00302 try { 00303 std::shared_ptr<const TypeHandler> newHandler; 00304 Value value; 00305 boost::tie(value, newHandler) 00306 = parseJston(context, currentHandler, owner, hContext); 00307 00308 if (newHandler != currentHandler) { 00309 taxonomy.push_back(field); 00310 oldHandlers.push_back(currentHandler); 00311 //cerr << "field " << fields[10] << " induced new handler " 00312 // << handler->typeToJson().toString() << endl; 00313 } 00314 00315 currentHandler = newHandler; 00316 00317 string s = value.toString(); 00318 00319 boost::iostreams::write(buffer->filter, s.c_str(), s.size()); 00320 00321 ++entries; 00322 bytesIn += field.length(); 00323 bytesOut += s.size(); 00324 00325 return value; 00326 } catch (...) { 00327 cerr << "parsing field " << field << endl; 00328 throw; 00329 } 00330 } 00331 }; 00332 00333 struct BidRequestEncoder { 00334 }; 00335 00336 #if 0 00337 struct ValueConstructor : public ValueManager { 00338 std::shared_ptr<TypeHandler> handler; 00339 std::vector<Value> values; 00340 00341 struct Entry { 00342 ValueConstructor * value; 00343 int index; 00344 00345 template<typename T> void operator = (const T & t) 00346 { 00347 ViewHandler<T> vh(handler->getElementType(index)); 00348 vh.set(Value(0, 0, 0, ValueVersion(), 00349 0, index), 00350 t, value); 00351 } 00352 }; 00353 00354 Entry operator [] (int index) 00355 { 00356 return Entry(this, index); 00357 } 00358 00359 Value toValue(ValueManager * owner) const 00360 { 00361 return handler->constructValue(values, owner); 00362 } 00363 00364 virtual Value replaceValue(const Value & element, 00365 Value && replace_with, 00366 const TypeHandler * type) 00367 { 00368 } 00369 00370 virtual Value replaceValue(const Value & element, 
00371 const ConstValueBlock & replace_with, 00372 const TypeHandler * type) 00373 { 00374 int index = element.fixed_width(); 00375 values[index] = replace 00376 } 00377 }; 00378 #endif 00379 00380 struct WinLossOutput : public LogOutput { 00381 00382 std::shared_ptr<CompoundHandler> keyHandler; 00383 00384 #if 0 00385 - exchange 00386 - format 00387 - creative 00388 - domain 00389 - hour 00390 - minute 00391 - weekday 00392 - url 00393 - country 00394 - state 00395 - city 00396 #endif 00397 00398 std::shared_ptr<StringMap> formatMap; 00399 std::shared_ptr<StringMap> domainMap; 00400 std::shared_ptr<StringMap> urlMap; 00401 std::shared_ptr<StringMap> countryMap; 00402 std::shared_ptr<StringMap> stateMap; 00403 std::shared_ptr<StringMap> cityMap; 00404 00405 WinLossOutput() 00406 : numMsgs(0), start(Date::now()) 00407 { 00408 using std::make_shared; 00409 keyHandler = make_shared<CompoundHandler>(); 00410 keyHandler->add_field("campaign", make_shared<StringHandler>()); 00411 keyHandler->add_field("strategy", make_shared<StringHandler>()); 00412 keyHandler->add_field("exchange", make_shared<StringHandler>()); 00413 keyHandler->add_field("dateToHour", make_shared<DateHandler>()); 00414 keyHandler->add_field("hourOfDay", make_shared<IntHandler>()); 00415 keyHandler->add_field("weekday", make_shared<IntHandler>()); 00416 keyHandler->add_field("format", make_shared<StringMapHandler>(formatMap)); 00417 keyHandler->add_field("creative", make_shared<IntHandler>()); 00418 keyHandler->add_field("domain", make_shared<StringMapHandler>(domainMap)); 00419 keyHandler->add_field("url", make_shared<StringMapHandler>(urlMap)); 00420 keyHandler->add_field("minute", make_shared<IntHandler>()); 00421 keyHandler->add_field("country", make_shared<StringMapHandler>(countryMap)); 00422 keyHandler->add_field("state", make_shared<StringMapHandler>(stateMap)); 00423 keyHandler->add_field("city", make_shared<StringMapHandler>(cityMap)); 00424 } 00425 00426 00427 size_t numMsgs; 00428 Date 
start; 00429 00430 StringMap exchanges; 00431 StringMap strategies; 00432 StringMap campaigns; 00433 00434 struct Data { 00435 SchemaInferrer br_schema; 00436 SchemaInferrer md_schema; 00437 }; 00438 00439 std::map<FileKey, Data> data; 00440 00441 HandlerContext hContext; 00442 ValueManager owner; 00443 00444 virtual void logMessage(const std::string & channel, 00445 const std::string & message) 00446 { 00447 if (channel.empty() || message.empty()) return; 00448 00449 if (++numMsgs % 10000 == 0) { 00450 Date now = Date::now(); 00451 cerr << "message " << numMsgs << " in " 00452 << now.secondsSince(start) << "s (" 00453 << 1.0 * numMsgs / now.secondsSince(start) << "/s)" 00454 << endl; 00455 } 00456 00457 //if (numMsgs == 1) 00458 // cerr << "got message " << message << endl; 00459 00460 // Channels: 00461 // MATCHEDLOSS: a bid that we submitted but did not win 00462 // MATCHEDWIN: a bid that we won 00463 00464 // Eventually: 00465 // WINORPHAN: a bid that we won but without an impression 00466 // MATCHEDIMPRESSION: a bid, impression pair with no click 00467 // IMPRESSIONORPHAN: an impression that we won but without a bid req 00468 // MATCHEDCLICK: a bid, impression, click pair 00469 // CLICKORPHAN: a click without a bid request and impression 00470 00471 // Channel: MATCHEDWIN, MATCHEDLOSS, MATCHEDIMPRESSION, MATCHED 00472 // Message: tab separated timestamp, url, providerid 00473 00474 // MATCHEDWIN message 00475 // 0 date 00476 // 1 bid request ID 00477 // 2 spot id 00478 // 3 client 00479 // 4 strategy 00480 // 5 win price micros 00481 // 6 bid price micros 00482 // 7 surplus 00483 // 8 bid request JSON 00484 // 9 bid JSON 00485 // 10 metadata JSON 00486 // 11 creative id --> 30213 00487 // 12 campaign_slug --> netProphets / real time bidding / ron 00488 // 13 strategy_slug --> airtransat_tour_opt 00489 00490 LogMessageSplitter<32> fields(message); 00491 00492 Date timestamp; 00493 { 00494 ML::Parse_Context context(fields[0], fields[0].start, fields[0].end); 
00495 timestamp = Date::expect_date_time(context, "%y-%M-%d", "%H:%M:%S"); 00496 if (!context.eof()) 00497 context.exception("expected date"); 00498 } 00499 00500 Id auctionId(fields[1]); 00501 Id spotId(fields[2]); 00502 00503 string client = fields[3]; 00504 string strategy = fields[4]; 00505 00506 int winPrice JML_UNUSED = boost::lexical_cast<int>(fields[5]); 00507 int bidPrice JML_UNUSED = boost::lexical_cast<int>(fields[6]); 00508 double surplus JML_UNUSED = boost::lexical_cast<double>(fields[7]); 00509 00510 string br = fields[8]; 00511 std::shared_ptr<BidRequest> req 00512 (BidRequest::parse("datacratic", br)); 00513 00514 // Find which part of a bid request needs to be removed to make it valid 00515 auto fixBidRequest = [] (const std::string & br) -> std::pair<int, int> 00516 { 00517 pair<int, int> r(0, 0); 00518 int & start = r.first; 00519 int & end = r.second; 00520 Parse_Context context(br, br.c_str(), br.size()); 00521 if (!context.match_literal('{')) return r; 00522 start = context.get_offset(); 00523 if (!context.match_literal("\"version\":")) return r; 00524 string ver; 00525 if (!ML::matchJsonString(context, ver)) return r; 00526 if (!context.match_literal(',')) return r; 00527 end = context.get_offset(); 00528 return r; 00529 }; 00530 00531 int start, end; 00532 boost::tie(start, end) = fixBidRequest(br); 00533 //cerr << "br = " << br << endl; 00534 //cerr << "start = " << start << " end = " << end << endl; 00535 if (start < end) 00536 br.erase(start, end - start); 00537 //cerr << "br fixed = " << br << endl; 00538 00539 #if 0 00540 int spotNum = req->findSpotIndex(spotId); 00541 00542 std::string format = req->imp[spotNum].format(); 00543 #endif 00544 00545 int creativeId JML_UNUSED = boost::lexical_cast<int>(fields[11]); 00546 00547 std::string country JML_UNUSED = req->location.countryCode; 00548 std::string region JML_UNUSED = req->location.regionCode; 00549 Utf8String city JML_UNUSED = req->location.cityName; 00550 00551 #if 0 00552 - domain 
00553 - hour 00554 - minute 00555 - weekday 00556 - url 00557 - country 00558 - state 00559 - city 00560 #endif 00561 00562 FileKey key; 00563 key.exchange = req->exchange; 00564 key.campaign = fields[12]; 00565 key.strategy = fields[13]; 00566 00567 Data & fileData = data[key]; 00568 00569 fileData.br_schema.accept(br, &owner, hContext); 00570 fileData.md_schema.accept(fields[10], &owner, hContext); 00571 } 00572 00573 void finish() 00574 { 00575 size_t totalBytesIn = 0, totalBytesOut = 0, totalOutCompressed = 0; 00576 cerr << "got " << data.size() << " files to save" << endl; 00577 for (auto it = data.begin(), end = data.end(); it != end; ++it) { 00578 cerr << it->first << endl; 00579 it->second.md_schema.flush(); 00580 it->second.md_schema.close(); 00581 cerr << " md " << it->second.md_schema.stats() << endl; 00582 totalBytesIn += it->second.md_schema.bytesIn; 00583 totalBytesOut += it->second.md_schema.bytesOut; 00584 totalOutCompressed += it->second.md_schema.bytesCompressed(); 00585 00586 it->second.br_schema.flush(); 00587 it->second.br_schema.close(); 00588 cerr << " br " << it->second.br_schema.stats() << endl; 00589 totalBytesIn += it->second.br_schema.bytesIn; 00590 totalBytesOut += it->second.br_schema.bytesOut; 00591 totalOutCompressed += it->second.br_schema.bytesCompressed(); 00592 } 00593 00594 cerr << "total of " << totalBytesIn << " in, " << totalBytesOut 00595 << " out, ratio = " << 100.0 * totalBytesOut / totalBytesIn 00596 << "%" << endl; 00597 cerr << "compressed total of " << totalBytesOut << " in, " << totalOutCompressed 00598 << " out, ratio = " << 100.0 * totalOutCompressed / totalBytesOut 00599 << "%" << endl; 00600 cerr << "overall compression ratio = " << 100.0 * totalOutCompressed / totalBytesIn 00601 << "%" << endl; 00602 } 00603 00604 virtual void close() 00605 { 00606 } 00607 00608 }; 00609 00610 BOOST_AUTO_TEST_CASE( test_bid_aggregator ) 00611 { 00612 // Create a logger to get the behaviour events from 00613 00614 
std::shared_ptr<WinLossOutput> output 00615 (new WinLossOutput()); 00616 00617 Logger logger; 00618 logger.addOutput(output); 00619 00620 logger.start(); 00621 00622 //logger.subscribe("/home/jeremy/platform/rtb_router_logger.ipc", 00623 // vector<string>({ "BEHAVIOUR" })); 00624 00625 uint64_t maxEntries = -1; 00626 //maxEntries = 10000; 00627 logger.replay("MATCHEDWIN-2012-01-31.log.gz", maxEntries); 00628 00629 cerr << "waiting until finished" << endl; 00630 00631 logger.waitUntilFinished(); 00632 00633 cerr << "finished replay" << endl; 00634 00635 output->finish(); 00636 00637 logger.shutdown(); 00638 }