RTBKit 0.9
Open-source framework to create real-time ad bidding systems.
00001 /* json_filter.cc 00002 Jeremy Barnes, 5 June 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Filters to pre-compress JSON quickly. 00006 */ 00007 00008 #include "json_filter.h" 00009 #include "jml/db/portable_oarchive.h" 00010 #include <unordered_map> 00011 #include <map> 00012 #include "jml/utils/hash_specializations.h" 00013 #include "jml/utils/vector_utils.h" 00014 #include "jml/arch/format.h" 00015 00016 using namespace std; 00017 using namespace ML; 00018 00019 00020 namespace Datacratic { 00021 00022 /*****************************************************************************/ 00023 /* JSON COMPRESSOR */ 00024 /*****************************************************************************/ 00025 00026 struct JsonContext { 00027 JsonContext() 00028 : stringNum(0) 00029 { 00030 size_t buf_size = 65536; 00031 start = new char[buf_size]; 00032 current = start; 00033 end = start + buf_size; 00034 } 00035 00036 ~JsonContext() 00037 { 00038 //cerr << "stringNum = " << stringNum << endl; 00039 } 00040 00041 void writeByte(uint8_t byte) 00042 { 00043 if (JML_UNLIKELY(current >= end)) { 00044 //cerr << "overflow " << (int)byte << endl; 00045 overflow += byte; 00046 return; 00047 } 00048 *current++ = byte; 00049 } 00050 00051 void writeSize(size_t size) 00052 { 00053 int len = DB::compact_encode_length(size); 00054 00055 if (JML_UNLIKELY(current + len > end)) { 00056 char buf[len]; 00057 char * b = buf; 00058 DB::encode_compact(b, buf + len, size); 00059 overflow.append(buf, 0, len); 00060 return; 00061 } 00062 00063 DB::encode_compact(current, end, size); 00064 } 00065 00066 void writeString(const std::string & s) 00067 { 00068 writeSize(s.length()); 00069 writeBinary(s.c_str(), s.length()); 00070 } 00071 00072 void writeString(const char * first, const char * last) 00073 { 00074 size_t sz = last - first; 00075 writeSize(sz); 00076 writeBinary(first, last); 00077 } 00078 00079 void writeBinary(const char * first, const char * last) 
00080 { 00081 writeBinary(first, last - first); 00082 } 00083 00084 void writeBinary(const char * first, size_t sz) 00085 { 00086 if (JML_UNLIKELY(current + sz > end)) { 00087 overflow.append(first, first + sz); 00088 return; 00089 } 00090 00091 std::copy(first, first + sz, current); 00092 current += sz; 00093 } 00094 00095 void reset() 00096 { 00097 } 00098 00099 void writeInternedString(const std::string & s) 00100 { 00101 auto itC = commonStrings.find(s); 00102 00103 if (itC != commonStrings.end()) { 00104 int idx = itC->second; 00105 if (idx < 128) { 00106 writeByte(idx + 128); 00107 return; 00108 } 00109 else { 00110 writeByte('C'); 00111 writeByte(idx - 128); 00112 return; 00113 } 00114 } 00115 00116 auto it = lastStrings.find(s); 00117 00118 if (it != lastStrings.end()) { 00119 // It's already there; write a reference to it 00120 size_t n = it->second.index; 00121 ++it->second.count; 00122 writeByte('R'); // interned string reference 00123 writeSize(n); 00124 } 00125 else { 00126 // not there 00127 lastStrings.insert(make_pair(s, StringEntry(stringNum, 0))).second; 00128 //cerr << "string " << s << " is at ID " << stringNum << endl; 00129 ++stringNum; 00130 writeByte('S'); // interned string definition 00131 writeString(s); 00132 } 00133 00134 if (stringNum == 5000) 00135 sortStrings(); 00136 } 00137 00138 void sortStrings() 00139 { 00140 writeByte('<'); 00141 vector<pair<size_t, std::string> > counts; 00142 counts.reserve(lastStrings.size()); 00143 00144 for (auto it = lastStrings.begin(), end = lastStrings.end(); it != end; ++it) { 00145 size_t count = it->second.count; 00146 if (count < 2) continue; 00147 counts.push_back(make_pair(count, it->first)); 00148 } 00149 00150 std::sort(counts.begin(), counts.end(), std::greater<std::pair<size_t, string> >()); 00151 00152 commonStrings.clear(); 00153 00154 for (unsigned i = 0; i < 256 + 128 && i < counts.size(); ++i) 00155 commonStrings[counts[i].second] = i; 00156 } 00157 00158 struct StringEntry { 00159 
StringEntry(int index = 0, int count = 0) 00160 : index(index), count(count) 00161 { 00162 } 00163 00164 size_t index; 00165 size_t count; 00166 }; 00167 00168 #if 0 00169 struct StringHash { 00170 size_t operator () (const std::string & str) const 00171 { 00172 size_t l = str.length(); 00173 switch (l) { 00174 case 0: return 0; 00175 case 1: return ML::chain_hash(str[0], 0); 00176 case 2: return ML::chain_hash(str[0], str[1]); 00177 case 3: return ML::chain_hash((str[0] << 8) + str[1], str[2]); 00178 default: 00179 return ML::chain_hash((str[0] << 24) 00180 + (str[1] << 16) 00181 + (str[l - 1] << 8) 00182 + l, 00183 (str[l << 1] << 16) + (str[(l << 1) + 1] << 8)); 00184 } 00185 } 00186 }; 00187 #endif 00188 00189 std::unordered_map<std::string, int> commonStrings; 00190 00191 std::unordered_map<std::string, StringEntry> lastStrings; 00192 size_t stringNum; 00193 00194 void writeOutput(Filter::OnOutput onOutput, 00195 FlushLevel level, 00196 boost::function<void ()> onMessageDone) 00197 { 00198 if (overflow.empty()) { 00199 onOutput(start, current - start, level, onMessageDone); 00200 } 00201 else { 00202 onOutput(start, current - start, FLUSH_NONE, []{}); 00203 onOutput(overflow.c_str(), overflow.length(), level, onMessageDone); 00204 overflow.clear(); 00205 } 00206 current = start; 00207 } 00208 00209 char * start; 00210 char * current; 00211 char * end; 00212 00213 std::string overflow; 00214 }; 00215 00216 struct ParseState { 00217 ParseState(JsonContext & context) 00218 : context(context) 00219 { 00220 } 00221 00222 virtual ~ParseState() 00223 { 00224 } 00225 00226 enum ProcessResult { 00227 CONTINUE, // State should continue 00228 FINISHED, // State has finished without an error 00229 ERROR // State had an error; parent should back out to before 00230 }; 00231 00232 JsonContext & context; 00233 00234 virtual ProcessResult 00235 process(const char * & first, const char * last) = 0; 00236 00237 virtual ProcessResult flush() = 0; 00238 }; 00239 00240 struct 
StringState : public ParseState { 00241 00242 enum State { 00243 IN_STRING, 00244 AFTER_BACKSLASH, 00245 } state; 00246 00247 StringState(JsonContext & context) 00248 : ParseState(context), state(IN_STRING) 00249 { 00250 current.reserve(64); 00251 current = "\""; 00252 } 00253 00254 virtual ProcessResult 00255 process(const char * & first, const char * last) 00256 { 00257 while (first < last) { 00258 switch (state) { 00259 case IN_STRING: { 00260 const char * start = first; 00261 while (first < last && *first != '"' && *first != '\\') 00262 ++first; 00263 00264 if (first == last) { 00265 current.append(start, first); 00266 return CONTINUE; 00267 } 00268 00269 char c = *first++; 00270 00271 if (c == '"') { 00272 writeOutput(start, first); 00273 return FINISHED; 00274 } 00275 else if (c == '\\') { 00276 state = AFTER_BACKSLASH; 00277 } 00278 break; 00279 } 00280 00281 case AFTER_BACKSLASH: 00282 current += *first++; 00283 state = IN_STRING; 00284 break; 00285 00286 default: 00287 throw Exception("invalid state"); 00288 }; 00289 } 00290 return CONTINUE; 00291 } 00292 00293 virtual ProcessResult flush() 00294 { 00295 writeOutput(); 00296 return CONTINUE; 00297 } 00298 00299 ProcessResult do_c(char c) 00300 { 00301 return CONTINUE; 00302 } 00303 00304 void writeOutput(const char * first = 0, const char * last = 0) 00305 { 00306 size_t slen = current.size() + (last - first); 00307 if (slen > 1 && slen < 32) { 00308 if (current.empty()) 00309 context.writeInternedString(string(first, last)); 00310 else { 00311 current.append(first, last); 00312 context.writeInternedString(current); 00313 current.clear(); 00314 } 00315 } 00316 else { 00317 context.writeByte('s'); 00318 context.writeSize(slen); 00319 if (!current.empty()) { 00320 context.writeBinary(current.c_str(), current.length()); 00321 current.clear(); 00322 } 00323 context.writeBinary(first, last); 00324 } 00325 } 00326 00327 std::string current; 00328 }; 00329 00330 struct RootState : public ParseState { 00331 00332 
RootState(JsonContext & context) 00333 : ParseState(context) 00334 { 00335 } 00336 00337 virtual ProcessResult process(const char * & first, const char * last) 00338 { 00339 ProcessResult result = CONTINUE; 00340 00341 for (; first < last; /* no inc */) { 00342 if (next) { 00343 ProcessResult nextResult = next->process(first, last); 00344 if (nextResult == CONTINUE && first != last) 00345 throw Exception("asked to continue but didn't consume all input"); 00346 if (nextResult == FINISHED) 00347 next.reset(); 00348 } 00349 else { 00350 const char * start = first; 00351 while (first < last && *first != '"') 00352 ++first; 00353 00354 00355 if (first == last) { 00356 current.append(start, first); 00357 return CONTINUE; 00358 } 00359 00360 if (*first == '"') { 00361 writeOutput(start, first); 00362 ++first; 00363 next.reset(new StringState(context)); 00364 } 00365 else throw Exception("out of sync"); 00366 } 00367 } 00368 00369 if (first > last) 00370 throw Exception("first > last"); 00371 00372 return result; 00373 } 00374 00375 virtual ProcessResult flush() 00376 { 00377 if (next) 00378 return next->flush(); 00379 else { 00380 writeOutput(); 00381 return CONTINUE; 00382 } 00383 } 00384 00385 void writeOutput(const char * first = 0, const char * last = 0) 00386 { 00387 size_t slen = current.size() + (last - first); 00388 if (slen > 1 && slen < 32) { 00389 if (current.empty()) 00390 context.writeInternedString(string(first, last)); 00391 else { 00392 current.append(first, last); 00393 context.writeInternedString(current); 00394 current.clear(); 00395 } 00396 } 00397 else { 00398 context.writeByte('r'); 00399 context.writeSize(slen); 00400 if (!current.empty()) { 00401 context.writeBinary(current.c_str(), current.length()); 00402 current.clear(); 00403 } 00404 context.writeBinary(first, last); 00405 } 00406 } 00407 00408 void reset() 00409 { 00410 } 00411 00412 std::string current; 00413 std::shared_ptr<ParseState> next; 00414 }; 00415 00416 00417 00418 struct 
JsonCompressor::Itl { 00419 Itl() 00420 : state(context) 00421 { 00422 } 00423 00424 JsonContext context; 00425 RootState state; 00426 00427 void reset() 00428 { 00429 context.reset(); 00430 state.reset(); 00431 } 00432 }; 00433 00434 JsonCompressor:: 00435 JsonCompressor() 00436 : itl(new Itl()) 00437 { 00438 } 00439 00440 JsonCompressor:: 00441 ~JsonCompressor() 00442 { 00443 } 00444 00445 void 00446 JsonCompressor:: 00447 process(const char * src_begin, const char * src_end, 00448 FlushLevel level, 00449 boost::function<void ()> onMessageDone) 00450 { 00451 itl->state.process(src_begin, src_end); 00452 if (level != FLUSH_NONE) 00453 itl->state.flush(); 00454 if (level == FLUSH_FULL) 00455 itl->reset(); 00456 itl->context.writeOutput(onOutput, level, onMessageDone); 00457 } 00458 00459 00460 00461 /*****************************************************************************/ 00462 /* JSON DECOMPRESSOR */ 00463 /*****************************************************************************/ 00464 00465 struct JsonDecompressor::Itl { 00466 00467 struct State; 00468 00469 typedef void (Itl::* OnDataFn) (const char * &, const char *, State &); 00470 typedef void (Itl::* OnFinishedFn) (State & current, State & parent); 00471 00472 struct State { 00473 State() 00474 : onData(0), onFinished(0), param(0), len(0), done(0), phase(0) 00475 { 00476 } 00477 00478 // Thing to call once we get data 00479 OnDataFn onData; 00480 OnFinishedFn onFinished; 00481 void * param; 00482 00483 size_t len, done; 00484 int phase; 00485 std::string str; 00486 }; 00487 00488 std::vector<State> states; 00489 00490 Itl() 00491 { 00492 states.reserve(20); 00493 pushState(&Itl::processBase); 00494 } 00495 00496 State & pushState(OnDataFn onData, OnFinishedFn onFinished = 0, 00497 void * param = 0) 00498 { 00499 states.push_back(State()); 00500 states.back().onData = onData; 00501 states.back().onFinished = onFinished; 00502 states.back().param = param; 00503 00504 return states.back(); 00505 } 
00506 00507 void popState() 00508 { 00509 if (states.size() < 2) 00510 throw Exception("pop of base state"); 00511 State & current = states.back(); 00512 00513 if (!current.onFinished) { 00514 states.pop_back(); 00515 return; 00516 } 00517 00518 State & parent = states[states.size() - 2]; 00519 00520 (this ->* current.onFinished) (current, parent); 00521 00522 states.pop_back(); 00523 } 00524 00525 void process(const char * first, const char * last) 00526 { 00527 while (first != last) { 00528 if (states.empty()) 00529 throw Exception("empty state"); 00530 (this ->* states.back().onData)(first, last, states.back()); 00531 } 00532 } 00533 00534 void processBase(const char * & first, const char * last, State & state) 00535 { 00536 //cerr << "processBase" << endl; 00537 00538 if (first >= last) 00539 throw Exception("processing with no characters"); 00540 uint8_t c = *first++; 00541 00542 switch (c) { 00543 00544 case 'r': 00545 pushState(&Itl::processRaw); 00546 return; 00547 00548 case 's': 00549 pushState(&Itl::processString); 00550 return; 00551 00552 case 'S': 00553 pushState(&Itl::processInternedStringDefinition); 00554 return; 00555 00556 case 'R': 00557 pushState(&Itl::processInternedStringReference); 00558 return; 00559 00560 case '<': 00561 sortStrings(); 00562 return; 00563 00564 case 'C': 00565 pushState(&Itl::processCommonString); 00566 return; 00567 00568 default: 00569 if (c >= 128) 00570 current << commonStrings.at(c - 128); 00571 else 00572 throw Exception("unknown state character %d %c", c, c); 00573 } 00574 } 00575 00576 void processString(const char * & first, const char * last, State & state) 00577 { 00578 //cerr << "processing string input phase " << state.phase 00579 // << " with " << last - first << " characters depth " 00580 // << states.size() << endl; 00581 00582 if (first >= last) 00583 throw Exception("processing with no characters"); 00584 00585 switch (state.phase) { 00586 00587 case 0: // reading length 00588 // TODO: if length is inline 
then do that to avoid indirection 00589 00590 pushState(&Itl::processLength, &Itl::doneStringLength); 00591 break; 00592 00593 case 1: { // reading data 00594 size_t avail = last - first; 00595 size_t toRead = std::min(avail, state.len - state.done); 00596 00597 //cerr << "string is " << std::string(first, first + toRead) 00598 // << endl; 00599 00600 current.write(first, toRead); 00601 first += toRead; 00602 state.done += toRead; 00603 00604 //cerr << "string: done " << state.done << " len " 00605 // << state.len << endl; 00606 00607 if (state.done == state.len) 00608 popState(); 00609 break; 00610 } 00611 default: 00612 throw Exception("processString: invalid phase"); 00613 } 00614 } 00615 00616 void doneStringLength(State & current, State & parent) 00617 { 00618 //cerr << "string is of length " << current.len << endl; 00619 parent.done = 0; 00620 parent.len = current.len; 00621 parent.phase = 1; 00622 } 00623 00624 void processRaw(const char * & first, const char * last, State & state) 00625 { 00626 //cerr << "processing raw input" << endl; 00627 return processString(first, last, state); 00628 } 00629 00630 void doneInternedStringLength(State & current, State & parent) 00631 { 00632 //cerr << "string is of length " << current.len << endl; 00633 parent.done = 0; 00634 parent.len = current.len; 00635 parent.phase = 1; 00636 parent.str.reserve(current.len); 00637 } 00638 00639 void processInternedStringDefinition(const char * & first, 00640 const char * last, 00641 State & state) 00642 { 00643 if (first >= last) 00644 throw Exception("processing with no characters"); 00645 00646 switch (state.phase) { 00647 00648 case 0: // reading length 00649 // TODO: if length is inline then do that to avoid indirection 00650 00651 pushState(&Itl::processLength, &Itl::doneInternedStringLength); 00652 break; 00653 00654 case 1: { // reading data 00655 size_t avail = last - first; 00656 size_t toRead = std::min(avail, state.len - state.done); 00657 00658 state.str.append(first, 
first + toRead); 00659 first += toRead; 00660 state.done += toRead; 00661 00662 if (state.done == state.len) { 00663 //cerr << "defined interned string " << state.str << " at " 00664 // << currentStrings.size() << endl; 00665 current << state.str; 00666 currentStrings.push_back(make_pair(1, state.str)); 00667 popState(); 00668 } 00669 break; 00670 } 00671 default: 00672 throw Exception("processString: invalid phase"); 00673 } 00674 } 00675 00676 void doneInternedStringReference(State & current, State & parent) 00677 { 00678 parent.phase = 1; 00679 parent.str = currentStrings.at(current.len).second; 00680 currentStrings[current.len].first += 1; 00681 //cerr << "reference to interned string " << current.len 00682 // << " -> " << parent.str << endl; 00683 } 00684 00685 void processInternedStringReference(const char * & first, 00686 const char * last, 00687 State & state) 00688 { 00689 //cerr << "doing interned string reference phase " 00690 // << state.phase << endl; 00691 00692 if (first >= last) 00693 throw Exception("processing with no characters"); 00694 00695 switch (state.phase) { 00696 00697 case 0: // reading length 00698 pushState(&Itl::processLength, &Itl::doneInternedStringReference); 00699 break; 00700 00701 case 1: // finished 00702 current << state.str; 00703 popState(); 00704 break; 00705 00706 default: 00707 throw Exception("processString: invalid phase"); 00708 } 00709 } 00710 00711 void processCommonString(const char * & first, const char * last, State & state) 00712 { 00713 const uint8_t * p = reinterpret_cast<const uint8_t *>(first); 00714 int idx = *p; 00715 current << commonStrings.at(idx + 128); 00716 ++first; 00717 popState(); 00718 } 00719 00723 void processLength(const char * & first, const char * last, State & state) 00724 { 00725 //cerr << "processing length" << endl; 00726 // Length of the length in bytes 00727 size_t len = ML::DB::compact_decode_length(*first); 00728 size_t avail = last - first; 00729 00730 // If we have enough data, get 
the length 00731 if (avail >= len) { 00732 state.len = ML::DB::decode_compact(first, last); 00733 //cerr << "length is " << state.len << endl; 00734 popState(); 00735 return; 00736 } 00737 00738 // Otherwise, require more data 00739 requireData(len, first, last); 00740 } 00741 00742 void requireData(size_t amount, const char * & first, const char * last) 00743 { 00744 //cerr << "requiring " << amount << " bytes" << endl; 00745 00746 size_t avail = last - first; 00747 if (avail >= amount) 00748 throw Exception("required amount already available"); 00749 State & state = pushState(&Itl::processRequireData); 00750 state.len = amount; 00751 state.str.reserve(amount); 00752 state.str.append(first, last); 00753 first = last; 00754 } 00755 00756 void processRequireData(const char * & first, const char * last, 00757 State & state) 00758 { 00759 // Buffer until we get enough 00760 size_t avail = last - first; 00761 size_t toRead = std::min(avail, state.len - state.str.size()); 00762 state.str.append(first, first + toRead); 00763 first += toRead; 00764 00765 if (state.len == state.str.size()) { 00766 // Enough buffered; pass it on to the previous entry in the 00767 // stack. 
00768 string s = state.str; // copy so it doesn't go out of scope 00769 states.pop_back(); 00770 process(s.c_str(), s.c_str() + s.length()); 00771 } 00772 } 00773 00774 void writeOutput(Filter::OnOutput onOutput, 00775 FlushLevel level, 00776 boost::function<void ()> onMessageDone) 00777 { 00778 string s = current.str(); 00779 if (!s.empty()) 00780 onOutput(s.c_str(), s.length(), level, onMessageDone); 00781 else onMessageDone(); 00782 current.str(""); 00783 } 00784 00785 void sortStrings() 00786 { 00787 vector<pair<size_t, std::string> > counts = currentStrings; 00788 std::sort(counts.begin(), counts.end(), std::greater<std::pair<size_t, string> >()); 00789 00790 commonStrings.clear(); 00791 00792 for (unsigned i = 0; i < 256 + 128 && i < counts.size(); ++i) 00793 commonStrings.push_back(counts[i].second); 00794 00795 //cerr << "commonStrings = " << commonStrings << endl; 00796 } 00797 00798 ostringstream current; 00799 std::vector<std::string> commonStrings; 00800 std::vector<std::pair<size_t, std::string> > currentStrings; 00801 }; 00802 00803 JsonDecompressor:: 00804 JsonDecompressor() 00805 : itl(new Itl()) 00806 { 00807 } 00808 00809 JsonDecompressor:: 00810 ~JsonDecompressor() 00811 { 00812 } 00813 00814 void 00815 JsonDecompressor:: 00816 process(const char * src_begin, const char * src_end, 00817 FlushLevel level, 00818 boost::function<void ()> onMessageDone) 00819 { 00820 itl->process(src_begin, src_end); 00821 itl->writeOutput(onOutput, level, onMessageDone); 00822 } 00823 00824 } // namespace Datacratic 00825