![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* filter.h -*- C++ -*- 00002 Jeremy Barnes, 29 May 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 */ 00006 00007 #ifndef __logger__filter_h__ 00008 #define __logger__filter_h__ 00009 00010 00011 #include <boost/function.hpp> 00012 #include <boost/shared_ptr.hpp> 00013 #include "soa/sigslot/slot.h" 00014 00015 namespace boost { 00016 namespace iostreams { 00017 00018 struct zlib_params; 00019 struct bzip2_params; 00020 struct gzip_params; 00021 00022 } // namespace iostreams 00023 } // namespace boost 00024 00025 00026 namespace Datacratic { 00027 00028 enum Direction { 00029 COMPRESS, 00030 DECOMPRESS 00031 }; 00032 00033 std::string print(Direction dir); 00034 std::ostream & operator << (std::ostream & stream, Direction dir); 00035 00036 enum FlushLevel { 00037 FLUSH_NONE, 00038 FLUSH_SYNC, 00039 FLUSH_FULL, 00040 FLUSH_FINISH 00041 }; 00042 00043 std::string print(FlushLevel lvl); 00044 std::ostream & operator << (std::ostream & stream, FlushLevel lvl); 00045 00046 00047 /*****************************************************************************/ 00048 /* FILTER */ 00049 /*****************************************************************************/ 00050 00051 struct Filter { 00052 00053 virtual ~Filter(); 00054 00055 typedef void (OnOutputFn) (const char *, size_t, FlushLevel, boost::function<void ()>); 00056 typedef boost::function<OnOutputFn> OnOutput; 00057 OnOutput onOutput; 00058 00059 typedef void (OnErrorFn) (const std::string &); 00060 typedef boost::function<OnErrorFn> OnError; 00061 OnError onError; 00062 00063 virtual void flush(FlushLevel level, 00064 boost::function<void ()> onFlushDone 00065 = boost::function<void ()>()); 00066 00067 virtual void process(const std::string & buf, 00068 FlushLevel level = FLUSH_NONE, 00069 boost::function<void ()> onFilterDone 00070 = boost::function<void ()>()); 00071 00072 virtual void process(const char * first, const char * last, 00073 FlushLevel level = FLUSH_NONE, 00074 boost::function<void ()> onFilterDone 00075 = boost::function<void ()>()) = 0; 00076 00077 static Filter * create(const std::string & extension, 00078 Direction direction); 00079 }; 00080 00081 00082 /*****************************************************************************/ 00083 /* IDENTITY FILTER */ 00084 /*****************************************************************************/ 00085 00086 struct IdentityFilter : public Filter { 00087 00088 using Filter::process; 00089 00090 virtual void process(const char * src_begin, const char * src_end, 00091 FlushLevel level, 00092 boost::function<void ()> onMessageDone); 00093 }; 00094 00095 00096 /*****************************************************************************/ 00097 /* FILTER STACK */ 00098 /*****************************************************************************/ 00099 00100 struct FilterStack : public Filter { 00101 00102 using Filter::process; 00103 00104 virtual void process(const char * src_begin, const char * src_end, 00105 FlushLevel level, 00106 boost::function<void ()> onMessageDone); 00107 00108 void push(std::shared_ptr<Filter> filter); 00109 std::shared_ptr<Filter> pop(); 00110 size_t size() const { return filters.size(); } 00111 bool empty() const { return filters.empty(); } 00112 00113 private: 00114 std::vector<std::shared_ptr<Filter> > filters; 00115 }; 00116 00117 00118 /*****************************************************************************/ 00119 /* ZLIB COMPRESSOR */ 00120 /*****************************************************************************/ 00121 00122 struct ZlibCompressor 00123 : public Filter { 00124 00125 static const boost::iostreams::zlib_params DEFAULT_PARAMS; 00126 00127 ZlibCompressor(const boost::iostreams::zlib_params& p 00128 = DEFAULT_PARAMS); 00129 00130 ~ZlibCompressor(); 00131 00132 using Filter::process; 00133 00134 virtual void process(const char * src_begin, const char * src_end, 00135 FlushLevel level, 00136 boost::function<void ()> onMessageDone); 00137 00138 protected: 00139 ZlibCompressor(const boost::iostreams::zlib_params& p, 00140 Direction direction); 00141 00142 private: 00143 struct Itl; 00144 std::shared_ptr<Itl> itl; 00145 Direction direction; 00146 }; 00147 00148 00149 /*****************************************************************************/ 00150 /* ZLIB DECOMPRESSOR */ 00151 /*****************************************************************************/ 00152 00153 struct ZlibDecompressor 00154 : public ZlibCompressor { 00155 00156 ZlibDecompressor(const boost::iostreams::zlib_params& p 00157 = DEFAULT_PARAMS); 00158 00159 ~ZlibDecompressor(); 00160 }; 00161 00162 00163 /*****************************************************************************/ 00164 /* GZIP COMPRESSOR */ 00165 /*****************************************************************************/ 00166 00167 struct GzipCompressorFilter: public Filter { 00168 00169 static const boost::iostreams::gzip_params DEFAULT_PARAMS; 00170 00171 GzipCompressorFilter(const boost::iostreams::gzip_params& p 00172 = DEFAULT_PARAMS); 00173 00174 ~GzipCompressorFilter(); 00175 00176 using Filter::process; 00177 00178 virtual void process(const char * src_begin, const char * src_end, 00179 FlushLevel level, 00180 boost::function<void ()> onMessageDone); 00181 00182 private: 00183 struct Itl; 00184 std::shared_ptr<Itl> itl; 00185 }; 00186 00187 00188 /*****************************************************************************/ 00189 /* GZIP DECOMPRESSOR */ 00190 /*****************************************************************************/ 00191 00192 struct GzipDecompressor : public Filter { 00193 GzipDecompressor(); 00194 00195 ~GzipDecompressor(); 00196 00197 using Filter::process; 00198 00199 virtual void process(const char * src_begin, const char * src_end, 00200 FlushLevel level, 00201 boost::function<void ()> onMessageDone); 00202 00203 private: 00204 struct Itl; 00205 std::shared_ptr<Itl> itl; 00206 }; 00207 00208 00209 /*****************************************************************************/ 00210 /* BZIP2 COMPRESSOR */ 00211 /*****************************************************************************/ 00212 00213 struct Bzip2Compressor 00214 : public Filter { 00215 00216 static const boost::iostreams::bzip2_params DEFAULT_PARAMS; 00217 00218 Bzip2Compressor(const boost::iostreams::bzip2_params& p 00219 = DEFAULT_PARAMS); 00220 00221 ~Bzip2Compressor(); 00222 00223 using Filter::process; 00224 00225 virtual void process(const char * src_begin, const char * src_end, 00226 FlushLevel level, 00227 boost::function<void ()> onMessageDone); 00228 00229 protected: 00230 Bzip2Compressor(const boost::iostreams::bzip2_params& p, 00231 Direction direction); 00232 00233 private: 00234 struct Itl; 00235 std::shared_ptr<Itl> itl; 00236 Direction direction; 00237 }; 00238 00239 00240 /*****************************************************************************/ 00241 /* BZIP2 DECOMPRESSOR */ 00242 /*****************************************************************************/ 00243 00244 struct Bzip2Decompressor 00245 : public Bzip2Compressor { 00246 00247 static const boost::iostreams::bzip2_params DEFAULT_PARAMS; 00248 00249 Bzip2Decompressor(const boost::iostreams::bzip2_params& p 00250 = DEFAULT_PARAMS); 00251 00252 ~Bzip2Decompressor(); 00253 }; 00254 00255 00256 /*****************************************************************************/ 00257 /* LZMA COMPRESSOR */ 00258 /*****************************************************************************/ 00259 00260 struct LzmaCompressor 00261 : public Filter { 00262 00263 LzmaCompressor(int level = 6); 00264 ~LzmaCompressor(); 00265 00266 using Filter::process; 00267 00268 virtual void process(const char * src_begin, const char * src_end, 00269 FlushLevel level, 00270 boost::function<void ()> onMessageDone); 00271 00272 protected: 00273 LzmaCompressor(Direction direction); 00274 00275 private: 00276 struct Itl; 00277 std::shared_ptr<Itl> itl; 00278 Direction direction; 00279 }; 00280 00281 00282 /*****************************************************************************/ 00283 /* LZMA DECOMPRESSOR */ 00284 /*****************************************************************************/ 00285 00286 struct LzmaDecompressor 00287 : public LzmaCompressor { 00288 00289 LzmaDecompressor(); 00290 00291 ~LzmaDecompressor(); 00292 }; 00293 00294 00295 } // namespace Datacratic 00296 00297 00298 #endif /* __logger__filter_h__ */