RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* compressing_output.cc 00002 Jeremy Barnes, 20 September 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 Base for an output source that compresses its data. 00006 */ 00007 00008 #include "compressing_output.h" 00009 #include "jml/utils/parse_context.h" 00010 00011 00012 using namespace std; 00013 using namespace ML; 00014 00015 00016 namespace Datacratic { 00017 00018 00019 /*****************************************************************************/ 00020 /* WORKER THREAD OUTPUT */ 00021 /*****************************************************************************/ 00022 00023 WorkerThreadOutput:: 00024 WorkerThreadOutput(size_t ringBufferSize) 00025 : ringBuffer(ringBufferSize) 00026 { 00027 } 00028 00029 WorkerThreadOutput:: 00030 ~WorkerThreadOutput() 00031 { 00032 } 00033 00034 void 00035 WorkerThreadOutput:: 00036 startWorkerThread() 00037 { 00038 if (logThread) 00039 throw ML::Exception("log thread already up"); 00040 00041 shutdown_ = 0; 00042 up_ = 0; 00043 00044 // NOTE: we can pass by reference since the log thread never touches 00045 // sem until this function has exited 00046 logThread.reset(new boost::thread([&](){ this->runLogThread(); })); 00047 00048 // Wait until we're ready 00049 while (!up_) 00050 futex_wait(up_, false); 00051 } 00052 00053 void 00054 WorkerThreadOutput:: 00055 stopWorkerThread() 00056 { 00057 if (logThread) { 00058 shutdown_ = 1; 00059 00060 Message message; 00061 message.type = MT_SHUTDOWN; 00062 ringBuffer.push(message); 00063 00064 logThread->join(); 00065 logThread.reset(); 00066 } 00067 } 00068 00069 void 00070 WorkerThreadOutput:: 00071 logMessage(const std::string & channel, 00072 const std::string & contents) 00073 { 00074 Message message; 00075 message.type = MT_LOG; 00076 message.channel = channel; 00077 message.contents = contents; 00078 00079 ringBuffer.push(std::move(message)); 00080 } 00081 00082 #if 0 00083 void 00084 WorkerThreadOutput:: 00085 endRecord() 00086 { 00087 Message message; 00088 message.type = MT_END; 00089 00090 ringBuffer.push(std::move(message)); 00091 } 00092 #endif 00093 00094 void 00095 WorkerThreadOutput:: 00096 pushOperation(const std::function<void ()> & op) 00097 { 00098 Message message; 00099 message.type = MT_OP; 00100 message.op = op; 00101 00102 ringBuffer.push(std::move(message)); 00103 } 00104 00105 Json::Value 00106 WorkerThreadOutput:: 00107 stats() const 00108 { 00109 Json::Value result; 00110 00111 ML::Duty_Cycle_Timer::Stats dutyStats = duty.stats(); 00112 00113 result["duty"]["dutyCycle"] = dutyStats.duty_cycle(); 00114 result["duty"]["usAwake"] = (int)dutyStats.usAwake; 00115 result["duty"]["usAsleep"] = (int)dutyStats.usAsleep; 00116 result["duty"]["wakeups"] = (int)dutyStats.numWakeups; 00117 00118 return result; 00119 } 00120 00121 void 00122 WorkerThreadOutput:: 00123 clearStats() 00124 { 00125 duty.clear(); 00126 } 00127 00128 void 00129 WorkerThreadOutput:: 00130 runLogThread() 00131 { 00132 using namespace std; 00133 00134 duty.clear(); 00135 00136 up_ = 1; 00137 futex_wake(up_); 00138 00139 while (!shutdown_) { 00140 00141 duty.notifyBeforeSleep(); 00142 Message msg; 00143 bool found = ringBuffer.tryPop(msg, 0.5); 00144 duty.notifyAfterSleep(); 00145 00146 if (!found) 00147 continue; 00148 00149 switch (msg.type) { 00150 00151 case MT_LOG: 00152 implementLogMessage(msg.channel, msg.contents); 00153 break; 00154 00155 case MT_END: 00156 //implementEndRecord(); 00157 break; 00158 00159 case MT_OP: 00160 try { 00161 msg.op(); 00162 } catch (const std::exception & exc) { 00163 cerr << "warning: log operation threw exception: " 00164 << exc.what() << endl; 00165 } 00166 break; 00167 00168 case MT_SHUTDOWN: 00169 break; 00170 00171 default: 00172 throw ML::Exception("unknown file logger message type"); 00173 } 00174 } 00175 } 00176 00177 00178 /*****************************************************************************/ 00179 /* COMPRESSING OUTPUT */ 00180 /*****************************************************************************/ 00181 00186 CompressingOutput:: 00187 CompressingOutput(size_t ringBufferSize, 00188 Compressor::FlushLevel flushLevel) 00189 : WorkerThreadOutput(ringBufferSize), 00190 compressorFlushLevel(flushLevel) 00191 { 00192 } 00193 00194 CompressingOutput:: 00195 ~CompressingOutput() 00196 { 00197 } 00198 00199 void 00200 CompressingOutput:: 00201 open(std::shared_ptr<Sink> sink, 00202 const std::string & compression, 00203 int compressionLevel) 00204 { 00205 if (compressor) 00206 throw ML::Exception("can't open compressor without closing the " 00207 "previous one"); 00208 compressor.reset(Compressor::create(compression, compressionLevel)); 00209 00210 this->sink = sink; 00211 00212 onData = std::bind(&Sink::write, 00213 sink, 00214 std::placeholders::_1, 00215 std::placeholders::_2); 00216 } 00217 00218 void 00219 CompressingOutput:: 00220 closeCompressor() 00221 { 00222 if (!compressor) 00223 return; 00224 compressor->finish(onData); 00225 compressor.reset(); 00226 } 00227 00228 void 00229 CompressingOutput:: 00230 implementLogMessage(const std::string & channel, 00231 const std::string & message) 00232 { 00233 if (!compressor) 00234 throw ML::Exception("implementLogMessage without compressor"); 00235 00236 if (onFileWrite) 00237 onFileWrite(channel, channel.size() + message.size() + 2); 00238 00239 char buf[channel.size() + message.size() + 2]; 00240 memcpy(buf, channel.c_str(), channel.size()); 00241 buf[channel.size()] = '\t'; 00242 memcpy(buf + channel.size() + 1, message.c_str(), message.size()); 00243 buf[channel.size() + message.size() + 1] = '\n'; 00244 00245 compressor->compress(buf, channel.size() + message.size() + 2, 00246 onData); 00247 00248 // This should be done elsewhere or accessed via a flag 00249 compressor->flush(compressorFlushLevel, onData); 00250 } 00251 00252 } // namespace Datacratic