RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* filter.cc 00002 Jeremy Barnes, 30 May 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Logging output filters. 00006 */ 00007 00008 #include "filter.h" 00009 #include "zlib.h" 00010 #include "jml/arch/exception.h" 00011 #include <iostream> 00012 #include <boost/iostreams/filter/zlib.hpp> 00013 #include <boost/iostreams/filter/bzip2.hpp> 00014 #include <boost/iostreams/filter/gzip.hpp> 00015 #include "jml/utils/hex_dump.h" 00016 #include "jml/utils/string_functions.h" 00017 #include "lzma.h" 00018 00019 00020 using namespace std; 00021 using namespace ML; 00022 00023 00024 namespace Datacratic { 00025 00026 std::string print(Direction dir) 00027 { 00028 switch (dir) { 00029 case COMPRESS: return "COMPRESS"; 00030 case DECOMPRESS: return "DECOMPRESS"; 00031 default: return ML::format("Direction(%d)", dir); 00032 } 00033 } 00034 00035 std::ostream & operator << (std::ostream & stream, Direction dir) 00036 { 00037 return stream << print(dir); 00038 } 00039 00040 std::string print(FlushLevel lvl) 00041 { 00042 switch (lvl) { 00043 case FLUSH_NONE: return "FLUSH_NONE"; 00044 case FLUSH_SYNC: return "FLUSH_SYNC"; 00045 case FLUSH_FULL: return "FLUSH_FULL"; 00046 case FLUSH_FINISH: return "FLUSH_FINISH"; 00047 default: return ML::format("FlushLevel(%d)", lvl); 00048 } 00049 } 00050 00051 std::ostream & operator << (std::ostream & stream, FlushLevel lvl) 00052 { 00053 return stream << print(lvl); 00054 } 00055 00056 00057 /*****************************************************************************/ 00058 /* FILTER */ 00059 /*****************************************************************************/ 00060 00061 Filter:: 00062 ~Filter() 00063 { 00064 } 00065 00066 void 00067 Filter:: 00068 flush(FlushLevel level, boost::function<void ()> onFlushDone) 00069 { 00070 process(0, 0, level, onFlushDone); 00071 } 00072 00073 void 00074 Filter:: 00075 process(const std::string & buf, 00076 FlushLevel level, 00077 boost::function<void ()> onFilterDone) 00078 { 00079 process(buf.c_str(), 00080 buf.c_str() + buf.length(), 00081 level, 00082 onFilterDone); 00083 } 00084 00085 Filter * 00086 Filter:: 00087 create(const std::string & extension, 00088 Direction direction) 00089 { 00090 if (extension == "z") { 00091 if (direction == COMPRESS) return new ZlibCompressor(); 00092 else return new ZlibDecompressor(); 00093 } 00094 else if (extension == "bz" || extension == "bz2") { 00095 if (direction == COMPRESS) return new Bzip2Compressor(); 00096 else return new Bzip2Decompressor(); 00097 } 00098 else if (extension == "xz" || extension == "lzma") { 00099 if (direction == COMPRESS) return new LzmaCompressor(); 00100 else return new LzmaDecompressor(); 00101 } 00102 else return new IdentityFilter(); 00103 } 00104 00105 00106 /*****************************************************************************/ 00107 /* IDENTITY FILTER */ 00108 /*****************************************************************************/ 00109 00110 void 00111 IdentityFilter:: 00112 process(const char * src_begin, const char * src_end, 00113 FlushLevel level, 00114 boost::function<void ()> onMessageDone) 00115 { 00116 onOutput(src_begin, src_end - src_begin, level, onMessageDone); 00117 } 00118 00119 00120 /*****************************************************************************/ 00121 /* FILTER STACK */ 00122 /*****************************************************************************/ 00123 00124 void 00125 FilterStack:: 00126 process(const char * src_begin, const char * src_end, 00127 FlushLevel level, 00128 boost::function<void ()> onMessageDone) 00129 { 00130 if (filters.empty()) 00131 throw Exception("empty filter stack cannot process anything"); 00132 00133 const std::shared_ptr<Filter> & f 00134 = filters.front(); 00135 f->process(src_begin, src_end, level, onMessageDone); 00136 } 00137 00138 void 00139 FilterStack:: 00140 push(std::shared_ptr<Filter> filter) 00141 { 00142 if (!filters.empty()) { 00143 filters.back()->onOutput = [=] (const char * p, size_t n, FlushLevel f, 00144 boost::function<void ()> cb) 00145 { 00146 filter->process(p, p + n, f, cb); 00147 }; 00148 } 00149 00150 filter->onOutput = [=] (const char * p, size_t n, FlushLevel f, 00151 boost::function<void ()> cb) 00152 { 00153 this->onOutput(p, n, f, cb); 00154 }; 00155 00156 filters.push_back(filter); 00157 } 00158 00159 std::shared_ptr<Filter> 00160 FilterStack:: 00161 pop() 00162 { 00163 throw Exception("can't pop from a filter stack"); 00164 } 00165 00166 00167 /*****************************************************************************/ 00168 /* ZLIB COMPRESSOR */ 00169 /*****************************************************************************/ 00170 00171 using namespace boost::iostreams; 00172 00173 struct ZlibCompressor::Itl 00174 : public boost::iostreams::detail::zlib_base { 00175 00176 typedef boost::iostreams::detail::zlib_base Compressor; 00177 00178 Itl(const boost::iostreams::zlib_params& p, 00179 Direction direction) 00180 : direction(direction) 00181 { 00182 detail::zlib_allocator<std::allocator<char> > alloc; 00183 init(p, direction == COMPRESS, alloc); 00184 } 00185 00186 ~Itl() 00187 { 00188 reset(direction == COMPRESS, false); 00189 } 00190 00191 void process(const char * & src_begin, 00192 const char * & src_end, 00193 char * & dest_begin, 00194 char * & dest_end, 00195 FlushLevel level) 00196 { 00197 int flush; 00198 00199 switch (level) { 00200 case FLUSH_NONE: flush = zlib::no_flush; break; 00201 case FLUSH_SYNC: flush = zlib::sync_flush; break; 00202 case FLUSH_FULL: flush = Z_FULL_FLUSH; break; 00203 case FLUSH_FINISH: flush = zlib::finish; break; 00204 default: 00205 throw Exception("invalid flush level"); 00206 } 00207 00208 int result = Z_OK; 00209 00210 before(src_begin, src_end, dest_begin, dest_end); 00211 00212 //cerr << "calling " << (direction == COMPRESS ? "deflate" : "inflate") 00213 // << endl; 00214 00215 //ML::hex_dump(src_begin, src_end - src_begin); 00216 00217 result = (direction == COMPRESS 00218 ? xdeflate(flush) 00219 : xinflate(zlib::no_flush)); 00220 00221 after(src_begin, dest_begin, direction == COMPRESS); 00222 00223 switch (result) { 00224 case Z_OK: 00225 case Z_STREAM_END: 00226 //case Z_BUF_ERROR: 00227 break; 00228 case Z_MEM_ERROR: 00229 boost::throw_exception(std::bad_alloc()); 00230 default: 00231 throw ML::Exception("zlib error %d on %s at byte %d with flush %d: %s", 00232 result, 00233 (direction == COMPRESS ? "compression" : "decompression"), 00234 total_in(), level, 00235 zError(result)); 00236 }; 00237 } 00238 00239 Direction direction; 00240 }; 00241 00242 const zlib_params ZlibCompressor:: 00243 DEFAULT_PARAMS(zlib::default_compression, 00244 zlib::deflated, 00245 zlib::default_window_bits, 00246 zlib::default_mem_level, 00247 zlib::default_strategy, 00248 true /* no header */, 00249 true /* no CRC */); 00250 00251 ZlibCompressor:: 00252 ZlibCompressor(const zlib_params& p) 00253 : itl(new Itl(p, COMPRESS)) 00254 { 00255 } 00256 00257 ZlibCompressor:: 00258 ZlibCompressor(const zlib_params& p, Direction dir) 00259 : itl(new Itl(p, dir)) 00260 { 00261 } 00262 00263 ZlibCompressor:: 00264 ~ZlibCompressor() 00265 { 00266 } 00267 00268 void 00269 ZlibCompressor:: 00270 process(const char * src_begin, const char * src_end, 00271 FlushLevel level, 00272 boost::function<void ()> onMessageDone) 00273 { 00274 size_t buffer_size = 65536; 00275 00276 //cerr << "filter " << src_end - src_begin << " bytes with level " 00277 // << level << " direction " << itl->direction 00278 // << endl; 00279 00280 do { 00281 char dest[buffer_size]; 00282 char * dest_begin = dest; 00283 char * dest_end = dest_begin + buffer_size; 00284 00285 if (src_end != src_begin || level == FLUSH_FINISH) { 00286 itl->process(src_begin, src_end, dest_begin, dest_end, level); 00287 } 00288 00289 bool done = src_begin == src_end; 00290 00291 size_t bytes_written = dest_begin - dest; 00292 //cerr << "calling onOutput with " << bytes_written << " bytes" 00293 // << " done = " << done << endl; 00294 //ML::hex_dump(dest, bytes_written); 00295 onOutput(dest, bytes_written, done ? level : FLUSH_NONE, 00296 done ? onMessageDone : boost::function<void ()>()); 00297 00298 } while (src_begin != src_end); 00299 } 00300 00301 00302 /*****************************************************************************/ 00303 /* ZLIB DECOMPRESSOR */ 00304 /*****************************************************************************/ 00305 00306 ZlibDecompressor:: 00307 ZlibDecompressor(const boost::iostreams::zlib_params& p) 00308 : ZlibCompressor(p, DECOMPRESS) 00309 { 00310 } 00311 00312 ZlibDecompressor:: 00313 ~ZlibDecompressor() 00314 { 00315 } 00316 00317 /*****************************************************************************/ 00318 /* GZIP COMPRESSOR */ 00319 /*****************************************************************************/ 00320 00321 using namespace boost::iostreams; 00322 00323 struct GzipCompressorFilter::Itl : public boost::iostreams::gzip_compressor { 00324 00325 typedef boost::iostreams::gzip_compressor Compressor; 00326 00327 Itl(const boost::iostreams::gzip_params& p) 00328 : Compressor(p) 00329 { 00330 } 00331 00332 ~Itl() 00333 { 00334 } 00335 00336 void process(const char * & src_begin, 00337 const char * & src_end, 00338 char * & dest_begin, 00339 char * & dest_end, 00340 FlushLevel level) 00341 { 00342 #if 0 00343 int flush; 00344 00345 switch (level) { 00346 case FLUSH_NONE: flush = gzip::no_flush; break; 00347 case FLUSH_SYNC: flush = gzip::sync_flush; break; 00348 case FLUSH_FULL: flush = Z_FULL_FLUSH; break; 00349 case FLUSH_FINISH: flush = gzip::finish; break; 00350 default: 00351 throw Exception("invalid flush level"); 00352 } 00353 #endif 00354 00355 struct Source { 00356 Source(const char * & src_begin, 00357 const char * & src_end) 00358 : src_begin(src_begin), 00359 src_end(src_end) 00360 { 00361 } 00362 00363 typedef char char_type; 00364 struct category 00365 : dual_use, 00366 filter_tag, 00367 multichar_tag, 00368 closable_tag { 00369 }; 00370 00371 const char * & src_begin; 00372 const char * & src_end; 00373 00374 size_t read(char * buf, size_t n) 00375 { 00376 size_t left = std::distance(src_begin, src_end); 00377 size_t todo = std::min(left, n); 00378 std::copy(src_begin, src_begin + todo, buf); 00379 src_begin += todo; 00380 00381 //cerr << "source: presented " << todo << " of " 00382 // << left << " bytes (wanted " << n << ")" << endl; 00383 00384 return todo; 00385 } 00386 }; 00387 00388 Source source(src_begin, src_end); 00389 00390 ssize_t n = read(source, dest_begin, dest_end - dest_begin); 00391 //cerr << "got " << n << " bytes in output" << endl; 00392 00393 if (n != -1) { 00394 dest_begin += n; 00395 } 00396 } 00397 00398 Direction direction; 00399 }; 00400 00401 const gzip_params GzipCompressorFilter:: 00402 DEFAULT_PARAMS; 00403 00404 GzipCompressorFilter:: 00405 GzipCompressorFilter(const gzip_params& p) 00406 : itl(new Itl(p)) 00407 { 00408 } 00409 00410 GzipCompressorFilter:: 00411 ~GzipCompressorFilter() 00412 { 00413 } 00414 00415 void 00416 GzipCompressorFilter:: 00417 process(const char * src_begin, const char * src_end, 00418 FlushLevel level, 00419 boost::function<void ()> onMessageDone) 00420 { 00421 size_t buffer_size = 65536; 00422 00423 //cerr << "filter " << src_end - src_begin << " bytes with level " 00424 // << level << " direction " << itl->direction 00425 // << endl; 00426 00427 do { 00428 char dest[buffer_size]; 00429 char * dest_begin = dest; 00430 char * dest_end = dest_begin + buffer_size; 00431 00432 if (src_end != src_begin || level == FLUSH_FINISH) { 00433 itl->process(src_begin, src_end, dest_begin, dest_end, level); 00434 } 00435 00436 bool done = src_begin == src_end; 00437 00438 size_t bytes_written = dest_begin - dest; 00439 //cerr << "calling onOutput with " << bytes_written << " bytes" 00440 // << " done = " << done << endl; 00441 //ML::hex_dump(dest, bytes_written); 00442 onOutput(dest, bytes_written, done ? level : FLUSH_NONE, 00443 done ? onMessageDone : boost::function<void ()>()); 00444 00445 } while (src_begin != src_end); 00446 } 00447 00448 /*****************************************************************************/ 00449 /* GZIP DECOMPRESSOR */ 00450 /*****************************************************************************/ 00451 00452 using namespace boost::iostreams; 00453 00454 struct GzipDecompressor::Itl : public boost::iostreams::gzip_decompressor { 00455 00456 typedef boost::iostreams::gzip_decompressor Decompressor; 00457 00458 Itl() 00459 : Decompressor() 00460 { 00461 } 00462 00463 ~Itl() 00464 { 00465 } 00466 00467 void process(const char * & src_begin, 00468 const char * & src_end, 00469 char * & dest_begin, 00470 char * & dest_end, 00471 FlushLevel level) 00472 { 00473 struct Source { 00474 Source(const char * & src_begin, 00475 const char * & src_end) 00476 : src_begin(src_begin), 00477 src_end(src_end) 00478 { 00479 } 00480 00481 typedef char char_type; 00482 struct category 00483 : dual_use, 00484 filter_tag, 00485 multichar_tag, 00486 closable_tag { 00487 }; 00488 00489 const char * & src_begin; 00490 const char * & src_end; 00491 00492 ssize_t read(char * buf, size_t n) 00493 { 00494 size_t left = std::distance(src_begin, src_end); 00495 size_t todo = std::min(left, n); 00496 std::copy(src_begin, src_begin + todo, buf); 00497 src_begin += todo; 00498 return todo; 00499 } 00500 }; 00501 00502 Source source(src_begin, src_end); 00503 00504 ssize_t n = read(source, dest_begin, dest_end - dest_begin); 00505 00506 dest_begin += n; 00507 } 00508 00509 Direction direction; 00510 }; 00511 00512 GzipDecompressor:: 00513 GzipDecompressor() 00514 : itl(new Itl()) 00515 { 00516 } 00517 00518 GzipDecompressor:: 00519 ~GzipDecompressor() 00520 { 00521 } 00522 00523 void 00524 GzipDecompressor:: 00525 process(const char * src_begin, const char * src_end, 00526 FlushLevel level, 00527 boost::function<void ()> onMessageDone) 00528 { 00529 size_t buffer_size = 65536; 00530 00531 //cerr << "filter " << src_end - src_begin << " bytes with level " 00532 // << level << " direction " << itl->direction 00533 // << endl; 00534 00535 do { 00536 char dest[buffer_size]; 00537 char * dest_begin = dest; 00538 char * dest_end = dest_begin + buffer_size; 00539 00540 if (src_end != src_begin || level == FLUSH_FINISH) { 00541 itl->process(src_begin, src_end, dest_begin, dest_end, level); 00542 } 00543 00544 bool done = src_begin == src_end; 00545 00546 size_t bytes_written = dest_begin - dest; 00547 //cerr << "calling onOutput with " << bytes_written << " bytes" 00548 // << " done = " << done << endl; 00549 //ML::hex_dump(dest, bytes_written); 00550 onOutput(dest, bytes_written, done ? level : FLUSH_NONE, 00551 done ? onMessageDone : boost::function<void ()>()); 00552 00553 } while (src_begin != src_end); 00554 } 00555 00556 00557 /*****************************************************************************/ 00558 /* BZIP2 COMPRESSOR */ 00559 /*****************************************************************************/ 00560 00561 using namespace boost::iostreams; 00562 00563 struct Bzip2Compressor::Itl 00564 : public boost::iostreams::detail::bzip2_base { 00565 00566 typedef boost::iostreams::detail::bzip2_base Compressor; 00567 00568 Itl(const boost::iostreams::bzip2_params& p, 00569 Direction direction) 00570 : Compressor(p), direction(direction) 00571 { 00572 detail::bzip2_allocator<std::allocator<char> > alloc; 00573 init(direction == COMPRESS, alloc); 00574 } 00575 00576 ~Itl() 00577 { 00578 } 00579 00580 void process(const char * & src_begin, 00581 const char * & src_end, 00582 char * & dest_begin, 00583 char * & dest_end, 00584 FlushLevel level) 00585 { 00586 int flush; 00587 00588 switch (level) { 00589 case FLUSH_NONE: flush = bzip2::run; break; 00590 case FLUSH_FINISH: flush = bzip2::finish; break; 00591 case FLUSH_SYNC: 00592 case FLUSH_FULL: 00593 throw ML::Exception("bzip2 doesn't support flushing"); 00594 default: 00595 throw Exception("invalid flush level"); 00596 } 00597 00598 int result = Z_OK; 00599 00600 before(src_begin, src_end, dest_begin, dest_end); 00601 00602 //cerr << "calling " << (direction == COMPRESS ? "deflate" : "inflate") 00603 // << endl; 00604 00605 //ML::hex_dump(src_begin, src_end - src_begin); 00606 00607 result = (direction == COMPRESS 00608 ? compress(flush) 00609 : decompress()); 00610 00611 after(src_begin, dest_begin); 00612 00613 using namespace boost::iostreams::bzip2; 00614 00615 if (result == ok 00616 || result == run_ok 00617 || result == finish_ok 00618 || result == flush_ok 00619 || result == stream_end) { 00620 ; 00621 } 00622 else if (result == sequence_error) 00623 throw Exception("bzip2 sequence error"); 00624 else if (result == param_error) 00625 throw Exception("bzip2 param error"); 00626 else if (result == mem_error) 00627 throw Exception("bzip2 mem error"); 00628 else if (result == data_error) 00629 throw Exception("bzip2 data error"); 00630 else if (result == data_error_magic) 00631 throw Exception("bzip2 magic data error"); 00632 else if (result == io_error) 00633 throw Exception("bzip2 io error"); 00634 else if (result == unexpected_eof) 00635 throw Exception("bzip2 unexpected eof"); 00636 else if (result == outbuff_full) 00637 throw Exception("bzip2 output buffer full"); 00638 else if (result == config_error) 00639 throw Exception("bzip2 config error"); 00640 else { 00641 throw Exception("unknown bzip2 error %d", result); 00642 } 00643 #if 0 00644 throw ML::Exception("bzip2 error %d on %s with flush %d: %s", 00645 result, 00646 (direction == COMPRESS 00647 ? "compression" : "decompression"), 00648 level, 00649 zError(result)); 00650 #endif 00651 } 00652 00653 Direction direction; 00654 }; 00655 00656 const bzip2_params Bzip2Compressor::DEFAULT_PARAMS; 00657 00658 Bzip2Compressor:: 00659 Bzip2Compressor(const bzip2_params& p) 00660 : itl(new Itl(p, COMPRESS)) 00661 { 00662 } 00663 00664 Bzip2Compressor:: 00665 Bzip2Compressor(const bzip2_params& p, Direction dir) 00666 : itl(new Itl(p, dir)) 00667 { 00668 } 00669 00670 Bzip2Compressor:: 00671 ~Bzip2Compressor() 00672 { 00673 } 00674 00675 void 00676 Bzip2Compressor:: 00677 process(const char * src_begin, const char * src_end, 00678 FlushLevel level, 00679 boost::function<void ()> onMessageDone) 00680 { 00681 size_t buffer_size = 65536; 00682 00683 //cerr << "filter " << src_end - src_begin << " bytes with level " 00684 // << level << " direction " << itl->direction 00685 // << endl; 00686 00687 do { 00688 char dest[buffer_size]; 00689 char * dest_begin = dest; 00690 char * dest_end = dest_begin + buffer_size; 00691 00692 if (src_end != src_begin || level == FLUSH_FINISH) { 00693 itl->process(src_begin, src_end, dest_begin, dest_end, level); 00694 } 00695 00696 bool done = src_begin == src_end; 00697 00698 size_t bytes_written = dest_begin - dest; 00699 //cerr << "calling onOutput with " << bytes_written << " bytes" 00700 // << " done = " << done << endl; 00701 //ML::hex_dump(dest, bytes_written); 00702 onOutput(dest, bytes_written, done ? level : FLUSH_NONE, 00703 done ? onMessageDone : boost::function<void ()>()); 00704 00705 } while (src_begin != src_end); 00706 } 00707 00708 00709 /*****************************************************************************/ 00710 /* BZIP2 DECOMPRESSOR */ 00711 /*****************************************************************************/ 00712 00713 const bzip2_params Bzip2Decompressor::DEFAULT_PARAMS(false /* small */); 00714 00715 Bzip2Decompressor:: 00716 Bzip2Decompressor(const boost::iostreams::bzip2_params& p) 00717 : Bzip2Compressor(p, DECOMPRESS) 00718 { 00719 } 00720 00721 Bzip2Decompressor:: 00722 ~Bzip2Decompressor() 00723 { 00724 } 00725 00726 00727 /*****************************************************************************/ 00728 /* LZMA COMPRESSOR */ 00729 /*****************************************************************************/ 00730 00731 using namespace boost::iostreams; 00732 00733 struct LzmaCompressor::Itl { 00734 Itl(Direction direction, int level = -1) 00735 : direction(direction) 00736 { 00737 stream_ = LZMA_STREAM_INIT; 00738 00739 lzma_ret res; 00740 if (direction == COMPRESS) { 00741 res = lzma_easy_encoder(&stream_, level, LZMA_CHECK_CRC32); 00742 } 00743 else { 00744 res = lzma_stream_decoder(&stream_, 100 * 1024 * 1024, 0 /* flags */); 00745 } 00746 00747 if (res != LZMA_OK) 00748 throw ML::Exception("LZMA compressor init for direction %d: %s", 00749 direction, 00750 lzma_strerror(res).c_str()); 00751 } 00752 00753 ~Itl() 00754 { 00755 lzma_end(&stream_); 00756 } 00757 00758 Direction direction; 00759 lzma_stream stream_; 00760 00761 void before( const char*& src_begin, const char* src_end, 00762 char*& dest_begin, char* dest_end ) 00763 { 00764 stream_.next_in = reinterpret_cast<const uint8_t*>(src_begin); 00765 stream_.avail_in = static_cast<size_t>(src_end - src_begin); 00766 stream_.next_out = reinterpret_cast<uint8_t*>(dest_begin); 00767 stream_.avail_out= static_cast<size_t>(dest_end - dest_begin); 00768 } 00769 00770 void after(const char*& src_begin, char*& dest_begin, bool compress) 00771 { 00772 const char* next_in = reinterpret_cast<const char*>(stream_.next_in); 00773 char* next_out = reinterpret_cast<char*>(stream_.next_out); 00774 src_begin = next_in; 00775 dest_begin = next_out; 00776 } 00777 00778 std::string lzma_strerror(lzma_ret code) 00779 { 00780 switch (code) { 00781 case LZMA_OK: return "Operation completed successfully"; 00782 case LZMA_STREAM_END: return "End of stream was reached"; 00783 case LZMA_NO_CHECK: return "Input stream has no integrity check"; 00784 case LZMA_UNSUPPORTED_CHECK: return "Cannot calculate the integrity check"; 00785 case LZMA_GET_CHECK: return "Integrity check type is now available"; 00786 case LZMA_MEM_ERROR: return "Cannot allocate memory"; 00787 case LZMA_MEMLIMIT_ERROR: return "Memory usage limit was reached"; 00788 case LZMA_FORMAT_ERROR: return "File format not recognized"; 00789 case LZMA_OPTIONS_ERROR: return "Invalid or unsupported options"; 00790 case LZMA_DATA_ERROR: return "Data is corrupt"; 00791 case LZMA_BUF_ERROR: return "No progress is possible"; 00792 case LZMA_PROG_ERROR: return "Programming error"; 00793 default: return ML::format("lzma_ret(%d)", code); 00794 } 00795 } 00796 00797 void process(const char * & src_begin, 00798 const char * & src_end, 00799 char * & dest_begin, 00800 char * & dest_end, 00801 FlushLevel level) 00802 { 00803 lzma_action action; 00804 00805 if (direction == COMPRESS) { 00806 switch (level) { 00807 case FLUSH_NONE: action = LZMA_RUN; break; 00808 case FLUSH_SYNC: action = LZMA_SYNC_FLUSH; break; 00809 case FLUSH_FULL: action = LZMA_FULL_FLUSH; break; 00810 case FLUSH_FINISH: action = LZMA_FINISH; break; 00811 default: 00812 throw Exception("invalid flush level for LZMA processing"); 00813 } 00814 } else { 00815 action = LZMA_RUN; 00816 } 00817 00818 for (;;) { 00819 lzma_ret result = LZMA_OK; 00820 00821 before(src_begin, src_end, dest_begin, dest_end); 00822 00823 result = lzma_code(&stream_, action); 00824 00825 after(src_begin, dest_begin, direction == COMPRESS); 00826 00827 //cerr << "got result " << lzma_strerror(result) << " for action " 00828 // << action << endl; 00829 00830 if (result == LZMA_OK && action != LZMA_RUN) continue; 00831 if (result == LZMA_OK && action == LZMA_RUN) break; 00832 if (result == LZMA_STREAM_END) break; 00833 00834 throw ML::Exception("lzma error %d on %s at byte %d with action %d: %s", 00835 result, 00836 (direction == COMPRESS ? "compression" : "decompression"), 00837 (int)stream_.total_in, action, 00838 lzma_strerror(result).c_str()); 00839 } 00840 } 00841 }; 00842 00843 LzmaCompressor:: 00844 LzmaCompressor(int level) 00845 : itl(new Itl(COMPRESS, level)) 00846 { 00847 } 00848 00849 LzmaCompressor:: 00850 LzmaCompressor(Direction dir) 00851 : itl(new Itl(dir)) 00852 { 00853 } 00854 00855 LzmaCompressor:: 00856 ~LzmaCompressor() 00857 { 00858 } 00859 00860 void 00861 LzmaCompressor:: 00862 process(const char * src_begin, const char * src_end, 00863 FlushLevel level, 00864 boost::function<void ()> onMessageDone) 00865 { 00866 size_t buffer_size = 65536; 00867 00868 //cerr << "filter " << src_end - src_begin << " bytes with level " 00869 // << level << " direction " << itl->direction 00870 // << endl; 00871 00872 do { 00873 char dest[buffer_size]; 00874 char * dest_begin = dest; 00875 char * dest_end = dest_begin + buffer_size; 00876 00877 if (src_end != src_begin || level == FLUSH_FINISH) { 00878 itl->process(src_begin, src_end, dest_begin, dest_end, level); 00879 } 00880 00881 bool done = src_begin == src_end; 00882 00883 size_t bytes_written = dest_begin - dest; 00884 //cerr << "calling onOutput with " << bytes_written << " bytes" 00885 // << " done = " << done << endl; 00886 //ML::hex_dump(dest, bytes_written); 00887 onOutput(dest, bytes_written, done ? level : FLUSH_NONE, 00888 done ? onMessageDone : boost::function<void ()>()); 00889 00890 } while (src_begin != src_end); 00891 } 00892 00893 00894 /*****************************************************************************/ 00895 /* LZMA DECOMPRESSOR */ 00896 /*****************************************************************************/ 00897 00898 LzmaDecompressor:: 00899 LzmaDecompressor() 00900 : LzmaCompressor(DECOMPRESS) 00901 { 00902 } 00903 00904 LzmaDecompressor:: 00905 ~LzmaDecompressor() 00906 { 00907 } 00908 00909 00910 } // namespace Datacratic