RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* compressor.cc 00002 Jeremy Barnes, 19 September 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 Implementation of compressor abstraction. 00006 */ 00007 00008 #include "compressor.h" 00009 #include "jml/utils/exc_assert.h" 00010 #include <zlib.h> 00011 #include <iostream> 00012 00013 using namespace std; 00014 00015 namespace Datacratic { 00016 00017 00018 /*****************************************************************************/ 00019 /* COMPRESSOR */ 00020 /*****************************************************************************/ 00021 00022 Compressor:: 00023 ~Compressor() 00024 { 00025 } 00026 00027 namespace { 00028 00029 bool ends_with(const std::string & str, const std::string & what) 00030 { 00031 string::size_type result = str.rfind(what); 00032 return result != string::npos 00033 && result == str.size() - what.size(); 00034 } 00035 00036 } // file scope 00037 00038 std::string 00039 Compressor:: 00040 filenameToCompression(const std::string & filename) 00041 { 00042 if (ends_with(filename, ".gz") || ends_with(filename, ".gz~")) 00043 return "gzip"; 00044 if (ends_with(filename, ".bz2") || ends_with(filename, ".bz2~")) 00045 return "bzip2"; 00046 if (ends_with(filename, ".xz") || ends_with(filename, ".xz~")) 00047 return "lzma"; 00048 return "none"; 00049 } 00050 00051 Compressor * 00052 Compressor:: 00053 create(const std::string & compression, 00054 int level) 00055 { 00056 if (compression == "gzip" || compression == "gz") 00057 return new GzipCompressor(level); 00058 else if (compression == "" || compression == "none") 00059 return new NullCompressor(); 00060 else throw ML::Exception("unknown compression %s:%d", compression.c_str(), 00061 level); 00062 } 00063 00064 00065 /*****************************************************************************/ 00066 /* NULL COMPRESSOR */ 00067 /*****************************************************************************/ 00068 00069 NullCompressor:: 00070 NullCompressor() 00071 { 00072 } 00073 00074 NullCompressor:: 00075 ~NullCompressor() 00076 { 00077 } 00078 00079 size_t 00080 NullCompressor:: 00081 compress(const char * data, size_t len, const OnData & onData) 00082 { 00083 size_t done = 0; 00084 00085 while (done < len) 00086 done += onData(data + done, len - done); 00087 00088 ExcAssertEqual(done, len); 00089 00090 return done; 00091 } 00092 00093 size_t 00094 NullCompressor:: 00095 flush(FlushLevel flushLevel, const OnData & onData) 00096 { 00097 return 0; 00098 } 00099 00100 size_t 00101 NullCompressor:: 00102 finish(const OnData & onData) 00103 { 00104 return 0; 00105 } 00106 00107 00108 /*****************************************************************************/ 00109 /* GZIP COMPRESSOR */ 00110 /*****************************************************************************/ 00111 00112 struct GzipCompressor::Itl : public z_stream { 00113 00114 Itl(int compressionLevel) 00115 { 00116 zalloc = 0; 00117 zfree = 0; 00118 opaque = 0; 00119 int res = deflateInit2(this, compressionLevel, Z_DEFLATED, 15 + 16, 9, 00120 Z_DEFAULT_STRATEGY); 00121 if (res != Z_OK) 00122 throw ML::Exception("deflateInit2 failed"); 00123 } 00124 00125 ~Itl() 00126 { 00127 deflateEnd(this); 00128 } 00129 00130 size_t pump(const char * data, size_t len, const OnData & onData, 00131 int flushLevel) 00132 { 00133 size_t bufSize = 131072; 00134 char output[bufSize]; 00135 next_in = (Bytef *)data; 00136 avail_in = len; 00137 size_t result = 0; 00138 00139 do { 00140 next_out = (Bytef *)output; 00141 avail_out = bufSize; 00142 00143 int res = deflate(this, flushLevel); 00144 00145 00146 //cerr << "pumping " << len << " bytes through with flushLevel " 00147 // << flushLevel << " returned " << res << endl; 00148 00149 size_t bytesWritten = (const char *)next_out - output; 00150 00151 switch (res) { 00152 case Z_OK: 00153 if (bytesWritten) 00154 onData(output, bytesWritten); 00155 result += bytesWritten; 00156 break; 00157 00158 case Z_STREAM_ERROR: 00159 throw ML::Exception("Stream error on zlib"); 00160 00161 case Z_STREAM_END: 00162 if (bytesWritten) 00163 onData(output, bytesWritten); 00164 result += bytesWritten; 00165 return result; 00166 00167 default: 00168 throw ML::Exception("unknown output from deflate"); 00169 }; 00170 } while (avail_in != 0); 00171 00172 if (flushLevel == Z_FINISH) 00173 throw ML::Exception("finished without getting to Z_STREAM_END"); 00174 00175 return result; 00176 } 00177 00178 00179 size_t compress(const char * data, size_t len, const OnData & onData) 00180 { 00181 return pump(data, len, onData, Z_NO_FLUSH); 00182 } 00183 00184 size_t flush(FlushLevel flushLevel, const OnData & onData) 00185 { 00186 int zlibFlushLevel; 00187 switch (flushLevel) { 00188 case FLUSH_NONE: zlibFlushLevel = Z_NO_FLUSH; break; 00189 case FLUSH_AVAILABLE: zlibFlushLevel = Z_PARTIAL_FLUSH; break; 00190 case FLUSH_SYNC: zlibFlushLevel = Z_SYNC_FLUSH; break; 00191 case FLUSH_RESTART: zlibFlushLevel = Z_FULL_FLUSH; break; 00192 default: 00193 throw ML::Exception("bad flush level"); 00194 } 00195 00196 return pump(0, 0, onData, zlibFlushLevel); 00197 } 00198 00199 size_t finish(const OnData & onData) 00200 { 00201 return pump(0, 0, onData, Z_FINISH); 00202 } 00203 }; 00204 00205 GzipCompressor:: 00206 GzipCompressor(int compressionLevel) 00207 { 00208 itl.reset(new Itl(compressionLevel)); 00209 } 00210 00211 GzipCompressor:: 00212 ~GzipCompressor() 00213 { 00214 } 00215 00216 void 00217 GzipCompressor:: 00218 open(int compressionLevel) 00219 { 00220 itl.reset(new Itl(compressionLevel)); 00221 } 00222 00223 size_t 00224 GzipCompressor:: 00225 compress(const char * data, size_t len, const OnData & onData) 00226 { 00227 return itl->compress(data, len, onData); 00228 } 00229 00230 size_t 00231 GzipCompressor:: 00232 flush(FlushLevel flushLevel, const OnData & onData) 00233 { 00234 return itl->flush(flushLevel, onData); 00235 } 00236 00237 size_t 00238 GzipCompressor:: 00239 finish(const OnData & onData) 00240 { 00241 return itl->finish(onData); 00242 } 00243 00244 00245 /*****************************************************************************/ 00246 /* LZMA COMPRESSOR */ 00247 /*****************************************************************************/ 00248 00249 } // namespace Datacratic