RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/filter.cc
00001 /* filter.cc
00002    Jeremy Barnes, 30 May 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Logging output filters.
00006 */
00007 
00008 #include "filter.h"
00009 #include "zlib.h"
00010 #include "jml/arch/exception.h"
00011 #include <iostream>
00012 #include <boost/iostreams/filter/zlib.hpp>
00013 #include <boost/iostreams/filter/bzip2.hpp>
00014 #include <boost/iostreams/filter/gzip.hpp>
00015 #include "jml/utils/hex_dump.h"
00016 #include "jml/utils/string_functions.h"
00017 #include "lzma.h"
00018 
00019 
00020 using namespace std;
00021 using namespace ML;
00022 
00023 
00024 namespace Datacratic {
00025 
00026 std::string print(Direction dir)
00027 {
00028     switch (dir) {
00029     case COMPRESS: return "COMPRESS";
00030     case DECOMPRESS: return "DECOMPRESS";
00031     default: return ML::format("Direction(%d)", dir);
00032     }
00033 }
00034 
00035 std::ostream & operator << (std::ostream & stream, Direction dir)
00036 {
00037     return stream << print(dir);
00038 }
00039 
00040 std::string print(FlushLevel lvl)
00041 {
00042     switch (lvl) {
00043     case FLUSH_NONE: return "FLUSH_NONE";
00044     case FLUSH_SYNC: return "FLUSH_SYNC";
00045     case FLUSH_FULL: return "FLUSH_FULL";
00046     case FLUSH_FINISH: return "FLUSH_FINISH";
00047     default: return ML::format("FlushLevel(%d)", lvl);
00048     }
00049 }
00050 
00051 std::ostream & operator << (std::ostream & stream, FlushLevel lvl)
00052 {
00053     return stream << print(lvl);
00054 }
00055 
00056 
00057 /*****************************************************************************/
00058 /* FILTER                                                                    */
00059 /*****************************************************************************/
00060 
00061 Filter::
00062 ~Filter()
00063 {
00064 }
00065 
00066 void
00067 Filter::
00068 flush(FlushLevel level, boost::function<void ()> onFlushDone)
00069 {
00070     process(0, 0, level, onFlushDone);
00071 }
00072 
00073 void
00074 Filter::
00075 process(const std::string & buf,
00076         FlushLevel level,
00077         boost::function<void ()> onFilterDone)
00078 {
00079     process(buf.c_str(),
00080             buf.c_str() + buf.length(),
00081             level,
00082             onFilterDone);
00083 }
00084 
00085 Filter *
00086 Filter::
00087 create(const std::string & extension,
00088        Direction direction)
00089 {
00090     if (extension == "z") {
00091         if (direction == COMPRESS) return new ZlibCompressor();
00092         else return new ZlibDecompressor();
00093     }
00094     else if (extension == "bz" || extension == "bz2") {
00095         if (direction == COMPRESS) return new Bzip2Compressor();
00096         else return new Bzip2Decompressor();
00097     }
00098     else if (extension == "xz" || extension == "lzma") {
00099         if (direction == COMPRESS) return new LzmaCompressor();
00100         else return new LzmaDecompressor();
00101     }
00102     else return new IdentityFilter();
00103 }
00104 
00105 
00106 /*****************************************************************************/
00107 /* IDENTITY FILTER                                                           */
00108 /*****************************************************************************/
00109 
00110 void
00111 IdentityFilter::
00112 process(const char * src_begin, const char * src_end,
00113         FlushLevel level,
00114         boost::function<void ()> onMessageDone)
00115 {
00116     onOutput(src_begin, src_end - src_begin, level, onMessageDone);
00117 }
00118 
00119 
00120 /*****************************************************************************/
00121 /* FILTER STACK                                                              */
00122 /*****************************************************************************/
00123 
00124 void
00125 FilterStack::
00126 process(const char * src_begin, const char * src_end,
00127         FlushLevel level,
00128         boost::function<void ()> onMessageDone)
00129 {
00130     if (filters.empty())
00131         throw Exception("empty filter stack cannot process anything");
00132 
00133     const std::shared_ptr<Filter> & f
00134         = filters.front();
00135     f->process(src_begin, src_end, level, onMessageDone);
00136 }
00137 
00138 void
00139 FilterStack::
00140 push(std::shared_ptr<Filter> filter)
00141 {
00142     if (!filters.empty()) {
00143         filters.back()->onOutput = [=] (const char * p, size_t n, FlushLevel f,
00144                                         boost::function<void ()> cb)
00145             {
00146                 filter->process(p, p + n, f, cb);
00147             };
00148     }
00149     
00150     filter->onOutput = [=] (const char * p, size_t n, FlushLevel f,
00151                             boost::function<void ()> cb)
00152         {
00153             this->onOutput(p, n, f, cb);
00154         };
00155 
00156     filters.push_back(filter);
00157 }
00158 
00159 std::shared_ptr<Filter>
00160 FilterStack::
00161 pop()
00162 {
00163     throw Exception("can't pop from a filter stack");
00164 }
00165 
00166 
00167 /*****************************************************************************/
00168 /* ZLIB COMPRESSOR                                                           */
00169 /*****************************************************************************/
00170 
00171 using namespace boost::iostreams;
00172 
00173 struct ZlibCompressor::Itl
00174     : public boost::iostreams::detail::zlib_base {
00175 
00176     typedef boost::iostreams::detail::zlib_base Compressor;
00177 
00178     Itl(const boost::iostreams::zlib_params& p,
00179         Direction direction)
00180         : direction(direction)
00181     {
00182         detail::zlib_allocator<std::allocator<char> > alloc;
00183         init(p, direction == COMPRESS, alloc);
00184     }
00185     
00186     ~Itl()
00187     {
00188         reset(direction == COMPRESS, false);
00189     }
00190 
00191     void process(const char * & src_begin,
00192                  const char * & src_end,
00193                  char * & dest_begin,
00194                  char * & dest_end,
00195                  FlushLevel level)
00196     {
00197         int flush;
00198         
00199         switch (level) {
00200         case FLUSH_NONE:   flush = zlib::no_flush;    break;
00201         case FLUSH_SYNC:   flush = zlib::sync_flush;  break;
00202         case FLUSH_FULL:   flush = Z_FULL_FLUSH;      break;
00203         case FLUSH_FINISH: flush = zlib::finish;      break;
00204         default:
00205             throw Exception("invalid flush level");
00206         }
00207 
00208         int result = Z_OK;
00209 
00210         before(src_begin, src_end, dest_begin, dest_end);
00211 
00212         //cerr << "calling " << (direction == COMPRESS ? "deflate" : "inflate")
00213         //     << endl;
00214 
00215         //ML::hex_dump(src_begin, src_end - src_begin);
00216 
00217         result = (direction == COMPRESS
00218                   ? xdeflate(flush)
00219                   : xinflate(zlib::no_flush));
00220 
00221         after(src_begin, dest_begin, direction == COMPRESS);
00222 
00223         switch (result) {
00224         case Z_OK: 
00225         case Z_STREAM_END: 
00226             //case Z_BUF_ERROR: 
00227             break;
00228         case Z_MEM_ERROR: 
00229             boost::throw_exception(std::bad_alloc());
00230         default:
00231             throw ML::Exception("zlib error %d on %s at byte %d with flush %d: %s",
00232                                 result,
00233                                 (direction == COMPRESS ? "compression" : "decompression"),
00234                                 total_in(), level,
00235                                 zError(result));
00236         };
00237     }
00238 
00239     Direction direction;
00240 };
00241 
00242 const zlib_params ZlibCompressor::
00243 DEFAULT_PARAMS(zlib::default_compression,
00244                zlib::deflated,
00245                zlib::default_window_bits, 
00246                zlib::default_mem_level, 
00247                zlib::default_strategy,
00248                true /* no header */,
00249                true /* no CRC */);
00250 
00251 ZlibCompressor::
00252 ZlibCompressor(const zlib_params& p)
00253     : itl(new Itl(p, COMPRESS))
00254 {
00255 }
00256     
00257 ZlibCompressor::
00258 ZlibCompressor(const zlib_params& p, Direction dir)
00259     : itl(new Itl(p, dir))
00260 {
00261 }
00262     
00263 ZlibCompressor::
00264 ~ZlibCompressor()
00265 {
00266 }
00267 
00268 void
00269 ZlibCompressor::
00270 process(const char * src_begin, const char * src_end,
00271         FlushLevel level,
00272         boost::function<void ()> onMessageDone)
00273 {
00274     size_t buffer_size = 65536;
00275         
00276     //cerr << "filter " << src_end - src_begin << " bytes with level "
00277     //     << level << " direction " << itl->direction
00278     //     << endl;
00279 
00280     do {
00281         char dest[buffer_size];
00282         char * dest_begin = dest;
00283         char * dest_end = dest_begin + buffer_size;
00284             
00285         if (src_end != src_begin || level == FLUSH_FINISH) {
00286             itl->process(src_begin, src_end, dest_begin, dest_end, level);
00287         }
00288 
00289         bool done = src_begin == src_end;
00290 
00291         size_t bytes_written = dest_begin - dest;
00292         //cerr << "calling onOutput with " << bytes_written << " bytes"
00293         //     << " done = " << done << endl;
00294         //ML::hex_dump(dest, bytes_written);
00295         onOutput(dest, bytes_written, done ? level : FLUSH_NONE,
00296                  done ? onMessageDone : boost::function<void ()>());
00297             
00298     } while (src_begin != src_end);
00299 }
00300 
00301 
00302 /*****************************************************************************/
00303 /* ZLIB DECOMPRESSOR                                                         */
00304 /*****************************************************************************/
00305 
00306 ZlibDecompressor::
00307 ZlibDecompressor(const boost::iostreams::zlib_params& p)
00308     : ZlibCompressor(p, DECOMPRESS)
00309 {
00310 }
00311     
00312 ZlibDecompressor::
00313 ~ZlibDecompressor()
00314 {
00315 }
00316 
00317 /*****************************************************************************/
00318 /* GZIP COMPRESSOR                                                           */
00319 /*****************************************************************************/
00320 
00321 using namespace boost::iostreams;
00322 
00323 struct GzipCompressorFilter::Itl : public boost::iostreams::gzip_compressor {
00324     
00325     typedef boost::iostreams::gzip_compressor Compressor;
00326 
00327     Itl(const boost::iostreams::gzip_params& p)
00328         : Compressor(p)
00329     {
00330     }
00331     
00332     ~Itl()
00333     {
00334     }
00335 
00336     void process(const char * & src_begin,
00337                  const char * & src_end,
00338                  char * & dest_begin,
00339                  char * & dest_end,
00340                  FlushLevel level)
00341     {
00342 #if 0
00343         int flush;
00344         
00345         switch (level) {
00346         case FLUSH_NONE:   flush = gzip::no_flush;    break;
00347         case FLUSH_SYNC:   flush = gzip::sync_flush;  break;
00348         case FLUSH_FULL:   flush = Z_FULL_FLUSH;      break;
00349         case FLUSH_FINISH: flush = gzip::finish;      break;
00350         default:
00351             throw Exception("invalid flush level");
00352         }
00353 #endif
00354 
00355         struct Source {
00356             Source(const char * & src_begin,
00357                    const char * & src_end)
00358                 : src_begin(src_begin),
00359                   src_end(src_end)
00360             {
00361             }
00362             
00363             typedef char char_type;
00364             struct category
00365                 : dual_use,
00366                   filter_tag,
00367                   multichar_tag,
00368                   closable_tag {
00369             };
00370 
00371             const char * & src_begin;
00372             const char * & src_end;
00373 
00374             size_t read(char * buf, size_t n)
00375             {
00376                 size_t left = std::distance(src_begin, src_end);
00377                 size_t todo = std::min(left, n);
00378                 std::copy(src_begin, src_begin + todo, buf);
00379                 src_begin += todo;
00380 
00381                 //cerr << "source: presented " << todo << " of "
00382                 //     << left << " bytes (wanted " << n << ")" << endl;
00383 
00384                 return todo;
00385             }
00386         };
00387 
00388         Source source(src_begin, src_end);
00389 
00390         ssize_t n = read(source, dest_begin, dest_end - dest_begin);
00391         //cerr << "got " << n << " bytes in output" << endl;
00392 
00393         if (n != -1) {
00394             dest_begin += n;
00395         }
00396     }
00397 
00398     Direction direction;
00399 };
00400 
00401 const gzip_params GzipCompressorFilter::
00402 DEFAULT_PARAMS;
00403 
00404 GzipCompressorFilter::
00405 GzipCompressorFilter(const gzip_params& p)
00406     : itl(new Itl(p))
00407 {
00408 }
00409     
00410 GzipCompressorFilter::
00411 ~GzipCompressorFilter()
00412 {
00413 }
00414 
00415 void
00416 GzipCompressorFilter::
00417 process(const char * src_begin, const char * src_end,
00418         FlushLevel level,
00419         boost::function<void ()> onMessageDone)
00420 {
00421     size_t buffer_size = 65536;
00422     
00423     //cerr << "filter " << src_end - src_begin << " bytes with level "
00424     //     << level << " direction " << itl->direction
00425     //     << endl;
00426 
00427     do {
00428         char dest[buffer_size];
00429         char * dest_begin = dest;
00430         char * dest_end = dest_begin + buffer_size;
00431             
00432         if (src_end != src_begin || level == FLUSH_FINISH) {
00433             itl->process(src_begin, src_end, dest_begin, dest_end, level);
00434         }
00435 
00436         bool done = src_begin == src_end;
00437 
00438         size_t bytes_written = dest_begin - dest;
00439         //cerr << "calling onOutput with " << bytes_written << " bytes"
00440         //     << " done = " << done << endl;
00441         //ML::hex_dump(dest, bytes_written);
00442         onOutput(dest, bytes_written, done ? level : FLUSH_NONE,
00443                  done ? onMessageDone : boost::function<void ()>());
00444         
00445     } while (src_begin != src_end);
00446 }
00447 
00448 /*****************************************************************************/
00449 /* GZIP DECOMPRESSOR                                                         */
00450 /*****************************************************************************/
00451 
00452 using namespace boost::iostreams;
00453 
00454 struct GzipDecompressor::Itl : public boost::iostreams::gzip_decompressor {
00455     
00456     typedef boost::iostreams::gzip_decompressor Decompressor;
00457 
00458     Itl()
00459         : Decompressor()
00460     {
00461     }
00462     
00463     ~Itl()
00464     {
00465     }
00466 
00467     void process(const char * & src_begin,
00468                  const char * & src_end,
00469                  char * & dest_begin,
00470                  char * & dest_end,
00471                  FlushLevel level)
00472     {
00473         struct Source {
00474             Source(const char * & src_begin,
00475                    const char * & src_end)
00476                 : src_begin(src_begin),
00477                   src_end(src_end)
00478             {
00479             }
00480             
00481             typedef char char_type;
00482             struct category
00483                 : dual_use,
00484                   filter_tag,
00485                   multichar_tag,
00486                   closable_tag {
00487             };
00488 
00489             const char * & src_begin;
00490             const char * & src_end;
00491 
00492             ssize_t read(char * buf, size_t n)
00493             {
00494                 size_t left = std::distance(src_begin, src_end);
00495                 size_t todo = std::min(left, n);
00496                 std::copy(src_begin, src_begin + todo, buf);
00497                 src_begin += todo;
00498                 return todo;
00499             }
00500         };
00501 
00502         Source source(src_begin, src_end);
00503 
00504         ssize_t n = read(source, dest_begin, dest_end - dest_begin);
00505         
00506         dest_begin += n;
00507     }
00508 
00509     Direction direction;
00510 };
00511 
00512 GzipDecompressor::
00513 GzipDecompressor()
00514     : itl(new Itl())
00515 {
00516 }
00517     
00518 GzipDecompressor::
00519 ~GzipDecompressor()
00520 {
00521 }
00522 
00523 void
00524 GzipDecompressor::
00525 process(const char * src_begin, const char * src_end,
00526         FlushLevel level,
00527         boost::function<void ()> onMessageDone)
00528 {
00529     size_t buffer_size = 65536;
00530         
00531     //cerr << "filter " << src_end - src_begin << " bytes with level "
00532     //     << level << " direction " << itl->direction
00533     //     << endl;
00534 
00535     do {
00536         char dest[buffer_size];
00537         char * dest_begin = dest;
00538         char * dest_end = dest_begin + buffer_size;
00539             
00540         if (src_end != src_begin || level == FLUSH_FINISH) {
00541             itl->process(src_begin, src_end, dest_begin, dest_end, level);
00542         }
00543 
00544         bool done = src_begin == src_end;
00545 
00546         size_t bytes_written = dest_begin - dest;
00547         //cerr << "calling onOutput with " << bytes_written << " bytes"
00548         //     << " done = " << done << endl;
00549         //ML::hex_dump(dest, bytes_written);
00550         onOutput(dest, bytes_written, done ? level : FLUSH_NONE,
00551                  done ? onMessageDone : boost::function<void ()>());
00552             
00553     } while (src_begin != src_end);
00554 }
00555 
00556 
00557 /*****************************************************************************/
00558 /* BZIP2 COMPRESSOR                                                          */
00559 /*****************************************************************************/
00560 
00561 using namespace boost::iostreams;
00562 
00563 struct Bzip2Compressor::Itl
00564     : public boost::iostreams::detail::bzip2_base {
00565 
00566     typedef boost::iostreams::detail::bzip2_base Compressor;
00567 
00568     Itl(const boost::iostreams::bzip2_params& p,
00569         Direction direction)
00570         : Compressor(p), direction(direction)
00571     {
00572         detail::bzip2_allocator<std::allocator<char> > alloc;
00573         init(direction == COMPRESS, alloc);
00574     }
00575     
00576     ~Itl()
00577     {
00578     }
00579 
00580     void process(const char * & src_begin,
00581                  const char * & src_end,
00582                  char * & dest_begin,
00583                  char * & dest_end,
00584                  FlushLevel level)
00585     {
00586         int flush;
00587         
00588         switch (level) {
00589         case FLUSH_NONE:   flush = bzip2::run;         break;
00590         case FLUSH_FINISH: flush = bzip2::finish;      break;
00591         case FLUSH_SYNC:
00592         case FLUSH_FULL:
00593             throw ML::Exception("bzip2 doesn't support flushing");
00594         default:
00595             throw Exception("invalid flush level");
00596         }
00597 
00598         int result = Z_OK;
00599 
00600         before(src_begin, src_end, dest_begin, dest_end);
00601 
00602         //cerr << "calling " << (direction == COMPRESS ? "deflate" : "inflate")
00603         //     << endl;
00604 
00605         //ML::hex_dump(src_begin, src_end - src_begin);
00606 
00607         result = (direction == COMPRESS
00608                   ? compress(flush)
00609                   : decompress());
00610         
00611         after(src_begin, dest_begin);
00612 
00613         using namespace boost::iostreams::bzip2;
00614 
00615         if (result == ok
00616             || result == run_ok
00617             || result == finish_ok
00618             || result == flush_ok
00619             || result == stream_end) {
00620             ; 
00621         }
00622         else if (result == sequence_error)
00623             throw Exception("bzip2 sequence error");
00624         else if (result == param_error)
00625             throw Exception("bzip2 param error");
00626         else if (result == mem_error)
00627             throw Exception("bzip2 mem error");
00628         else if (result == data_error)
00629             throw Exception("bzip2 data error");
00630         else if (result == data_error_magic)
00631             throw Exception("bzip2 magic data error");
00632         else if (result == io_error)
00633             throw Exception("bzip2 io error");
00634         else if (result == unexpected_eof)
00635             throw Exception("bzip2 unexpected eof");
00636         else if (result == outbuff_full)
00637             throw Exception("bzip2 output buffer full");
00638         else if (result == config_error)
00639             throw Exception("bzip2 config error");
00640         else {
00641             throw Exception("unknown bzip2 error %d", result);
00642         }
00643 #if 0
00644         throw ML::Exception("bzip2 error %d on %s with flush %d: %s",
00645                                 result,
00646                                 (direction == COMPRESS
00647                                  ? "compression" : "decompression"),
00648                                 level,
00649                                 zError(result));
00650 #endif
00651     }
00652 
00653     Direction direction;
00654 };
00655 
00656 const bzip2_params Bzip2Compressor::DEFAULT_PARAMS;
00657 
00658 Bzip2Compressor::
00659 Bzip2Compressor(const bzip2_params& p)
00660     : itl(new Itl(p, COMPRESS))
00661 {
00662 }
00663     
00664 Bzip2Compressor::
00665 Bzip2Compressor(const bzip2_params& p, Direction dir)
00666     : itl(new Itl(p, dir))
00667 {
00668 }
00669     
00670 Bzip2Compressor::
00671 ~Bzip2Compressor()
00672 {
00673 }
00674 
00675 void
00676 Bzip2Compressor::
00677 process(const char * src_begin, const char * src_end,
00678         FlushLevel level,
00679         boost::function<void ()> onMessageDone)
00680 {
00681     size_t buffer_size = 65536;
00682         
00683     //cerr << "filter " << src_end - src_begin << " bytes with level "
00684     //     << level << " direction " << itl->direction
00685     //     << endl;
00686 
00687     do {
00688         char dest[buffer_size];
00689         char * dest_begin = dest;
00690         char * dest_end = dest_begin + buffer_size;
00691             
00692         if (src_end != src_begin || level == FLUSH_FINISH) {
00693             itl->process(src_begin, src_end, dest_begin, dest_end, level);
00694         }
00695 
00696         bool done = src_begin == src_end;
00697 
00698         size_t bytes_written = dest_begin - dest;
00699         //cerr << "calling onOutput with " << bytes_written << " bytes"
00700         //     << " done = " << done << endl;
00701         //ML::hex_dump(dest, bytes_written);
00702         onOutput(dest, bytes_written, done ? level : FLUSH_NONE,
00703                  done ? onMessageDone : boost::function<void ()>());
00704             
00705     } while (src_begin != src_end);
00706 }
00707 
00708 
00709 /*****************************************************************************/
00710 /* BZIP2 DECOMPRESSOR                                                        */
00711 /*****************************************************************************/
00712 
00713 const bzip2_params Bzip2Decompressor::DEFAULT_PARAMS(false /* small */);
00714 
00715 Bzip2Decompressor::
00716 Bzip2Decompressor(const boost::iostreams::bzip2_params& p)
00717     : Bzip2Compressor(p, DECOMPRESS)
00718 {
00719 }
00720     
00721 Bzip2Decompressor::
00722 ~Bzip2Decompressor()
00723 {
00724 }
00725 
00726 
00727 /*****************************************************************************/
00728 /* LZMA COMPRESSOR                                                           */
00729 /*****************************************************************************/
00730 
00731 using namespace boost::iostreams;
00732 
00733 struct LzmaCompressor::Itl {
00734     Itl(Direction direction, int level = -1)
00735         : direction(direction)
00736     {
00737         stream_ = LZMA_STREAM_INIT;
00738 
00739         lzma_ret res;
00740         if (direction == COMPRESS) {
00741             res = lzma_easy_encoder(&stream_, level, LZMA_CHECK_CRC32);
00742         }
00743         else {
00744             res = lzma_stream_decoder(&stream_, 100 * 1024 * 1024, 0 /* flags */);
00745         }
00746 
00747         if (res != LZMA_OK)
00748             throw ML::Exception("LZMA compressor init for direction %d: %s",
00749                                 direction,
00750                                 lzma_strerror(res).c_str());
00751     }
00752     
00753     ~Itl()
00754     {
00755         lzma_end(&stream_);
00756     }
00757 
00758     Direction direction;
00759     lzma_stream  stream_;
00760 
00761     void before( const char*& src_begin, const char* src_end,
00762                              char*& dest_begin, char* dest_end )
00763     {
00764         stream_.next_in = reinterpret_cast<const uint8_t*>(src_begin);
00765         stream_.avail_in = static_cast<size_t>(src_end - src_begin);
00766         stream_.next_out = reinterpret_cast<uint8_t*>(dest_begin);
00767         stream_.avail_out= static_cast<size_t>(dest_end - dest_begin);
00768     }
00769 
00770     void after(const char*& src_begin, char*& dest_begin, bool compress)
00771     {
00772         const char* next_in = reinterpret_cast<const char*>(stream_.next_in);
00773         char* next_out = reinterpret_cast<char*>(stream_.next_out);
00774         src_begin = next_in;
00775         dest_begin = next_out;
00776     }
00777 
00778     std::string lzma_strerror(lzma_ret code)
00779     {
00780         switch (code) {
00781         case LZMA_OK: return "Operation completed successfully";
00782         case LZMA_STREAM_END: return "End of stream was reached";
00783         case LZMA_NO_CHECK: return "Input stream has no integrity check";
00784         case LZMA_UNSUPPORTED_CHECK: return "Cannot calculate the integrity check";
00785         case LZMA_GET_CHECK: return "Integrity check type is now available";
00786         case LZMA_MEM_ERROR: return "Cannot allocate memory";
00787         case LZMA_MEMLIMIT_ERROR: return "Memory usage limit was reached";
00788         case LZMA_FORMAT_ERROR: return "File format not recognized";
00789         case LZMA_OPTIONS_ERROR: return "Invalid or unsupported options";
00790         case LZMA_DATA_ERROR: return "Data is corrupt";
00791         case LZMA_BUF_ERROR: return "No progress is possible";
00792         case LZMA_PROG_ERROR: return "Programming error";
00793         default: return ML::format("lzma_ret(%d)", code);
00794         }
00795     }
00796 
00797     void process(const char * & src_begin,
00798                  const char * & src_end,
00799                  char * & dest_begin,
00800                  char * & dest_end,
00801                  FlushLevel level)
00802     {
00803         lzma_action action;
00804         
00805         if (direction == COMPRESS) {
00806             switch (level) {
00807             case FLUSH_NONE:   action = LZMA_RUN;  break;
00808             case FLUSH_SYNC:   action = LZMA_SYNC_FLUSH;  break;
00809             case FLUSH_FULL:   action = LZMA_FULL_FLUSH;  break;
00810             case FLUSH_FINISH: action = LZMA_FINISH;      break;
00811             default:
00812                 throw Exception("invalid flush level for LZMA processing");
00813             }
00814         } else {
00815             action = LZMA_RUN;
00816         }
00817 
00818         for (;;) {
00819             lzma_ret result = LZMA_OK;
00820 
00821             before(src_begin, src_end, dest_begin, dest_end);
00822 
00823             result = lzma_code(&stream_, action);
00824 
00825             after(src_begin, dest_begin, direction == COMPRESS);
00826 
00827             //cerr << "got result " << lzma_strerror(result) << " for action "
00828             //     << action << endl;
00829 
00830             if (result == LZMA_OK && action != LZMA_RUN) continue;
00831             if (result == LZMA_OK && action == LZMA_RUN) break;
00832             if (result == LZMA_STREAM_END) break;
00833 
00834             throw ML::Exception("lzma error %d on %s at byte %d with action %d: %s",
00835                                 result,
00836                                 (direction == COMPRESS ? "compression" : "decompression"),
00837                                 (int)stream_.total_in, action,
00838                                 lzma_strerror(result).c_str());
00839         }
00840     }
00841 };
00842 
00843 LzmaCompressor::
00844 LzmaCompressor(int level)
00845     : itl(new Itl(COMPRESS, level))
00846 {
00847 }
00848     
00849 LzmaCompressor::
00850 LzmaCompressor(Direction dir)
00851     : itl(new Itl(dir))
00852 {
00853 }
00854     
00855 LzmaCompressor::
00856 ~LzmaCompressor()
00857 {
00858 }
00859 
00860 void
00861 LzmaCompressor::
00862 process(const char * src_begin, const char * src_end,
00863         FlushLevel level,
00864         boost::function<void ()> onMessageDone)
00865 {
00866     size_t buffer_size = 65536;
00867         
00868     //cerr << "filter " << src_end - src_begin << " bytes with level "
00869     //     << level << " direction " << itl->direction
00870     //     << endl;
00871 
00872     do {
00873         char dest[buffer_size];
00874         char * dest_begin = dest;
00875         char * dest_end = dest_begin + buffer_size;
00876             
00877         if (src_end != src_begin || level == FLUSH_FINISH) {
00878             itl->process(src_begin, src_end, dest_begin, dest_end, level);
00879         }
00880 
00881         bool done = src_begin == src_end;
00882 
00883         size_t bytes_written = dest_begin - dest;
00884         //cerr << "calling onOutput with " << bytes_written << " bytes"
00885         //     << " done = " << done << endl;
00886         //ML::hex_dump(dest, bytes_written);
00887         onOutput(dest, bytes_written, done ? level : FLUSH_NONE,
00888                  done ? onMessageDone : boost::function<void ()>());
00889             
00890     } while (src_begin != src_end);
00891 }
00892 
00893 
00894 /*****************************************************************************/
00895 /* LZMA DECOMPRESSOR                                                         */
00896 /*****************************************************************************/
00897 
00898 LzmaDecompressor::
00899 LzmaDecompressor()
00900     : LzmaCompressor(DECOMPRESS)
00901 {
00902 }
00903     
00904 LzmaDecompressor::
00905 ~LzmaDecompressor()
00906 {
00907 }
00908 
00909 
00910 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator