// RTBKit 0.9
// Open-source framework to create real-time ad bidding systems.
00001 /* compressing_output.h -*- C++ -*- 00002 Jeremy Barnes, 20 September 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 */ 00006 00007 #ifndef __logger__compressing_output_h__ 00008 #define __logger__compressing_output_h__ 00009 00010 #include "logger.h" 00011 #include "compressor.h" 00012 #include "jml/utils/ring_buffer.h" 00013 #include "jml/arch/timers.h" 00014 00015 00016 namespace Datacratic { 00017 00018 00019 enum FileFlushLevel { 00020 FLUSH_NONE, 00021 FLUSH_TO_OS, 00022 FLUSH_TO_DISK 00023 }; 00024 00025 00026 /*****************************************************************************/ 00027 /* WORKER THREAD OUTPUT */ 00028 /*****************************************************************************/ 00029 00035 struct WorkerThreadOutput : public LogOutput { 00036 00037 WorkerThreadOutput(size_t ringBufferSize = 65536); 00038 00039 virtual ~WorkerThreadOutput(); 00040 00041 virtual void logMessage(const std::string & channel, 00042 const std::string & message); 00043 00044 virtual Json::Value stats() const; 00045 00046 virtual void clearStats(); 00047 00048 protected: 00049 void startWorkerThread(); 00050 void stopWorkerThread(); 00051 00055 void pushOperation(const std::function<void ()> & op); 00056 00057 enum MessageType { 00058 MT_LOG, 00059 MT_END, 00060 MT_OP, 00061 MT_SHUTDOWN 00062 }; 00063 00064 struct Message { 00065 MessageType type; 00066 std::string channel; 00067 std::string contents; 00068 std::function<void ()> op; 00069 }; 00070 00071 ML::RingBufferSRMW<Message> ringBuffer; 00072 00073 virtual void implementLogMessage(const std::string & channel, 00074 const std::string & message) = 0; 00075 00077 boost::scoped_ptr<boost::thread> logThread; 00078 00080 void runLogThread(); 00081 00083 ML::Duty_Cycle_Timer duty; 00084 00085 volatile int shutdown_; 00086 volatile int up_; 00087 }; 00088 00089 00090 /*****************************************************************************/ 00091 /* COMPRESSING 
OUTPUT */ 00092 /*****************************************************************************/ 00093 00098 struct CompressingOutput : public WorkerThreadOutput { 00099 00100 CompressingOutput(size_t ringBufferSize = 65536, 00101 Compressor::FlushLevel compressorFlushLevel 00102 = Compressor::FLUSH_AVAILABLE); 00103 00104 virtual ~CompressingOutput(); 00105 00106 struct Sink { 00107 virtual ~Sink() 00108 { 00109 } 00110 00112 virtual void close() = 0; 00113 00117 virtual size_t write(const char * data, size_t size) = 0; 00118 00122 virtual size_t flush(FileFlushLevel flushLevel) = 0; 00123 }; 00124 00125 void open(std::shared_ptr<Sink> sink, 00126 const std::string & compression, 00127 int compressionLevel); 00128 00129 void closeCompressor(); 00130 00131 boost::function<void (std::string, std::size_t)> onFileWrite; 00132 00133 protected: 00134 Compressor::FlushLevel compressorFlushLevel; 00135 std::shared_ptr<Sink> sink; 00136 std::shared_ptr<Compressor> compressor; 00137 std::function<size_t (const char *, size_t)> onData; 00138 00139 // Overrides 00140 00141 virtual void implementLogMessage(const std::string & channel, 00142 const std::string & message); 00143 }; 00144 00145 00146 } // namespace Datacratic 00147 00148 00149 #endif /* __logger__compressing_output_h__ */