RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
testing/bid_aggregator_test.cc
00001 /* bid_aggregator_test.cc
00002    Jeremy Barnes, 31 January 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    Class to aggregate up bidding data and allow basic querying and feature
00006    vector generation.
00007 
00008    Time:
00009    - Filter by time period
00010    - Generate time series over the time period up to the minute granularity
00011 
00012    - Per-minute rollups / time resolution
00013 
00014    - Data segmented on
00015      - campaign
00016      - strategy
00017      - exchange
00018      - hour (implicitly)
00019 
00020    - Primary attributes (indexed):
00021      - exchange
00022      - format
00023      - creative
00024      - domain
00025      - hour
00026      - minute
00027      - weekday
00028      - url
00029      - country
00030      - state
00031      - city
00032 
00033    - Secondary attributes (non-indexed):
00034      - data profile
00035      - browser
00036      - OS
00037      - Anything else in the bid request, bid, click or impression...   
00038      - timezone
00039 
00040    - Primary metrics
00041      - bid price (mean, median, min, max, std)
00042      - win price (mean, median, min, max, std)
00043      - surplus (mean, median, min, max, std)
00044      - wins
00045      - losses
00046      - impressions (ie, 1 for each row)
00047      - clicks
00048      - eventually, anything that can be expressed as a behaviour
00049 
00050    - Aggregators
00051      - sum
00052      - count
00053      - average
00054      - weighted average (x, y)
00055      - ratio (x / y)
00056 
00057    Operations:
00058      - PivotTable
00059        - Create a pivot table over the given attributes with the given filter
00060      - TimeSeries
00061        - Create a time series over the given attributes (date + metrics)
00062      - Range
00063      - Dictionary
00064        - Return the possible values and ranges of the given attributes and their overall count (used for UI generation)
00065      - ReturnData
00066        - Return the full bid requests for the given data
00067 
00068    Data sizing
00069      - 120,000 MATCHEDWINs are 160MB uncompressed and 28MB gzip compressed
00070      - Assume with attribute memoization and cleanup it's 60MB
00071      - That is $100 per day, ie a small campaign
00072      - Assume 50% wins and 50% losses, so 320MB overall or 60MB compressed
00073      - 30 days = 1.8GB compressed to scan or 10GB uncompressed
00074      - At $1 CPM, 1GB for each $1000 spent
00075 
00076    Index format
00077      - sorted heap data structure
00078      - segment on campaign, strategy, hour
00079      - sort by primary attributes with low entropy ones first
00080      - categorical attributes
00081          - first store index of key -> offset
00082          - then store the data itself: either next level or leaf data
00083 
00084      - Leaf nodes associated with index: metrics + bid request offset
00085 
00086 
00087      zcat MATCHEDALL-2012-02-17.log.gz | awk '{ counts[$1] += 1; } END { for (i in counts) print i, counts[i]; }'
00088      MATCHEDWIN 646806
00089      MATCHEDIMPRESSION 597248
00090      MATCHEDCLICK 8
00091      MATCHEDLOSS 2124058
00092      jeremy@ag1:~/projects/platform$ ls -l MATCHEDALL-2012-02-17.log.gz 
00093      -rw-r--r-- 1 jeremy jeremy 983665573 2012-02-18 14:04 MATCHEDALL-2012-02-17.log.gz
00094 */
00095 
00096 
00097 
00098 #define BOOST_TEST_MAIN
00099 #define BOOST_TEST_DYN_LINK
00100 
00101 #include <boost/test/unit_test.hpp>
00102 #include "soa/logger/logger.h"
00103 #include "soa/logger/log_message_splitter.h"
00104 #include "rtbkit/common/bid_request.h"
00105 #include "jml/utils/smart_ptr_utils.h"
00106 #include "jml/utils/string_functions.h"
00107 #include "jml/utils/lightweight_hash.h"
00108 #include "jml/utils/json_parsing.h"
00109 #include <unordered_map>
00110 #include "jml/arch/bitops.h"
00111 #include "jml/arch/bit_range_ops.h"
00112 #include "soa/types/id.h"
00113 #include <boost/tuple/tuple.hpp>
00114 
00115 #include <boost/iostreams/filtering_stream.hpp>
00116 #include <boost/iostreams/filter/bzip2.hpp>
00117 #include <boost/iostreams/filter/gzip.hpp>
00118 #include <boost/iostreams/device/file.hpp>
00119 #include <boost/iostreams/device/file_descriptor.hpp>
00120 #include <boost/make_shared.hpp>
00121 
00122 
00123 using namespace std;
00124 using namespace Datacratic;
00125 using namespace ML;
00126 
00127 
00128 struct StringMap : public ValueMap {
00129     std::unordered_map<string, int> toInt;
00130     std::vector<std::string> toString;
00131 
00132     int operator [] (const std::string & str)
00133     {
00134         auto res = toInt.insert(make_pair(str, toInt.size()));
00135         if (res.second)
00136             toString.push_back(str);
00137         return res.first->second;
00138     }
00139 
00140     const std::string & operator [] (int i) const
00141     {
00142         if (i >= 0 && i < toString.size())
00143             return toString[i];
00144         throw ML::Exception("no stringmap");
00145     }
00146 
00150     virtual int parse(const std::string & str)
00151     {
00152         return operator [] (str);
00153     }
00154 
00158     virtual std::string print(int val) const
00159     {
00160         return operator [] (val);
00161     }
00162 
00164     virtual size_t size() const
00165     {
00166         return toInt.size();
00167     }
00168 
00170     virtual bool has(const std::string & str) const
00171     {
00172         return toInt.count(str);
00173     }
00174 
00178     virtual std::string identifier() const
00179     {
00180         return id;
00181     }
00182 
00183     std::string id;
00184 };
00185 
/** Identifies one output group: the (exchange, strategy, campaign) triple
    that WinLossOutput aggregates log messages by.

    Ordering is lexicographic on exchange, then strategy, then campaign.
    print() renders the fields in the opposite order, as
    "campaign|strategy|exchange".
*/
struct FileKey {
    std::string exchange;
    std::string strategy;
    std::string campaign;

    bool operator == (const FileKey & other) const
    {
        if (exchange != other.exchange)
            return false;
        if (strategy != other.strategy)
            return false;
        return campaign == other.campaign;
    }

    bool operator != (const FileKey & other) const
    {
        return !(*this == other);
    }

    /// Strict weak ordering; the first differing field decides.
    bool operator < (const FileKey & other) const
    {
        int cmp = exchange.compare(other.exchange);
        if (cmp == 0)
            cmp = strategy.compare(other.strategy);
        if (cmp == 0)
            cmp = campaign.compare(other.campaign);
        return cmp < 0;
    }

    /// Human-readable "campaign|strategy|exchange" form.
    std::string print() const
    {
        std::string out(campaign);
        out += '|';
        out += strategy;
        out += '|';
        out += exchange;
        return out;
    }
};

/// Stream a FileKey in its print() form.
inline std::ostream & operator << (std::ostream & stream, const FileKey & key)
{
    stream << key.print();
    return stream;
}
00220 
00221 using namespace boost::iostreams;
00222 
00223 struct SchemaInferrer {
00224 
00225     std::shared_ptr<const TypeHandler> currentHandler;
00226     std::vector<std::shared_ptr<const TypeHandler> > oldHandlers;
00227     std::vector<std::string> taxonomy;
00228 
00229     SchemaInferrer()
00230         : entries(0), bytesIn(0), bytesOut(0)
00231     {
00232         buffer.reset(new CompressedBuffer());
00233     }
00234 
00235     ~SchemaInferrer()
00236     {
00237     }
00238 
00239     std::string stats()
00240     {
00241         return ML::format("%8zd vals, %2zd br, %8zd in, %8zd out, %8zd cmp, %5.2f%% r1, %5.2f%% r2",
00242                           entries, taxonomy.size(),
00243                           bytesIn, bytesOut, bytesCompressed(),
00244                           100.0 * bytesOut / bytesIn,
00245                           100.0 * bytesCompressed() / bytesIn);
00246     }
00247 
00248     struct CompressedBuffer {
00249         CompressedBuffer()
00250         {
00251             filter.push(gzip_compressor(6));
00252             filter.push(sink);
00253         }
00254 
00255         ~CompressedBuffer()
00256         {
00257             filter.reset();
00258         }
00259 
00260         void close()
00261         {
00262             if (filter)
00263                 boost::iostreams::close(filter);
00264         }
00265 
00266         void flush()
00267         {
00268             boost::iostreams::flush(filter);
00269         }
00270         
00271         size_t bytesOut() const
00272         {
00273             return sink.str().size();
00274         }
00275 
00276         ostringstream sink;
00277         filtering_ostream filter;
00278     };
00279 
00280     std::shared_ptr<CompressedBuffer> buffer;
00281 
00282     size_t entries;
00283     size_t bytesIn;
00284     size_t bytesOut;
00285     size_t bytesCompressed() const { return buffer->bytesOut(); }
00286 
00287     void close()
00288     {
00289         buffer->close();
00290     }
00291 
00292     void flush()
00293     {
00294         buffer->flush();
00295     }
00296 
00297     Value accept(const std::string & field, ValueManager * owner,
00298                  HandlerContext & hContext)
00299     {
00300         ML::Parse_Context context(field, field.data(), field.size());
00301         
00302         try {
00303             std::shared_ptr<const TypeHandler> newHandler;
00304             Value value;
00305             boost::tie(value, newHandler)
00306                 = parseJston(context, currentHandler, owner, hContext);
00307 
00308             if (newHandler != currentHandler) {
00309                 taxonomy.push_back(field);
00310                 oldHandlers.push_back(currentHandler);
00311                 //cerr << "field " << fields[10] << " induced new handler "
00312                 //     << handler->typeToJson().toString() << endl;
00313             }
00314             
00315             currentHandler = newHandler;
00316 
00317             string s = value.toString();
00318             
00319             boost::iostreams::write(buffer->filter, s.c_str(), s.size());
00320 
00321             ++entries;
00322             bytesIn += field.length();
00323             bytesOut += s.size();
00324 
00325             return value;
00326         } catch (...) {
00327             cerr << "parsing field " << field << endl;
00328             throw;
00329         }
00330     }
00331 };
00332 
/** Placeholder for a bid-request encoder.  It currently has no members and
    is not referenced by the visible code in this file. */
struct BidRequestEncoder
{
};
00335 
// NOTE(review): ValueConstructor is disabled with #if 0.  It appears to be an
// unfinished sketch of a ValueManager that assembles a Value from per-index
// element assignments (operator[] + Entry::operator=, then toValue()).
// It is incomplete as written:
//   - Entry::operator= refers to `handler`, a non-static member of the
//     enclosing ValueConstructor, from inside the nested Entry.
//   - The first replaceValue() overload has no return statement.
//   - The second ends with `values[index] = replace`, where `replace` is
//     undeclared and the statement has no terminating semicolon
//     (presumably `replace_with` was intended).
// Finish or remove it before enabling.
#if 0
struct ValueConstructor : public ValueManager {
    std::shared_ptr<TypeHandler> handler;
    std::vector<Value> values;
    
    struct Entry {
        ValueConstructor * value;
        int index;

        template<typename T> void operator = (const T & t)
        {
            ViewHandler<T> vh(handler->getElementType(index));
            vh.set(Value(0, 0, 0, ValueVersion(),
                         0, index),
                   t, value);
        }
    };

    Entry operator [] (int index)
    {
        return Entry(this, index);
    }

    Value toValue(ValueManager * owner) const
    {
        return handler->constructValue(values, owner);
    }

    virtual Value replaceValue(const Value & element,
                               Value && replace_with,
                               const TypeHandler * type)
    {
    }

    virtual Value replaceValue(const Value & element,
                               const ConstValueBlock & replace_with,
                               const TypeHandler * type)
    {
        int index = element.fixed_width();
        values[index] = replace
    }
};
#endif
00379 
00380 struct WinLossOutput : public LogOutput {
00381 
00382     std::shared_ptr<CompoundHandler> keyHandler;
00383 
00384 #if 0
00385      - exchange
00386      - format
00387      - creative
00388      - domain
00389      - hour
00390      - minute
00391      - weekday
00392      - url
00393      - country
00394      - state
00395      - city
00396 #endif
00397 
00398    std::shared_ptr<StringMap> formatMap;
00399    std::shared_ptr<StringMap> domainMap;
00400    std::shared_ptr<StringMap> urlMap;
00401    std::shared_ptr<StringMap> countryMap;
00402    std::shared_ptr<StringMap> stateMap;
00403    std::shared_ptr<StringMap> cityMap;
00404 
00405     WinLossOutput()
00406         : numMsgs(0), start(Date::now())
00407     {
00408         using std::make_shared;
00409         keyHandler = make_shared<CompoundHandler>();
00410         keyHandler->add_field("campaign", make_shared<StringHandler>());
00411         keyHandler->add_field("strategy", make_shared<StringHandler>());
00412         keyHandler->add_field("exchange", make_shared<StringHandler>());
00413         keyHandler->add_field("dateToHour", make_shared<DateHandler>());
00414         keyHandler->add_field("hourOfDay", make_shared<IntHandler>());
00415         keyHandler->add_field("weekday", make_shared<IntHandler>());
00416         keyHandler->add_field("format", make_shared<StringMapHandler>(formatMap));
00417         keyHandler->add_field("creative", make_shared<IntHandler>());
00418         keyHandler->add_field("domain", make_shared<StringMapHandler>(domainMap));
00419         keyHandler->add_field("url", make_shared<StringMapHandler>(urlMap));
00420         keyHandler->add_field("minute", make_shared<IntHandler>());
00421         keyHandler->add_field("country", make_shared<StringMapHandler>(countryMap));
00422         keyHandler->add_field("state", make_shared<StringMapHandler>(stateMap));
00423         keyHandler->add_field("city", make_shared<StringMapHandler>(cityMap));
00424     }
00425 
00426 
00427     size_t numMsgs;
00428     Date start;
00429 
00430     StringMap exchanges;
00431     StringMap strategies;
00432     StringMap campaigns;
00433 
00434     struct Data {
00435         SchemaInferrer br_schema;
00436         SchemaInferrer md_schema;
00437     };
00438 
00439     std::map<FileKey, Data> data;
00440 
00441     HandlerContext hContext;
00442     ValueManager owner;
00443 
00444     virtual void logMessage(const std::string & channel,
00445                             const std::string & message)
00446     {
00447         if (channel.empty() || message.empty()) return;
00448 
00449         if (++numMsgs % 10000 == 0) {
00450             Date now = Date::now();
00451             cerr << "message " << numMsgs << " in "
00452                  << now.secondsSince(start) << "s ("
00453                  << 1.0 * numMsgs / now.secondsSince(start) << "/s)"
00454                  << endl;
00455         }
00456 
00457         //if (numMsgs == 1)
00458         //    cerr << "got message " << message << endl;
00459 
00460         // Channels:
00461         // MATCHEDLOSS:  a bid that we submitted but did not win
00462         // MATCHEDWIN:   a bid that we won
00463         
00464         // Eventually:
00465         // WINORPHAN:    a bid that we won but without an impression
00466         // MATCHEDIMPRESSION: a bid, impression pair with no click
00467         // IMPRESSIONORPHAN: an impression that we won but without a bid req
00468         // MATCHEDCLICK: a bid, impression, click pair
00469         // CLICKORPHAN:  a click without a bid request and impression
00470         
00471         // Channel: MATCHEDWIN, MATCHEDLOSS, MATCHEDIMPRESSION, MATCHED
00472         // Message: tab separated timestamp, url, providerid
00473 
00474         // MATCHEDWIN message
00475         //  0      date
00476         //  1      bid request ID
00477         //  2      spot id
00478         //  3      client
00479         //  4      strategy
00480         //  5      win price micros
00481         //  6      bid price micros
00482         //  7      surplus
00483         //  8      bid request JSON
00484         //  9      bid JSON
00485         // 10      metadata JSON
00486         // 11      creative id   --> 30213
00487         // 12      campaign_slug --> netProphets / real time bidding / ron
00488         // 13      strategy_slug --> airtransat_tour_opt
00489 
00490         LogMessageSplitter<32> fields(message);
00491 
00492         Date timestamp;
00493         {
00494             ML::Parse_Context context(fields[0], fields[0].start, fields[0].end);
00495             timestamp = Date::expect_date_time(context, "%y-%M-%d", "%H:%M:%S");
00496             if (!context.eof())
00497                 context.exception("expected date");
00498         }
00499 
00500         Id auctionId(fields[1]);
00501         Id spotId(fields[2]);
00502 
00503         string client = fields[3];
00504         string strategy = fields[4];
00505         
00506         int winPrice JML_UNUSED = boost::lexical_cast<int>(fields[5]);
00507         int bidPrice JML_UNUSED = boost::lexical_cast<int>(fields[6]);
00508         double surplus JML_UNUSED = boost::lexical_cast<double>(fields[7]);
00509 
00510         string br = fields[8];
00511         std::shared_ptr<BidRequest> req
00512             (BidRequest::parse("datacratic", br));
00513 
00514         // Find which part of a bid request needs to be removed to make it valid
00515         auto fixBidRequest = [] (const std::string & br) -> std::pair<int, int>
00516             {
00517                 pair<int, int> r(0, 0);
00518                 int & start = r.first;
00519                 int & end = r.second;
00520                 Parse_Context context(br, br.c_str(), br.size());
00521                 if (!context.match_literal('{')) return r;
00522                 start = context.get_offset();
00523                 if (!context.match_literal("\"version\":")) return r;
00524                 string ver;
00525                 if (!ML::matchJsonString(context, ver)) return r;
00526                 if (!context.match_literal(',')) return r;
00527                 end = context.get_offset();
00528                 return r;
00529             };
00530 
00531         int start, end;
00532         boost::tie(start, end) = fixBidRequest(br);
00533         //cerr << "br = " << br << endl;
00534         //cerr << "start = " << start << " end = " << end << endl;
00535         if (start < end)
00536             br.erase(start, end - start);
00537         //cerr << "br fixed = " << br << endl;
00538 
00539 #if 0
00540         int spotNum = req->findSpotIndex(spotId);
00541         
00542         std::string format = req->imp[spotNum].format();
00543 #endif
00544 
00545         int creativeId JML_UNUSED = boost::lexical_cast<int>(fields[11]);
00546 
00547         std::string country JML_UNUSED = req->location.countryCode;
00548         std::string region JML_UNUSED = req->location.regionCode;
00549         Utf8String city JML_UNUSED = req->location.cityName;
00550         
00551 #if 0
00552      - domain
00553      - hour
00554      - minute
00555      - weekday
00556      - url
00557      - country
00558      - state
00559      - city
00560 #endif
00561 
00562         FileKey key;
00563         key.exchange = req->exchange;
00564         key.campaign = fields[12];
00565         key.strategy = fields[13];
00566 
00567         Data & fileData = data[key];
00568 
00569         fileData.br_schema.accept(br, &owner, hContext);
00570         fileData.md_schema.accept(fields[10], &owner, hContext);
00571     }
00572 
00573     void finish()
00574     {
00575         size_t totalBytesIn = 0, totalBytesOut = 0, totalOutCompressed = 0;
00576         cerr << "got " << data.size() << " files to save" << endl;
00577         for (auto it = data.begin(), end = data.end(); it != end;  ++it) {
00578             cerr << it->first << endl;
00579             it->second.md_schema.flush();
00580             it->second.md_schema.close();
00581             cerr << "  md " << it->second.md_schema.stats() << endl;
00582             totalBytesIn += it->second.md_schema.bytesIn;
00583             totalBytesOut += it->second.md_schema.bytesOut;
00584             totalOutCompressed += it->second.md_schema.bytesCompressed();
00585 
00586             it->second.br_schema.flush();
00587             it->second.br_schema.close();
00588             cerr << "  br " << it->second.br_schema.stats() << endl;
00589             totalBytesIn += it->second.br_schema.bytesIn;
00590             totalBytesOut += it->second.br_schema.bytesOut;
00591             totalOutCompressed += it->second.br_schema.bytesCompressed();
00592         }
00593 
00594         cerr << "total of " << totalBytesIn << " in, " << totalBytesOut
00595              << " out, ratio = " << 100.0 * totalBytesOut / totalBytesIn
00596              << "%" << endl;
00597         cerr << "compressed total of " << totalBytesOut << " in, " << totalOutCompressed
00598              << " out, ratio = " << 100.0 * totalOutCompressed / totalBytesOut
00599              << "%" << endl;
00600         cerr << "overall compression ratio = " << 100.0 * totalOutCompressed / totalBytesIn
00601              << "%" << endl;
00602     }
00603 
00604     virtual void close()
00605     {
00606     }
00607 
00608 };
00609 
00610 BOOST_AUTO_TEST_CASE( test_bid_aggregator )
00611 {
00612     // Create a logger to get the behaviour events from
00613 
00614     std::shared_ptr<WinLossOutput> output
00615         (new WinLossOutput());
00616 
00617     Logger logger;
00618     logger.addOutput(output);
00619 
00620     logger.start();
00621 
00622     //logger.subscribe("/home/jeremy/platform/rtb_router_logger.ipc",
00623     //                 vector<string>({ "BEHAVIOUR" }));
00624 
00625     uint64_t maxEntries = -1;
00626     //maxEntries = 10000;
00627     logger.replay("MATCHEDWIN-2012-01-31.log.gz", maxEntries);
00628 
00629     cerr << "waiting until finished" << endl;
00630 
00631     logger.waitUntilFinished();
00632 
00633     cerr << "finished replay" << endl;
00634 
00635     output->finish();
00636 
00637     logger.shutdown();
00638 }
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator