RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/json_filter.cc
00001 /* json_filter.cc
00002    Jeremy Barnes, 5 June 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Filters to pre-compress JSON quickly.
00006 */
00007 
00008 #include "json_filter.h"
00009 #include "jml/db/portable_oarchive.h"
00010 #include <unordered_map>
00011 #include <map>
00012 #include "jml/utils/hash_specializations.h"
00013 #include "jml/utils/vector_utils.h"
00014 #include "jml/arch/format.h"
00015 
00016 using namespace std;
00017 using namespace ML;
00018 
00019 
00020 namespace Datacratic {
00021 
00022 /*****************************************************************************/
00023 /* JSON COMPRESSOR                                                           */
00024 /*****************************************************************************/
00025 
00026 struct JsonContext {
00027     JsonContext()
00028         : stringNum(0)
00029     {
00030         size_t buf_size = 65536;
00031         start = new char[buf_size];
00032         current = start;
00033         end = start + buf_size;
00034     }
00035 
00036     ~JsonContext()
00037     {
00038         //cerr << "stringNum = " << stringNum << endl;
00039     }
00040 
00041     void writeByte(uint8_t byte)
00042     {
00043         if (JML_UNLIKELY(current >= end)) {
00044             //cerr << "overflow " << (int)byte << endl;
00045             overflow += byte;
00046             return;
00047         }
00048         *current++ = byte;
00049     }
00050 
00051     void writeSize(size_t size)
00052     {
00053         int len = DB::compact_encode_length(size);
00054 
00055         if (JML_UNLIKELY(current + len > end)) {
00056             char buf[len];
00057             char * b = buf;
00058             DB::encode_compact(b, buf + len, size);
00059             overflow.append(buf, 0, len);
00060             return;
00061         }
00062 
00063         DB::encode_compact(current, end, size);
00064     }
00065 
00066     void writeString(const std::string & s)
00067     {
00068         writeSize(s.length());
00069         writeBinary(s.c_str(), s.length());
00070     }
00071     
00072     void writeString(const char * first, const char * last)
00073     {
00074         size_t sz = last - first;
00075         writeSize(sz);
00076         writeBinary(first, last);
00077     }
00078 
00079     void writeBinary(const char * first, const char * last)
00080     {
00081         writeBinary(first, last - first);
00082     }
00083 
00084     void writeBinary(const char * first, size_t sz)
00085     {
00086         if (JML_UNLIKELY(current + sz > end)) {
00087             overflow.append(first, first + sz);
00088             return;
00089         }
00090 
00091         std::copy(first, first + sz, current);
00092         current += sz;
00093     }
00094     
00095     void reset()
00096     {
00097     }
00098 
00099     void writeInternedString(const std::string & s)
00100     {
00101         auto itC = commonStrings.find(s);
00102 
00103         if (itC != commonStrings.end()) {
00104             int idx = itC->second;
00105             if (idx < 128) {
00106                 writeByte(idx + 128);
00107                 return;
00108             }
00109             else {
00110                 writeByte('C');
00111                 writeByte(idx - 128);
00112                 return;
00113             }
00114         }
00115 
00116         auto it = lastStrings.find(s);
00117 
00118         if (it != lastStrings.end()) {
00119             // It's already there; write a reference to it
00120             size_t n = it->second.index;
00121             ++it->second.count;
00122             writeByte('R');  // interned string reference
00123             writeSize(n);
00124         }
00125         else {
00126             // not there
00127             lastStrings.insert(make_pair(s, StringEntry(stringNum, 0))).second;
00128             //cerr << "string " << s << " is at ID " << stringNum << endl;
00129             ++stringNum;
00130             writeByte('S');  // interned string definition
00131             writeString(s);
00132         }
00133         
00134         if (stringNum == 5000)
00135             sortStrings();
00136     }
00137 
00138     void sortStrings()
00139     {
00140         writeByte('<');
00141         vector<pair<size_t, std::string> > counts;
00142         counts.reserve(lastStrings.size());
00143         
00144         for (auto it = lastStrings.begin(), end = lastStrings.end();  it != end;  ++it) {
00145             size_t count = it->second.count;
00146             if (count < 2) continue;
00147             counts.push_back(make_pair(count, it->first));
00148         }
00149 
00150         std::sort(counts.begin(), counts.end(), std::greater<std::pair<size_t, string> >());
00151 
00152         commonStrings.clear();
00153 
00154         for (unsigned i = 0;  i < 256 + 128 && i < counts.size();  ++i)
00155             commonStrings[counts[i].second] = i;
00156     }
00157 
00158     struct StringEntry {
00159         StringEntry(int index = 0, int count = 0)
00160             : index(index), count(count)
00161         {
00162         }
00163 
00164         size_t index;
00165         size_t count;
00166     };
00167 
00168 #if 0
00169     struct StringHash {
00170         size_t operator () (const std::string & str) const
00171         {
00172             size_t l = str.length();
00173             switch (l) {
00174             case 0: return 0;
00175             case 1: return ML::chain_hash(str[0], 0);
00176             case 2: return ML::chain_hash(str[0], str[1]);
00177             case 3: return ML::chain_hash((str[0] << 8) + str[1], str[2]);
00178             default:
00179                 return ML::chain_hash((str[0] << 24)
00180                                       + (str[1] << 16)
00181                                       + (str[l - 1] << 8)
00182                                       + l,
00183                                       (str[l << 1] << 16) + (str[(l << 1) + 1] << 8));
00184             }
00185         }
00186     };
00187 #endif
00188 
00189     std::unordered_map<std::string, int> commonStrings;
00190 
00191     std::unordered_map<std::string, StringEntry> lastStrings;
00192     size_t stringNum;
00193 
00194     void writeOutput(Filter::OnOutput onOutput,
00195                      FlushLevel level,
00196                      boost::function<void ()> onMessageDone)
00197     {
00198         if (overflow.empty()) {
00199             onOutput(start, current - start, level, onMessageDone);
00200         }
00201         else {
00202             onOutput(start, current - start, FLUSH_NONE, []{});
00203             onOutput(overflow.c_str(), overflow.length(), level, onMessageDone);
00204             overflow.clear();
00205         }
00206         current = start;
00207     }
00208     
00209     char * start;
00210     char * current;
00211     char * end;
00212     
00213     std::string overflow;
00214 };
00215 
00216 struct ParseState {
00217     ParseState(JsonContext & context)
00218         : context(context)
00219     {
00220     }
00221 
00222     virtual ~ParseState()
00223     {
00224     }
00225 
00226     enum ProcessResult {
00227         CONTINUE,   // State should continue
00228         FINISHED,   // State has finished without an error
00229         ERROR       // State had an error; parent should back out to before
00230     };
00231 
00232     JsonContext & context;
00233 
00234     virtual ProcessResult
00235     process(const char * & first, const char * last) = 0;
00236 
00237     virtual ProcessResult flush() = 0;
00238 };
00239 
00240 struct StringState : public ParseState {
00241 
00242     enum State {
00243         IN_STRING,
00244         AFTER_BACKSLASH,
00245     } state;
00246 
00247     StringState(JsonContext & context)
00248         : ParseState(context), state(IN_STRING)
00249     {
00250         current.reserve(64);
00251         current = "\"";
00252     }
00253 
00254     virtual ProcessResult
00255     process(const char * & first, const char * last)
00256     {
00257         while (first < last) {
00258             switch (state) {
00259             case IN_STRING: {
00260                 const char * start = first;
00261                 while (first < last && *first != '"' && *first != '\\')
00262                     ++first;
00263 
00264                 if (first == last) {
00265                     current.append(start, first);
00266                     return CONTINUE;
00267                 }
00268                 
00269                 char c = *first++;
00270 
00271                 if (c == '"') {
00272                     writeOutput(start, first);
00273                     return FINISHED;
00274                 }
00275                 else if (c == '\\') {
00276                     state = AFTER_BACKSLASH;
00277                 }
00278                 break;
00279             }
00280 
00281             case AFTER_BACKSLASH:
00282                 current += *first++;
00283                 state = IN_STRING;
00284                 break;
00285 
00286             default:
00287                 throw Exception("invalid state");
00288             };
00289         }
00290         return CONTINUE;
00291     }
00292 
00293     virtual ProcessResult flush()
00294     {
00295         writeOutput();
00296         return CONTINUE;
00297     }
00298     
00299     ProcessResult do_c(char c)
00300     {
00301         return CONTINUE;
00302     }
00303 
00304     void writeOutput(const char * first = 0, const char * last = 0)
00305     {
00306         size_t slen = current.size() + (last - first);
00307         if (slen > 1 && slen < 32) {
00308             if (current.empty())
00309                 context.writeInternedString(string(first, last));
00310             else {
00311                 current.append(first, last);
00312                 context.writeInternedString(current);
00313                 current.clear();
00314             }
00315         }
00316         else {
00317             context.writeByte('s');
00318             context.writeSize(slen);
00319             if (!current.empty()) {
00320                 context.writeBinary(current.c_str(), current.length());
00321                 current.clear();
00322             }
00323             context.writeBinary(first, last);
00324         }
00325     }
00326 
00327     std::string current;
00328 };
00329 
00330 struct RootState : public ParseState {
00331 
00332     RootState(JsonContext & context)
00333         : ParseState(context)
00334     {
00335     }
00336     
00337     virtual ProcessResult process(const char * & first, const char * last)
00338     {
00339         ProcessResult result = CONTINUE;
00340 
00341         for (; first < last;  /* no inc */) {
00342             if (next) {
00343                 ProcessResult nextResult = next->process(first, last);
00344                 if (nextResult == CONTINUE && first != last)
00345                     throw Exception("asked to continue but didn't consume all input");
00346                 if (nextResult == FINISHED)
00347                     next.reset();
00348             }
00349             else {
00350                 const char * start = first;
00351                 while (first < last && *first != '"')
00352                     ++first;
00353 
00354 
00355                 if (first == last) {
00356                     current.append(start, first);
00357                     return CONTINUE;
00358                 }
00359 
00360                 if (*first == '"') {
00361                     writeOutput(start, first);
00362                     ++first;
00363                     next.reset(new StringState(context));
00364                 }
00365                 else throw Exception("out of sync");
00366             }
00367         }
00368 
00369         if (first > last)
00370             throw Exception("first > last");
00371         
00372         return result;
00373     }
00374 
00375     virtual ProcessResult flush()
00376     {
00377         if (next)
00378             return next->flush();
00379         else {
00380             writeOutput();
00381             return CONTINUE;
00382         }
00383     }
00384 
00385     void writeOutput(const char * first = 0, const char * last = 0)
00386     {
00387         size_t slen = current.size() + (last - first);
00388         if (slen > 1 && slen < 32) {
00389             if (current.empty())
00390                 context.writeInternedString(string(first, last));
00391             else {
00392                 current.append(first, last);
00393                 context.writeInternedString(current);
00394                 current.clear();
00395             }
00396         }
00397         else {
00398             context.writeByte('r');
00399             context.writeSize(slen);
00400             if (!current.empty()) {
00401                 context.writeBinary(current.c_str(), current.length());
00402                 current.clear();
00403             }
00404             context.writeBinary(first, last);
00405         }
00406     }
00407 
00408     void reset()
00409     {
00410     }
00411     
00412     std::string current;
00413     std::shared_ptr<ParseState> next;
00414 };
00415 
00416 
00417 
00418 struct JsonCompressor::Itl {
00419     Itl()
00420         : state(context)
00421     {
00422     }
00423 
00424     JsonContext context;
00425     RootState state;
00426 
00427     void reset()
00428     {
00429         context.reset();
00430         state.reset();
00431     }
00432 };
00433 
00434 JsonCompressor::
00435 JsonCompressor()
00436     : itl(new Itl())
00437 {
00438 }
00439 
00440 JsonCompressor::
00441 ~JsonCompressor()
00442 {
00443 }
00444 
00445 void
00446 JsonCompressor::
00447 process(const char * src_begin, const char * src_end,
00448         FlushLevel level,
00449         boost::function<void ()> onMessageDone)
00450 {
00451     itl->state.process(src_begin, src_end);
00452     if (level != FLUSH_NONE)
00453         itl->state.flush();
00454     if (level == FLUSH_FULL)
00455         itl->reset();
00456     itl->context.writeOutput(onOutput, level, onMessageDone);
00457 }
00458 
00459 
00460 
00461 /*****************************************************************************/
00462 /* JSON DECOMPRESSOR                                                         */
00463 /*****************************************************************************/
00464 
00465 struct JsonDecompressor::Itl {
00466 
00467     struct State;
00468 
00469     typedef void (Itl::* OnDataFn) (const char * &, const char *, State &);
00470     typedef void (Itl::* OnFinishedFn) (State & current, State & parent);
00471 
00472     struct State {
00473         State()
00474             : onData(0), onFinished(0), param(0), len(0), done(0), phase(0)
00475         {
00476         }
00477 
00478         // Thing to call once we get data
00479         OnDataFn onData;
00480         OnFinishedFn onFinished;
00481         void * param;
00482 
00483         size_t len, done;
00484         int phase;
00485         std::string str;
00486     };
00487 
00488     std::vector<State> states;
00489 
00490     Itl()
00491     {
00492         states.reserve(20);
00493         pushState(&Itl::processBase);
00494     }
00495 
00496     State & pushState(OnDataFn onData, OnFinishedFn onFinished = 0,
00497                       void * param = 0)
00498     {
00499         states.push_back(State());
00500         states.back().onData = onData;
00501         states.back().onFinished = onFinished;
00502         states.back().param = param;
00503 
00504         return states.back();
00505     }
00506 
00507     void popState()
00508     {
00509         if (states.size() < 2)
00510             throw Exception("pop of base state");
00511         State & current = states.back();
00512 
00513         if (!current.onFinished) {
00514             states.pop_back();
00515             return;
00516         }
00517 
00518         State & parent = states[states.size() - 2];
00519         
00520         (this ->* current.onFinished) (current, parent);
00521 
00522         states.pop_back();
00523     }
00524 
00525     void process(const char * first, const char * last)
00526     {
00527         while (first != last) {
00528             if (states.empty())
00529                 throw Exception("empty state");
00530             (this ->* states.back().onData)(first, last, states.back());
00531         }
00532     }
00533 
00534     void processBase(const char * & first, const char * last, State & state)
00535     {
00536         //cerr << "processBase" << endl;
00537 
00538         if (first >= last)
00539             throw Exception("processing with no characters");
00540         uint8_t c = *first++;
00541 
00542         switch (c) {
00543 
00544         case 'r':
00545             pushState(&Itl::processRaw);
00546             return;
00547 
00548         case 's':
00549             pushState(&Itl::processString);
00550             return;
00551 
00552         case 'S':
00553             pushState(&Itl::processInternedStringDefinition);
00554             return;
00555 
00556         case 'R':
00557             pushState(&Itl::processInternedStringReference);
00558             return;
00559 
00560         case '<':
00561             sortStrings();
00562             return;
00563 
00564         case 'C':
00565             pushState(&Itl::processCommonString);
00566             return;
00567 
00568         default:
00569             if (c >= 128)
00570                 current << commonStrings.at(c - 128);
00571             else
00572                 throw Exception("unknown state character %d %c", c, c);
00573         }
00574     }
00575 
00576     void processString(const char * & first, const char * last, State & state)
00577     {
00578         //cerr << "processing string input phase " << state.phase
00579         //     << " with " << last - first << " characters depth "
00580         //     << states.size() << endl;
00581 
00582         if (first >= last)
00583             throw Exception("processing with no characters");
00584 
00585         switch (state.phase) {
00586 
00587         case 0: // reading length
00588             // TODO: if length is inline then do that to avoid indirection
00589 
00590             pushState(&Itl::processLength, &Itl::doneStringLength);
00591             break;
00592 
00593         case 1: { // reading data
00594             size_t avail = last - first;
00595             size_t toRead = std::min(avail, state.len - state.done);
00596             
00597             //cerr << "string is " << std::string(first, first + toRead)
00598             //     << endl;
00599 
00600             current.write(first, toRead);
00601             first += toRead;
00602             state.done += toRead;
00603 
00604             //cerr << "string: done " << state.done << " len "
00605             //     << state.len << endl;
00606 
00607             if (state.done == state.len)
00608                 popState();
00609             break;
00610         }
00611         default:
00612             throw Exception("processString: invalid phase");
00613         }
00614     }
00615 
00616     void doneStringLength(State & current, State & parent)
00617     {
00618         //cerr << "string is of length " << current.len << endl;
00619         parent.done = 0;
00620         parent.len = current.len;
00621         parent.phase = 1;
00622     }
00623 
00624     void processRaw(const char * & first, const char * last, State & state)
00625     {
00626         //cerr << "processing raw input" << endl;
00627         return processString(first, last, state);
00628     }
00629 
00630     void doneInternedStringLength(State & current, State & parent)
00631     {
00632         //cerr << "string is of length " << current.len << endl;
00633         parent.done = 0;
00634         parent.len = current.len;
00635         parent.phase = 1;
00636         parent.str.reserve(current.len);
00637     }
00638 
00639     void processInternedStringDefinition(const char * & first,
00640                                          const char * last,
00641                                          State & state)
00642     {
00643         if (first >= last)
00644             throw Exception("processing with no characters");
00645 
00646         switch (state.phase) {
00647 
00648         case 0: // reading length
00649             // TODO: if length is inline then do that to avoid indirection
00650 
00651             pushState(&Itl::processLength, &Itl::doneInternedStringLength);
00652             break;
00653 
00654         case 1: { // reading data
00655             size_t avail = last - first;
00656             size_t toRead = std::min(avail, state.len - state.done);
00657             
00658             state.str.append(first, first + toRead);
00659             first += toRead;
00660             state.done += toRead;
00661 
00662             if (state.done == state.len) {
00663                 //cerr << "defined interned string " << state.str << " at "
00664                 //     << currentStrings.size() << endl;
00665                 current << state.str;
00666                 currentStrings.push_back(make_pair(1, state.str));
00667                 popState();
00668             }
00669             break;
00670         }
00671         default:
00672             throw Exception("processString: invalid phase");
00673         }
00674     }
00675 
00676     void doneInternedStringReference(State & current, State & parent)
00677     {
00678         parent.phase = 1;
00679         parent.str = currentStrings.at(current.len).second;
00680         currentStrings[current.len].first += 1;
00681         //cerr << "reference to interned string " << current.len
00682         //     << " -> " << parent.str << endl;
00683     }
00684 
00685     void processInternedStringReference(const char * & first,
00686                                         const char * last,
00687                                         State & state)
00688     {
00689         //cerr << "doing interned string reference phase "
00690         //     << state.phase << endl;
00691 
00692         if (first >= last)
00693             throw Exception("processing with no characters");
00694 
00695         switch (state.phase) {
00696 
00697         case 0: // reading length
00698             pushState(&Itl::processLength, &Itl::doneInternedStringReference);
00699             break;
00700 
00701         case 1: // finished
00702             current << state.str;
00703             popState();
00704             break;
00705             
00706         default:
00707             throw Exception("processString: invalid phase");
00708         }
00709     }
00710     
00711     void processCommonString(const char * & first, const char * last, State & state)
00712     {
00713         const uint8_t * p = reinterpret_cast<const uint8_t *>(first);
00714         int idx = *p;
00715         current << commonStrings.at(idx + 128);
00716         ++first;
00717         popState();
00718     }
00719 
00723     void processLength(const char * & first, const char * last, State & state)
00724     {
00725         //cerr << "processing length" << endl;
00726         // Length of the length in bytes
00727         size_t len = ML::DB::compact_decode_length(*first);
00728         size_t avail = last - first;
00729 
00730         // If we have enough data, get the length
00731         if (avail >= len) {
00732             state.len = ML::DB::decode_compact(first, last);
00733             //cerr << "length is " << state.len << endl;
00734             popState();
00735             return;
00736         }
00737         
00738         // Otherwise, require more data
00739         requireData(len, first, last);
00740     }
00741 
00742     void requireData(size_t amount, const char * & first, const char * last)
00743     {
00744         //cerr << "requiring " << amount << " bytes" << endl;
00745 
00746         size_t avail = last - first;
00747         if (avail >= amount)
00748             throw Exception("required amount already available");
00749         State & state = pushState(&Itl::processRequireData);
00750         state.len = amount;
00751         state.str.reserve(amount);
00752         state.str.append(first, last);
00753         first = last;
00754     }
00755 
00756     void processRequireData(const char * & first, const char * last,
00757                             State & state)
00758     {
00759         // Buffer until we get enough
00760         size_t avail = last - first;
00761         size_t toRead = std::min(avail, state.len - state.str.size());
00762         state.str.append(first, first + toRead);
00763         first += toRead;
00764 
00765         if (state.len == state.str.size()) {
00766             // Enough buffered; pass it on to the previous entry in the
00767             // stack.
00768             string s = state.str;  // copy so it doesn't go out of scope
00769             states.pop_back();
00770             process(s.c_str(), s.c_str() + s.length());
00771         }
00772     }
00773     
00774     void writeOutput(Filter::OnOutput onOutput,
00775                      FlushLevel level,
00776                      boost::function<void ()> onMessageDone)
00777     {
00778         string s = current.str();
00779         if (!s.empty())
00780             onOutput(s.c_str(), s.length(), level, onMessageDone);
00781         else onMessageDone();
00782         current.str("");
00783     }
00784 
00785     void sortStrings()
00786     {
00787         vector<pair<size_t, std::string> > counts = currentStrings;
00788         std::sort(counts.begin(), counts.end(), std::greater<std::pair<size_t, string> >());
00789 
00790         commonStrings.clear();
00791 
00792         for (unsigned i = 0;  i < 256 + 128 && i < counts.size();  ++i)
00793             commonStrings.push_back(counts[i].second);
00794         
00795         //cerr << "commonStrings = " << commonStrings << endl;
00796     }
00797 
00798     ostringstream current;
00799     std::vector<std::string> commonStrings;
00800     std::vector<std::pair<size_t, std::string> > currentStrings;
00801 };
00802 
00803 JsonDecompressor::
00804 JsonDecompressor()
00805     : itl(new Itl())
00806 {
00807 }
00808 
00809 JsonDecompressor::
00810 ~JsonDecompressor()
00811 {
00812 }
00813 
00814 void
00815 JsonDecompressor::
00816 process(const char * src_begin, const char * src_end,
00817         FlushLevel level,
00818         boost::function<void ()> onMessageDone)
00819 {
00820     itl->process(src_begin, src_end);
00821     itl->writeOutput(onOutput, level, onMessageDone);
00822 }
00823 
00824 } // namespace Datacratic
00825 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator