RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/filter.h
00001 /* filter.h                                                        -*- C++ -*-
00002    Jeremy Barnes, 29 May 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005 */
00006 
00007 #ifndef __logger__filter_h__
00008 #define __logger__filter_h__
00009 
00010 
#include <cstddef>
#include <iosfwd>
#include <memory>
#include <string>
#include <vector>

#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include "soa/sigslot/slot.h"
00014 
00015 namespace boost {
00016 namespace iostreams {
00017 
00018 struct zlib_params;
00019 struct bzip2_params;
00020 struct gzip_params;
00021 
00022 } // namespace iostreams
00023 } // namespace boost
00024 
00025 
00026 namespace Datacratic {
00027 
/** Selects which way a filter should operate.  It is passed to
    Filter::create() and to the protected constructors of the compressor
    classes declared in this header.
*/
enum Direction {
    COMPRESS,     ///< Value 0
    DECOMPRESS    ///< Value 1
};

/** Returns a string for the given Direction.  The exact text is produced
    by the implementation file and is not visible from this header.
*/
std::string print(Direction dir);

/** Stream insertion for Direction; presumably writes print(dir) -- confirm
    against the implementation.
*/
std::ostream & operator << (std::ostream & stream, Direction dir);
00035 
/** How aggressively a filter should flush buffered data.

    NOTE(review): the per-enumerator descriptions were lost from this listing.
    The names parallel zlib's Z_NO_FLUSH / Z_SYNC_FLUSH / Z_FULL_FLUSH /
    Z_FINISH, so the meanings below are presumed -- confirm against the
    implementation.
*/
enum FlushLevel {
    FLUSH_NONE,   ///< Value 0; presumably: no flush requested
    FLUSH_SYNC,   ///< Value 1; presumably: make all data so far readable
    FLUSH_FULL,   ///< Value 2; presumably: full flush / restart point
    FLUSH_FINISH  ///< Value 3; presumably: terminate the stream
};

/** Returns a string for the given FlushLevel.  The exact text is produced
    by the implementation file and is not visible from this header.
*/
std::string print(FlushLevel lvl);

/** Stream insertion for FlushLevel; presumably writes print(lvl) -- confirm
    against the implementation.
*/
std::ostream & operator << (std::ostream & stream, FlushLevel lvl);
00045 
00046 
00047 /*****************************************************************************/
00048 /* FILTER                                                                    */
00049 /*****************************************************************************/
00050 
00051 struct Filter {
00052 
00053     virtual ~Filter();
00054 
00055     typedef void (OnOutputFn) (const char *, size_t, FlushLevel, boost::function<void ()>);
00056     typedef boost::function<OnOutputFn> OnOutput;
00057     OnOutput onOutput;
00058 
00059     typedef void (OnErrorFn) (const std::string &);
00060     typedef boost::function<OnErrorFn> OnError;
00061     OnError onError;
00062 
00063     virtual void flush(FlushLevel level,
00064                        boost::function<void ()> onFlushDone
00065                            = boost::function<void ()>());
00066 
00067     virtual void process(const std::string & buf,
00068                          FlushLevel level = FLUSH_NONE,
00069                          boost::function<void ()> onFilterDone
00070                              = boost::function<void ()>());
00071 
00072     virtual void process(const char * first, const char * last,
00073                          FlushLevel level = FLUSH_NONE,
00074                          boost::function<void ()> onFilterDone
00075                              = boost::function<void ()>()) = 0;
00076 
00077     static Filter * create(const std::string & extension,
00078                            Direction direction);
00079 };
00080 
00081 
00082 /*****************************************************************************/
00083 /* IDENTITY FILTER                                                           */
00084 /*****************************************************************************/
00085 
00086 struct IdentityFilter : public Filter {
00087 
00088     using Filter::process;
00089 
00090     virtual void process(const char * src_begin, const char * src_end,
00091                          FlushLevel level,
00092                          boost::function<void ()> onMessageDone);
00093 };
00094 
00095 
00096 /*****************************************************************************/
00097 /* FILTER STACK                                                              */
00098 /*****************************************************************************/
00099 
00100 struct FilterStack : public Filter {
00101 
00102     using Filter::process;
00103 
00104     virtual void process(const char * src_begin, const char * src_end,
00105                          FlushLevel level,
00106                          boost::function<void ()> onMessageDone);
00107 
00108     void push(std::shared_ptr<Filter> filter);
00109     std::shared_ptr<Filter> pop();
00110     size_t size() const { return filters.size(); }
00111     bool empty() const { return filters.empty(); }
00112 
00113 private:
00114     std::vector<std::shared_ptr<Filter> > filters;
00115 };
00116 
00117 
00118 /*****************************************************************************/
00119 /* ZLIB COMPRESSOR                                                           */
00120 /*****************************************************************************/
00121 
00122 struct ZlibCompressor
00123     : public Filter {
00124 
00125     static const boost::iostreams::zlib_params DEFAULT_PARAMS;
00126 
00127     ZlibCompressor(const boost::iostreams::zlib_params& p
00128                        = DEFAULT_PARAMS);
00129     
00130     ~ZlibCompressor();
00131 
00132     using Filter::process;
00133 
00134     virtual void process(const char * src_begin, const char * src_end,
00135                          FlushLevel level,
00136                          boost::function<void ()> onMessageDone);
00137 
00138 protected:
00139     ZlibCompressor(const boost::iostreams::zlib_params& p,
00140                    Direction direction);
00141 
00142 private:
00143     struct Itl;
00144     std::shared_ptr<Itl> itl;
00145     Direction direction;
00146 };
00147 
00148 
00149 /*****************************************************************************/
00150 /* ZLIB DECOMPRESSOR                                                         */
00151 /*****************************************************************************/
00152 
00153 struct ZlibDecompressor
00154     : public ZlibCompressor {
00155 
00156     ZlibDecompressor(const boost::iostreams::zlib_params& p
00157                      = DEFAULT_PARAMS);
00158     
00159     ~ZlibDecompressor();
00160 };
00161 
00162 
00163 /*****************************************************************************/
00164 /* GZIP COMPRESSOR                                                           */
00165 /*****************************************************************************/
00166 
00167 struct GzipCompressorFilter: public Filter {
00168 
00169     static const boost::iostreams::gzip_params DEFAULT_PARAMS;
00170 
00171     GzipCompressorFilter(const boost::iostreams::gzip_params& p
00172                        = DEFAULT_PARAMS);
00173     
00174     ~GzipCompressorFilter();
00175 
00176     using Filter::process;
00177 
00178     virtual void process(const char * src_begin, const char * src_end,
00179                          FlushLevel level,
00180                          boost::function<void ()> onMessageDone);
00181 
00182 private:
00183     struct Itl;
00184     std::shared_ptr<Itl> itl;
00185 };
00186 
00187 
00188 /*****************************************************************************/
00189 /* GZIP DECOMPRESSOR                                                         */
00190 /*****************************************************************************/
00191 
00192 struct GzipDecompressor : public Filter {
00193     GzipDecompressor();
00194     
00195     ~GzipDecompressor();
00196     
00197     using Filter::process;
00198 
00199     virtual void process(const char * src_begin, const char * src_end,
00200                          FlushLevel level,
00201                          boost::function<void ()> onMessageDone);
00202 
00203 private:
00204     struct Itl;
00205     std::shared_ptr<Itl> itl;
00206 };
00207 
00208 
00209 /*****************************************************************************/
00210 /* BZIP2 COMPRESSOR                                                          */
00211 /*****************************************************************************/
00212 
00213 struct Bzip2Compressor
00214     : public Filter {
00215 
00216     static const boost::iostreams::bzip2_params DEFAULT_PARAMS;
00217 
00218     Bzip2Compressor(const boost::iostreams::bzip2_params& p
00219                        = DEFAULT_PARAMS);
00220     
00221     ~Bzip2Compressor();
00222 
00223     using Filter::process;
00224 
00225     virtual void process(const char * src_begin, const char * src_end,
00226                          FlushLevel level,
00227                          boost::function<void ()> onMessageDone);
00228 
00229 protected:
00230     Bzip2Compressor(const boost::iostreams::bzip2_params& p,
00231                    Direction direction);
00232 
00233 private:
00234     struct Itl;
00235     std::shared_ptr<Itl> itl;
00236     Direction direction;
00237 };
00238 
00239 
00240 /*****************************************************************************/
00241 /* BZIP2 DECOMPRESSOR                                                        */
00242 /*****************************************************************************/
00243 
00244 struct Bzip2Decompressor
00245     : public Bzip2Compressor {
00246 
00247     static const boost::iostreams::bzip2_params DEFAULT_PARAMS;
00248 
00249     Bzip2Decompressor(const boost::iostreams::bzip2_params& p
00250                      = DEFAULT_PARAMS);
00251     
00252     ~Bzip2Decompressor();
00253 };
00254 
00255 
00256 /*****************************************************************************/
00257 /* LZMA COMPRESSOR                                                           */
00258 /*****************************************************************************/
00259 
00260 struct LzmaCompressor
00261     : public Filter {
00262 
00263     LzmaCompressor(int level = 6);
00264     ~LzmaCompressor();
00265 
00266     using Filter::process;
00267 
00268     virtual void process(const char * src_begin, const char * src_end,
00269                          FlushLevel level,
00270                          boost::function<void ()> onMessageDone);
00271 
00272 protected:
00273     LzmaCompressor(Direction direction);
00274 
00275 private:
00276     struct Itl;
00277     std::shared_ptr<Itl> itl;
00278     Direction direction;
00279 };
00280 
00281 
00282 /*****************************************************************************/
00283 /* LZMA DECOMPRESSOR                                                         */
00284 /*****************************************************************************/
00285 
00286 struct LzmaDecompressor
00287     : public LzmaCompressor {
00288 
00289     LzmaDecompressor();
00290     
00291     ~LzmaDecompressor();
00292 };
00293 
00294 
00295 } // namespace Datacratic
00296 
00297 
00298 #endif /* __logger__filter_h__ */
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator