RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/compressor.cc
00001 /* compressor.cc
00002    Jeremy Barnes, 19 September 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005    Implementation of compressor abstraction.
00006 */
00007 
00008 #include "compressor.h"
00009 #include "jml/utils/exc_assert.h"
00010 #include <zlib.h>
00011 #include <iostream>
00012 
00013 using namespace std;
00014 
00015 namespace Datacratic {
00016 
00017 
00018 /*****************************************************************************/
00019 /* COMPRESSOR                                                                */
00020 /*****************************************************************************/
00021 
00022 Compressor::
00023 ~Compressor()
00024 {
00025 }
00026 
00027 namespace {
00028 
/** Return true if and only if \p str finishes with the characters of
    \p what.  An empty \p what matches every string.
*/
bool ends_with(const std::string & str, const std::string & what)
{
    // A suffix longer than the string itself can never match.
    if (what.size() > str.size())
        return false;

    // Compare only the tail of str that lines up with the suffix.
    const std::string::size_type tailStart = str.size() - what.size();
    return str.compare(tailStart, what.size(), what) == 0;
}
00035 
00036 } // file scope
00037 
00038 std::string
00039 Compressor::
00040 filenameToCompression(const std::string & filename)
00041 {
00042     if (ends_with(filename, ".gz") || ends_with(filename, ".gz~"))
00043         return "gzip";
00044     if (ends_with(filename, ".bz2") || ends_with(filename, ".bz2~"))
00045         return "bzip2";
00046     if (ends_with(filename, ".xz") || ends_with(filename, ".xz~"))
00047         return "lzma";
00048     return "none";
00049 }
00050 
00051 Compressor *
00052 Compressor::
00053 create(const std::string & compression,
00054        int level)
00055 {
00056     if (compression == "gzip" || compression == "gz")
00057         return new GzipCompressor(level);
00058     else if (compression == "" || compression == "none")
00059         return new NullCompressor();
00060     else throw ML::Exception("unknown compression %s:%d", compression.c_str(),
00061                              level);
00062 }
00063 
00064 
00065 /*****************************************************************************/
00066 /* NULL COMPRESSOR                                                           */
00067 /*****************************************************************************/
00068 
00069 NullCompressor::
00070 NullCompressor()
00071 {
00072 }
00073 
00074 NullCompressor::
00075 ~NullCompressor()
00076 {
00077 }
00078 
00079 size_t
00080 NullCompressor::
00081 compress(const char * data, size_t len, const OnData & onData)
00082 {
00083     size_t done = 0;
00084 
00085     while (done < len)
00086         done += onData(data + done, len - done);
00087     
00088     ExcAssertEqual(done, len);
00089 
00090     return done;
00091 }
00092     
00093 size_t
00094 NullCompressor::
00095 flush(FlushLevel flushLevel, const OnData & onData)
00096 {
00097     return 0;
00098 }
00099 
00100 size_t
00101 NullCompressor::
00102 finish(const OnData & onData)
00103 {
00104     return 0;
00105 }
00106 
00107 
00108 /*****************************************************************************/
00109 /* GZIP COMPRESSOR                                                           */
00110 /*****************************************************************************/
00111 
00112 struct GzipCompressor::Itl : public z_stream {
00113 
00114     Itl(int compressionLevel)
00115     {
00116         zalloc = 0;
00117         zfree = 0;
00118         opaque = 0;
00119         int res = deflateInit2(this, compressionLevel, Z_DEFLATED, 15 + 16, 9,
00120                                Z_DEFAULT_STRATEGY);
00121         if (res != Z_OK)
00122             throw ML::Exception("deflateInit2 failed");
00123     }
00124 
00125     ~Itl()
00126     {
00127         deflateEnd(this);
00128     }
00129 
00130     size_t pump(const char * data, size_t len, const OnData & onData,
00131                 int flushLevel)
00132     {
00133         size_t bufSize = 131072;
00134         char output[bufSize];
00135         next_in = (Bytef *)data;
00136         avail_in = len;
00137         size_t result = 0;
00138 
00139         do {
00140             next_out = (Bytef *)output;
00141             avail_out = bufSize;
00142 
00143             int res = deflate(this, flushLevel);
00144 
00145             
00146             //cerr << "pumping " << len << " bytes through with flushLevel "
00147             //     << flushLevel << " returned " << res << endl;
00148 
00149             size_t bytesWritten = (const char *)next_out - output;
00150 
00151             switch (res) {
00152             case Z_OK:
00153                 if (bytesWritten)
00154                     onData(output, bytesWritten);
00155                 result += bytesWritten;
00156                 break;
00157 
00158             case Z_STREAM_ERROR:
00159                 throw ML::Exception("Stream error on zlib");
00160 
00161             case Z_STREAM_END:
00162                 if (bytesWritten)
00163                     onData(output, bytesWritten);
00164                 result += bytesWritten;
00165                 return result;
00166 
00167             default:
00168                 throw ML::Exception("unknown output from deflate");
00169             };
00170         } while (avail_in != 0);
00171 
00172         if (flushLevel == Z_FINISH)
00173             throw ML::Exception("finished without getting to Z_STREAM_END");
00174 
00175         return result;
00176     }
00177 
00178 
00179     size_t compress(const char * data, size_t len, const OnData & onData)
00180     {
00181         return pump(data, len, onData, Z_NO_FLUSH);
00182     }
00183 
00184     size_t flush(FlushLevel flushLevel, const OnData & onData)
00185     {
00186         int zlibFlushLevel;
00187         switch (flushLevel) {
00188         case FLUSH_NONE:       zlibFlushLevel = Z_NO_FLUSH;       break;
00189         case FLUSH_AVAILABLE:  zlibFlushLevel = Z_PARTIAL_FLUSH;  break;
00190         case FLUSH_SYNC:       zlibFlushLevel = Z_SYNC_FLUSH;     break;
00191         case FLUSH_RESTART:    zlibFlushLevel = Z_FULL_FLUSH;     break;
00192         default:
00193             throw ML::Exception("bad flush level");
00194         }
00195 
00196         return pump(0, 0, onData, zlibFlushLevel);
00197     }
00198 
00199     size_t finish(const OnData & onData)
00200     {
00201         return pump(0, 0, onData, Z_FINISH);
00202     }
00203 };
00204 
00205 GzipCompressor::
00206 GzipCompressor(int compressionLevel)
00207 {
00208     itl.reset(new Itl(compressionLevel));
00209 }
00210 
00211 GzipCompressor::
00212 ~GzipCompressor()
00213 {
00214 }
00215 
00216 void
00217 GzipCompressor::
00218 open(int compressionLevel)
00219 {
00220     itl.reset(new Itl(compressionLevel));
00221 }
00222 
00223 size_t
00224 GzipCompressor::
00225 compress(const char * data, size_t len, const OnData & onData)
00226 {
00227     return itl->compress(data, len, onData);
00228 }
00229     
00230 size_t
00231 GzipCompressor::
00232 flush(FlushLevel flushLevel, const OnData & onData)
00233 {
00234     return itl->flush(flushLevel, onData);
00235 }
00236 
00237 size_t
00238 GzipCompressor::
00239 finish(const OnData & onData)
00240 {
00241     return itl->finish(onData);
00242 }
00243 
00244 
00245 /*****************************************************************************/
00246 /* LZMA COMPRESSOR                                                           */
00247 /*****************************************************************************/
00248 
00249 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator