![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* rotating_output.cc 00002 Jeremy Barnes, 19 September 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 */ 00006 00007 #include "rotating_output.h" 00008 #include "jml/arch/futex.h" 00009 00010 using namespace std; 00011 using namespace ML; 00012 00013 00014 namespace Datacratic { 00015 00016 00017 /*****************************************************************************/ 00018 /* ROTATING OUTPUT */ 00019 /*****************************************************************************/ 00020 00021 RotatingOutput:: 00022 RotatingOutput() 00023 { 00024 } 00025 00026 RotatingOutput:: 00027 ~RotatingOutput() 00028 { 00029 } 00030 00031 void 00032 RotatingOutput:: 00033 open(const std::string & periodPattern) 00034 { 00035 close(); 00036 00037 std::tie(granularity, number) 00038 = parsePeriod(periodPattern); 00039 00040 std::tie(currentPeriodStart, interval) 00041 = findPeriod(Date::now(), granularity, number); 00042 00043 shutdown_ = false; 00044 up_ = false; 00045 00046 // NOTE: we can pass by reference since the log thread never touches 00047 // sem until this function has exited 00048 rotateThread 00049 .reset(new boost::thread([&](){ this->runRotateThread(); })); 00050 00051 // Wait until we're ready 00052 while (!up_) 00053 futex_wait(up_, false); 00054 } 00055 00056 void 00057 RotatingOutput:: 00058 close() 00059 { 00060 //cerr << "rfo close" << endl; 00061 if (rotateThread) { 00062 shutdown_ = true; 00063 futex_wake(shutdown_); 00064 rotateThread->join(); 00065 //cerr << "rfo joined" << endl; 00066 rotateThread.reset(); 00067 shutdown_ = false; 00068 } 00069 else closeSubordinate(); 00070 } 00071 00072 void 00073 RotatingOutput:: 00074 performRotation() 00075 { 00076 Guard guard(lock); 00077 00078 currentPeriodStart.addSeconds(interval); 00079 00080 rotateSubordinate(currentPeriodStart); 00081 } 00082 00083 void 00084 RotatingOutput:: 00085 runRotateThread() 00086 { 00087 openSubordinate(currentPeriodStart); 00088 00089 up_ = true; 00090 futex_wake(up_); 00091 00092 while (!shutdown_) { 00093 Date now = Date::now(); 00094 Date nextRotation = currentPeriodStart.plusSeconds(interval); 00095 double secondsUntilRotation = now.secondsUntil(nextRotation); 00096 00097 if (secondsUntilRotation <= 0) { 00098 performRotation(); 00099 } 00100 else { 00101 futex_wait(shutdown_, false, secondsUntilRotation); 00102 } 00103 } 00104 00105 cerr << "rfo shutdown acknowledged" << endl; 00106 00107 closeSubordinate(); 00108 00109 cerr << "file closed" << endl; 00110 } 00111 00112 00113 /*****************************************************************************/ 00114 /* ROTATING OUTPUT ADAPTOR */ 00115 /*****************************************************************************/ 00116 00117 RotatingOutputAdaptor:: 00118 RotatingOutputAdaptor(LoggerFactory factory) 00119 : loggerFactory(factory), 00120 logger(0, gcLock) 00121 { 00122 } 00123 00124 RotatingOutputAdaptor:: 00125 ~RotatingOutputAdaptor() 00126 { 00127 close(); 00128 } 00129 00130 void 00131 RotatingOutputAdaptor:: 00132 open(const std::string & filenamePattern, 00133 const std::string & periodPattern) 00134 { 00135 close(); 00136 00137 this->filenamePattern = filenamePattern; 00138 RotatingOutput::open(periodPattern); 00139 } 00140 00141 void 00142 RotatingOutputAdaptor:: 00143 close() 00144 { 00145 RotatingOutput::close(); 00146 } 00147 00148 void 00149 RotatingOutputAdaptor:: 00150 logMessage(const std::string & channel, 00151 const std::string & message) 00152 { 00153 if (!logger) 00154 throw ML::Exception("logging message with no logger"); 00155 logger()->logMessage(channel, message); 00156 } 00157 00158 Json::Value 00159 RotatingOutputAdaptor:: 00160 stats() const 00161 { 00162 Guard guard(lock); 00163 return logger()->stats(); 00164 } 00165 00166 void 00167 RotatingOutputAdaptor:: 00168 clearStats() 00169 { 00170 Guard guard(lock); 00171 logger()->clearStats(); 00172 } 00173 00174 void 00175 RotatingOutputAdaptor:: 00176 rotateSubordinate(Date currentPeriodStart) 00177 { 00178 string oldFilename = currentFilename; 00179 string newFilename = filenameFor(currentPeriodStart, filenamePattern); 00180 00181 if (onBeforeLogRotation) 00182 onBeforeLogRotation(oldFilename, newFilename); 00183 00184 // We don't close, as calling openSubordinate() will automatically close 00185 // the old one once it's not in use anymore 00186 openSubordinate(currentPeriodStart); 00187 00188 if (onAfterLogRotation) 00189 onAfterLogRotation(oldFilename, newFilename); 00190 } 00191 00192 void 00193 RotatingOutputAdaptor:: 00194 closeSubordinate() 00195 { 00196 if (logger) 00197 logger()->close(); 00198 logger.replace(0); 00199 } 00200 00201 void 00202 RotatingOutputAdaptor:: 00203 openSubordinate(Date currentPeriodStart) 00204 { 00205 currentFilename = filenameFor(currentPeriodStart, filenamePattern); 00206 logger.replace(loggerFactory(currentFilename)); 00207 } 00208 00209 00210 } // namespace Datacratic