RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/compressing_output.h
00001 /* compressing_output.h                                            -*- C++ -*-
00002    Jeremy Barnes, 20 September 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005 */
00006 
00007 #ifndef __logger__compressing_output_h__
00008 #define __logger__compressing_output_h__
00009 
00010 #include "logger.h"
00011 #include "compressor.h"
00012 #include "jml/utils/ring_buffer.h"
00013 #include "jml/arch/timers.h"
00014 
00015 
00016 namespace Datacratic {
00017 
00018 
/** Flush level accepted by CompressingOutput::Sink::flush().

    Only the enumerator names are visible in this header; the precise effect
    of each level is decided by the individual Sink implementations.  The
    per-value notes below are therefore assumptions to be confirmed.
*/
00019 enum FileFlushLevel {
00020     FLUSH_NONE,       ///< presumably: perform no flush -- TODO confirm in Sink implementations
00021     FLUSH_TO_OS,      ///< presumably: push buffered data to the operating system -- TODO confirm
00022     FLUSH_TO_DISK     ///< presumably: force data through to disk -- TODO confirm
00023 };
00024 
00025 
00026 /*****************************************************************************/
00027 /* WORKER THREAD OUTPUT                                                      */
00028 /*****************************************************************************/
00029 
/** Abstract LogOutput that owns a ring buffer of Message records and a
    handle to a boost::thread.

    The class is abstract because implementLogMessage() is pure virtual;
    CompressingOutput in this file is one subclass that implements it.
    None of the member functions are defined in this header, so the notes
    below describe the declarations only and hedge anything about behaviour.
*/
00035 struct WorkerThreadOutput : public LogOutput {
00036 
          /** @param ringBufferSize  presumably the capacity of ringBuffer
                                     (default 65536) -- TODO confirm in the .cc
          */
00037     WorkerThreadOutput(size_t ringBufferSize = 65536);
00038 
          /** Virtual so that subclasses can be destroyed through a base pointer. */
00039     virtual ~WorkerThreadOutput();
00040 
          /** Accept a log message for the given channel.
              NOTE(review): given the ring buffer and MT_LOG below, this
              presumably enqueues rather than writes directly -- confirm.
          */
00041     virtual void logMessage(const std::string & channel,
00042                             const std::string & message);
00043 
          /** Return statistics as a JSON value; the contents are defined in
              the implementation, not here.
          */
00044     virtual Json::Value stats() const;
00045 
          /** Reset whatever stats() reports (implementation not shown here). */
00046     virtual void clearStats();
00047 
00048 protected:
          // Start / stop the worker thread.  Presumably these manage logThread
          // and run runLogThread() on it -- TODO confirm in the .cc.
00049     void startWorkerThread();
00050     void stopWorkerThread();
00051 
          /** Submit an arbitrary callable.
              NOTE(review): presumably queued as an MT_OP Message and run by
              the worker thread -- confirm against the implementation.
          */
00055     void pushOperation(const std::function<void ()> & op);
00056 
          /** Tag stored in Message::type.  Meanings below are inferred from
              the names only and should be verified.
          */
00057     enum MessageType {
00058         MT_LOG,     ///< presumably: a channel/contents log record
00059         MT_END,     ///< presumably: an end marker -- exact meaning not visible here
00060         MT_OP,      ///< presumably: run the callable in Message::op
00061         MT_SHUTDOWN ///< presumably: ask the worker thread to exit
00062     };
00063 
          /** Element type of ringBuffer.  Which fields are meaningful for
              which MessageType is not shown in this header.
          */
00064     struct Message {
00065         MessageType type;
00066         std::string channel;
00067         std::string contents;
00068         std::function<void ()> op;
00069     };
00070 
          // Queue of Message records.  "SRMW" presumably stands for single
          // reader / multiple writers -- see jml/utils/ring_buffer.h to confirm.
00071     ML::RingBufferSRMW<Message> ringBuffer;
00072 
          /** Hook that subclasses must implement to actually handle one log
              message (pure virtual).
          */
00073     virtual void implementLogMessage(const std::string & channel,
00074                                      const std::string & message) = 0;
00075 
          // Owning handle for the thread object; null until a thread is created.
00077     boost::scoped_ptr<boost::thread> logThread;
00078 
          // Presumably the body executed on logThread -- TODO confirm.
00080     void runLogThread();
00081 
          // Timer from jml/arch/timers.h; presumably tracks how busy the worker
          // thread is and feeds stats() -- TODO confirm.
00083     ML::Duty_Cycle_Timer duty;
00084 
          // NOTE(review): if these flags are read and written from different
          // threads, be aware that `volatile` provides no atomicity or ordering
          // guarantees in C++; std::atomic<int> is the standard tool for that.
00085     volatile int shutdown_;
00086     volatile int up_;
00087 };
00088 
00089 
00090 /*****************************************************************************/
00091 /* COMPRESSING OUTPUT                                                        */
00092 /*****************************************************************************/
00093 
/** WorkerThreadOutput subclass that holds a Compressor and a Sink and
    implements implementLogMessage().

    The member functions are only declared here.  From the members it looks
    as though log data goes through `compressor` and the result is handed to
    `sink` -- NOTE(review): confirm the data path in the implementation.
*/
00098 struct CompressingOutput : public WorkerThreadOutput {
00099 
          /** @param ringBufferSize        size argument with the same default
                                           (65536) as the WorkerThreadOutput
                                           constructor; presumably passed on
                                           to it -- TODO confirm
              @param compressorFlushLevel  initial value for the member of the
                                           same name (presumably); defaults to
                                           Compressor::FLUSH_AVAILABLE
          */
00100     CompressingOutput(size_t ringBufferSize = 65536,
00101                       Compressor::FlushLevel compressorFlushLevel
00102                           = Compressor::FLUSH_AVAILABLE);
00103 
00104     virtual ~CompressingOutput();
00105 
          /** Abstract destination interface.  All three operations are pure
              virtual; concrete sinks are defined elsewhere.
          */
00106     struct Sink {
              // Empty virtual destructor so sinks can be deleted via Sink *.
00107         virtual ~Sink()
00108         {
00109         }
00110 
              /** Close the sink. */
00112         virtual void close() = 0;
00113         
              /** Write `size` bytes starting at `data`.
                  @return presumably the number of bytes written -- TODO confirm
              */
00117         virtual size_t write(const char * data, size_t size) = 0;
00118 
              /** Flush at the requested FileFlushLevel.
                  @return a size_t whose meaning is not visible in this header
              */
00122         virtual size_t flush(FileFlushLevel flushLevel) = 0;
00123     };
00124 
          /** Attach a sink and select compression.
              @param sink              destination, shared with the caller
              @param compression       compression scheme name; accepted values
                                       are defined by Compressor, not here
              @param compressionLevel  level forwarded to the compressor
                                       (presumably) -- TODO confirm range
          */
00125     void open(std::shared_ptr<Sink> sink,
00126               const std::string & compression,
00127               int compressionLevel);
00128 
          /** Shut down the compressor (details are in the implementation). */
00129     void closeCompressor();
00130 
          // Public callback taking a string and a size.
          // NOTE(review): presumably invoked after data is written, with a
          // file/channel identifier and a byte count -- confirm at call site.
00131     boost::function<void (std::string, std::size_t)> onFileWrite;
00132 
00133 protected:
00134     Compressor::FlushLevel compressorFlushLevel;
00135     std::shared_ptr<Sink> sink;
00136     std::shared_ptr<Compressor> compressor;
          // Callback with the same (data, size) -> size_t shape as Sink::write;
          // presumably handed to the compressor as its output handler -- confirm.
00137     std::function<size_t (const char *, size_t)> onData;
00138 
00139     // Overrides
00140 
          /** Implementation of the pure virtual hook declared in
              WorkerThreadOutput.
          */
00141     virtual void implementLogMessage(const std::string & channel,
00142                                      const std::string & message);
00143 };
00144 
00145 
00146 } // namespace Datacratic
00147 
00148 
00149 #endif /* __logger__compressing_output_h__ */
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator