RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/compressing_output.cc
00001 /* compressing_output.cc
00002    Jeremy Barnes, 20 September 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005    Base for an output source that compresses its data.
00006 */
00007 
00008 #include "compressing_output.h"
00009 #include "jml/utils/parse_context.h"
00010 
00011 
00012 using namespace std;
00013 using namespace ML;
00014 
00015 
00016 namespace Datacratic {
00017 
00018 
00019 /*****************************************************************************/
00020 /* WORKER THREAD OUTPUT                                                      */
00021 /*****************************************************************************/
00022 
00023 WorkerThreadOutput::
00024 WorkerThreadOutput(size_t ringBufferSize)
00025     : ringBuffer(ringBufferSize)
00026 {
00027 }
00028 
00029 WorkerThreadOutput::
00030 ~WorkerThreadOutput()
00031 {
00032 }
00033 
00034 void
00035 WorkerThreadOutput::
00036 startWorkerThread()
00037 {
00038     if (logThread)
00039         throw ML::Exception("log thread already up");
00040 
00041     shutdown_ = 0;
00042     up_ = 0;
00043 
00044     // NOTE: we can pass by reference since the log thread never touches
00045     // sem until this function has exited
00046     logThread.reset(new boost::thread([&](){ this->runLogThread(); }));
00047 
00048     // Wait until we're ready
00049     while (!up_)
00050         futex_wait(up_, false);
00051 }
00052 
00053 void
00054 WorkerThreadOutput::
00055 stopWorkerThread()
00056 {
00057     if (logThread) {
00058         shutdown_ = 1;
00059 
00060         Message message;
00061         message.type = MT_SHUTDOWN;
00062         ringBuffer.push(message);
00063 
00064         logThread->join();
00065         logThread.reset();
00066     }
00067 }
00068 
00069 void
00070 WorkerThreadOutput::
00071 logMessage(const std::string & channel,
00072            const std::string & contents)
00073 {
00074     Message message;
00075     message.type    = MT_LOG;
00076     message.channel = channel;
00077     message.contents = contents;
00078 
00079     ringBuffer.push(std::move(message));
00080 }
00081 
#if 0
// Compiled out.  Would queue an MT_END message; the MT_END case in
// runLogThread() currently does nothing with such a message.
void
WorkerThreadOutput::
endRecord()
{
    Message marker;
    marker.type = MT_END;
    ringBuffer.push(std::move(marker));
}
#endif
00093 
00094 void
00095 WorkerThreadOutput::
00096 pushOperation(const std::function<void ()> & op)
00097 {
00098     Message message;
00099     message.type = MT_OP;
00100     message.op = op;
00101 
00102     ringBuffer.push(std::move(message));
00103 }
00104 
00105 Json::Value
00106 WorkerThreadOutput::
00107 stats() const
00108 {
00109     Json::Value result;
00110 
00111     ML::Duty_Cycle_Timer::Stats dutyStats = duty.stats();
00112 
00113     result["duty"]["dutyCycle"] = dutyStats.duty_cycle();
00114     result["duty"]["usAwake"] = (int)dutyStats.usAwake;
00115     result["duty"]["usAsleep"] = (int)dutyStats.usAsleep;
00116     result["duty"]["wakeups"] = (int)dutyStats.numWakeups;
00117 
00118     return result;
00119 }
00120 
00121 void
00122 WorkerThreadOutput::
00123 clearStats()
00124 {
00125     duty.clear();
00126 }
00127 
00128 void
00129 WorkerThreadOutput::
00130 runLogThread()
00131 {
00132     using namespace std;
00133 
00134     duty.clear();
00135 
00136     up_ = 1;
00137     futex_wake(up_);
00138 
00139     while (!shutdown_) {
00140 
00141         duty.notifyBeforeSleep();
00142         Message msg;
00143         bool found = ringBuffer.tryPop(msg, 0.5);
00144         duty.notifyAfterSleep();
00145 
00146         if (!found)
00147             continue;
00148 
00149         switch (msg.type) {
00150 
00151         case MT_LOG:
00152             implementLogMessage(msg.channel, msg.contents);
00153             break;
00154             
00155         case MT_END:
00156             //implementEndRecord();
00157             break;
00158 
00159         case MT_OP:
00160             try {
00161                 msg.op();
00162             } catch (const std::exception & exc) {
00163                 cerr << "warning: log operation threw exception: "
00164                      << exc.what() << endl;
00165             }
00166             break;
00167 
00168         case MT_SHUTDOWN:
00169             break;
00170             
00171         default:
00172             throw ML::Exception("unknown file logger message type");
00173         }
00174     }
00175 }
00176 
00177 
00178 /*****************************************************************************/
00179 /* COMPRESSING OUTPUT                                                        */
00180 /*****************************************************************************/
00181 
00186 CompressingOutput::
00187 CompressingOutput(size_t ringBufferSize,
00188                   Compressor::FlushLevel flushLevel)
00189     : WorkerThreadOutput(ringBufferSize),
00190       compressorFlushLevel(flushLevel)
00191 {
00192 }
00193 
00194 CompressingOutput::
00195 ~CompressingOutput()
00196 {
00197 }
00198 
00199 void
00200 CompressingOutput::
00201 open(std::shared_ptr<Sink> sink,
00202      const std::string & compression,
00203      int compressionLevel)
00204 {
00205     if (compressor)
00206         throw ML::Exception("can't open compressor without closing the "
00207                             "previous one");
00208     compressor.reset(Compressor::create(compression, compressionLevel));
00209 
00210     this->sink = sink;
00211 
00212     onData = std::bind(&Sink::write,
00213                        sink,
00214                        std::placeholders::_1,
00215                        std::placeholders::_2);
00216 }
00217 
00218 void
00219 CompressingOutput::
00220 closeCompressor()
00221 {
00222     if (!compressor)
00223         return;
00224     compressor->finish(onData);
00225     compressor.reset();
00226 }
00227 
00228 void
00229 CompressingOutput::
00230 implementLogMessage(const std::string & channel,
00231                     const std::string & message)
00232 {
00233     if (!compressor)
00234         throw ML::Exception("implementLogMessage without compressor");
00235 
00236     if (onFileWrite) 
00237         onFileWrite(channel, channel.size() + message.size() + 2);
00238 
00239     char buf[channel.size() + message.size() + 2];
00240     memcpy(buf, channel.c_str(), channel.size());
00241     buf[channel.size()] = '\t';
00242     memcpy(buf + channel.size() + 1, message.c_str(), message.size());
00243     buf[channel.size() + message.size() + 1] = '\n';
00244 
00245     compressor->compress(buf, channel.size() + message.size() + 2,
00246                          onData);
00247 
00248     // This should be done elsewhere or accessed via a flag
00249     compressor->flush(compressorFlushLevel, onData);
00250 }
00251 
00252 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator