RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/rotating_output.cc
00001 /* rotating_output.cc
00002    Jeremy Barnes, 19 September 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005 */
00006 
00007 #include "rotating_output.h"
00008 #include "jml/arch/futex.h"
00009 
00010 using namespace std;
00011 using namespace ML;
00012 
00013 
00014 namespace Datacratic {
00015 
00016 
00017 /*****************************************************************************/
00018 /* ROTATING OUTPUT                                                           */
00019 /*****************************************************************************/
00020 
00021 RotatingOutput::
00022 RotatingOutput()
00023 {
00024 }
00025 
00026 RotatingOutput::
00027 ~RotatingOutput()
00028 {
00029 }
00030     
00031 void
00032 RotatingOutput::
00033 open(const std::string & periodPattern)
00034 {
00035     close();
00036 
00037     std::tie(granularity, number)
00038         = parsePeriod(periodPattern);
00039 
00040     std::tie(currentPeriodStart, interval)
00041         = findPeriod(Date::now(), granularity, number);
00042 
00043     shutdown_ = false;
00044     up_ = false;
00045 
00046     // NOTE: we can pass by reference since the log thread never touches
00047     // sem until this function has exited
00048     rotateThread
00049         .reset(new boost::thread([&](){ this->runRotateThread(); }));
00050 
00051     // Wait until we're ready
00052     while (!up_)
00053         futex_wait(up_, false);
00054 }
00055     
00056 void
00057 RotatingOutput::
00058 close()
00059 {
00060     //cerr << "rfo close" << endl;
00061     if (rotateThread) {
00062         shutdown_ = true;
00063         futex_wake(shutdown_);
00064         rotateThread->join();
00065         //cerr << "rfo joined" << endl;
00066         rotateThread.reset();
00067         shutdown_ = false;
00068     }
00069     else closeSubordinate();
00070 }
00071 
00072 void
00073 RotatingOutput::
00074 performRotation()
00075 {
00076     Guard guard(lock);
00077     
00078     currentPeriodStart.addSeconds(interval);
00079 
00080     rotateSubordinate(currentPeriodStart);
00081 }
00082 
00083 void
00084 RotatingOutput::
00085 runRotateThread()
00086 {
00087     openSubordinate(currentPeriodStart);
00088 
00089     up_ = true;
00090     futex_wake(up_);
00091 
00092     while (!shutdown_) {
00093         Date now = Date::now();
00094         Date nextRotation = currentPeriodStart.plusSeconds(interval);
00095         double secondsUntilRotation = now.secondsUntil(nextRotation);
00096 
00097         if (secondsUntilRotation <= 0) {
00098             performRotation();
00099         }
00100         else {
00101             futex_wait(shutdown_, false, secondsUntilRotation);
00102         }
00103     }
00104     
00105     cerr << "rfo shutdown acknowledged" << endl;
00106 
00107     closeSubordinate();
00108 
00109     cerr << "file closed" << endl;
00110 }
00111 
00112 
00113 /*****************************************************************************/
00114 /* ROTATING OUTPUT ADAPTOR                                                   */
00115 /*****************************************************************************/
00116 
00117 RotatingOutputAdaptor::
00118 RotatingOutputAdaptor(LoggerFactory factory)
00119     : loggerFactory(factory),
00120       logger(0, gcLock)
00121 {
00122 }
00123 
00124 RotatingOutputAdaptor::
00125 ~RotatingOutputAdaptor()
00126 {
00127     close();
00128 }
00129     
00130 void
00131 RotatingOutputAdaptor::
00132 open(const std::string & filenamePattern,
00133      const std::string & periodPattern)
00134 {
00135     close();
00136 
00137     this->filenamePattern = filenamePattern;
00138     RotatingOutput::open(periodPattern);
00139 }
00140 
00141 void
00142 RotatingOutputAdaptor::
00143 close()
00144 {
00145     RotatingOutput::close();
00146 }
00147     
00148 void
00149 RotatingOutputAdaptor::
00150 logMessage(const std::string & channel,
00151            const std::string & message)
00152 {
00153     if (!logger)
00154         throw ML::Exception("logging message with no logger");
00155     logger()->logMessage(channel, message);
00156 }
00157     
00158 Json::Value
00159 RotatingOutputAdaptor::
00160 stats() const
00161 {
00162     Guard guard(lock);
00163     return logger()->stats();
00164 }
00165 
00166 void
00167 RotatingOutputAdaptor::
00168 clearStats()
00169 {
00170     Guard guard(lock);
00171     logger()->clearStats();
00172 }
00173 
00174 void
00175 RotatingOutputAdaptor::
00176 rotateSubordinate(Date currentPeriodStart)
00177 {
00178     string oldFilename = currentFilename;
00179     string newFilename = filenameFor(currentPeriodStart, filenamePattern);
00180 
00181     if (onBeforeLogRotation)
00182         onBeforeLogRotation(oldFilename, newFilename);
00183 
00184     // We don't close, as calling openSubordinate() will automatically close
00185     // the old one once it's not in use anymore
00186     openSubordinate(currentPeriodStart);
00187     
00188     if (onAfterLogRotation)
00189         onAfterLogRotation(oldFilename, newFilename);
00190 }
00191 
00192 void
00193 RotatingOutputAdaptor::
00194 closeSubordinate()
00195 {
00196     if (logger)
00197         logger()->close();
00198     logger.replace(0);
00199 }
00200 
00201 void
00202 RotatingOutputAdaptor::
00203 openSubordinate(Date currentPeriodStart)
00204 {
00205     currentFilename = filenameFor(currentPeriodStart, filenamePattern);
00206     logger.replace(loggerFactory(currentFilename));
00207 }
00208 
00209 
00210 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator