RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/file_output.cc
00001 /* file_output.cc
00002    Jeremy Barnes, 29 May 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005 */
00006 
#include "file_output.h"
#include "jml/utils/parse_context.h"
#include <boost/tuple/tuple.hpp>

#define BOOST_SYSTEM_NO_DEPRECATED

#include <boost/filesystem.hpp>

#include <cerrno>
#include <exception>
#include <iostream>
#include <fcntl.h>
#include <unistd.h>
00014 
00015 
00016 using namespace std;
00017 using namespace boost::filesystem;
00018 using namespace ML;
00019 
00020 
00021 namespace Datacratic {
00022 
00023 
00024 /*****************************************************************************/
/* NAMED OUTPUT                                                              */
00026 /*****************************************************************************/
00027 
00028 NamedOutput::
00029 NamedOutput(size_t ringBufferSize)
00030     : CompressingOutput(ringBufferSize)
00031 {
00032 }
00033 
00034 NamedOutput::
00035 ~NamedOutput()
00036 {
00037     close();
00038 }
00039 
00040 void
00041 NamedOutput::
00042 open(const std::string & filename,
00043      const std::string & compression,
00044      int level)
00045 {
00046     close();
00047     switchFile(filename, compression, level);
00048     startWorkerThread();
00049 }
00050 
00051 void
00052 NamedOutput::
00053 closeFile()
00054 {
00055     if (!sink) return;
00056 
00057     //if (onPreFileClose)
00058     //    onPreFileClose(currentFilename);
00059 
00060     closeCompressor();
00061     sink->close();
00062 
00063     //if (onPostFileClose)
00064     //    onPostFileClose(currentFilename);
00065 }
00066 
00067 void
00068 NamedOutput::
00069 rotate(const std::string & newFilename,
00070        const std::string & newCompression,
00071        int newLevel)
00072 {
00073     auto op = [=] ()
00074         {
00075             this->switchFile(newFilename, newCompression, newLevel);
00076         };
00077 
00078     pushOperation(op);
00079 }
00080 
00081 void
00082 NamedOutput::
00083 close()
00084 {
00085     WorkerThreadOutput::stopWorkerThread();
00086     closeFile();
00087 }
00088 
00089 void
00090 NamedOutput::
00091 switchFile(const std::string & filename,
00092            const std::string & compression_,
00093            int level)
00094 {
00095     closeCompressor();
00096     closeFile();
00097 
00098     string compression = compression_;
00099     if (compression == "")
00100         compression = Compressor::filenameToCompression(filename);
00101 
00102     string fn = filename;
00103 
00104     bool append = false;
00105     if (filename.length() > 0 && filename[filename.length() - 1] == '+') {
00106         fn = string(filename, 0, filename.length() - 2);
00107         append = true;
00108     }
00109 
00110     if (onPreFileOpen)
00111         onPreFileOpen(fn);
00112 
00113     CompressingOutput::open(createSink(fn, append), compression, level);
00114 
00115     if (onPostFileOpen)
00116         onPostFileOpen(fn);
00117 }
00118 
00119 
00120 /*****************************************************************************/
00121 /* FILE SINK                                                                 */
00122 /*****************************************************************************/
00123 
00124 FileSink::
00125 FileSink(const std::string & filename, bool append, bool disambiguate)
00126     : fd(-1)
00127 {
00128     if (filename != "")
00129         open(filename, append, disambiguate);
00130 }
00131 
00132 FileSink::
00133 ~FileSink()
00134 {
00135     close();
00136 }
00137 
00138 void
00139 FileSink::
00140 open(const std::string & filename, bool append, bool disambiguate)
00141 {
00142     close();
00143 
00144     string fn = filename;
00145 
00146     path p = filename;
00147     path d = p;  d.remove_filename();
00148 
00149     string e = p.extension().string();
00150     path s = p.stem();
00151     while (s.extension() != "") {
00152         e = s.extension().string() + e;
00153         s = s.stem();
00154     }
00155 
00156     //cerr << "dir " << d << " ext " << e << " stem " << s << endl;
00157 
00158     if (!exists(d)) {
00159         //cerr << "creating directory " << d << endl;
00160         create_directories(d);
00161     }
00162 
00163     if (disambiguate) {
00164         string base = (d/s).string();
00165         string disamb = "";
00166         int n = 0;
00167 
00168         while (exists((fn = base + disamb + e))) {
00169             cerr << "file " + (base + disamb + e) + " exists; disambiguating"
00170                  << endl;
00171             disamb = ML::format(".%d", ++n);
00172         }
00173 
00174     }    
00175 
00176     cerr << "opening file " << fn << " with append " << append << endl;
00177 
00178     fd = ::open(fn.c_str(),
00179                 O_WRONLY | O_CREAT | O_EXCL | (append ? O_APPEND : 0),
00180                 00664);
00181     
00182     if (fd == -1)
00183         throw ML::Exception(errno, "open of " + filename);
00184 
00185     currentFilename = fn;
00186 
00187 }
00188 
00189 void
00190 FileSink::
00191 close()
00192 {
00193     //cerr << "closing file " << currentFilename << endl;
00194 
00195     if (fd != -1) {
00196         int r = fdatasync(fd);
00197         if (r == -1)
00198             throw ML::Exception(errno, "fdatasync " + currentFilename);
00199 
00200         int res = ::close(fd);
00201         if (res == -1)
00202             throw ML::Exception(errno, "close " + currentFilename);
00203         fd = -1;
00204 
00205         currentFilename = "";
00206     }
00207 }
00208 
00209 size_t
00210 FileSink::
00211 write(const char * data, size_t size)
00212 {
00213     size_t done = 0;
00214     
00215     while (done < size) {
00216         ssize_t res = ::write(fd, data + done, size - done);
00217         if (res == -1)
00218             throw ML::Exception(errno, "write to FileSink for "
00219                                 + currentFilename);
00220         done += res;
00221     }
00222 
00223     return done;
00224 }
00225 
00226 size_t
00227 FileSink::
00228 flush(FileFlushLevel flushLevel)
00229 {
00230     switch (flushLevel) {
00231 
00232     case FLUSH_NONE:
00233         return 0;
00234 
00235     case FLUSH_TO_OS:
00236         return 0;   // we call write() straight away, so ther is no internal
00237                     // buffering and nothing to do here
00238 
00239     case FLUSH_TO_DISK: {
00240         int r = fdatasync(fd);
00241         if (r == -1)
00242             throw ML::Exception(errno, "fdatasync for " + currentFilename);
00243         return 0;
00244     }
00245 
00246     default:
00247         throw ML::Exception("FileSink::flush(): unknown flush level");
00248     }
00249 }
00250 
00251 
00252 /*****************************************************************************/
00253 /* FILE OUTPUT                                                               */
00254 /*****************************************************************************/
00255 
00256 FileOutput::
00257 FileOutput(const std::string & filename, size_t ringBufferSize)
00258     : NamedOutput(ringBufferSize)
00259 {
00260     if (filename != "")
00261         open(filename);
00262 }
00263 
00264 FileOutput::
00265 ~FileOutput()
00266 {
00267     close();
00268 }
00269 
00270 std::shared_ptr<CompressingOutput::Sink>
00271 FileOutput::
00272 createSink(const std::string & filename, bool append)
00273 {
00274     return std::make_shared<FileSink>(filename, append);
00275 }
00276 
00277 
00278 /*****************************************************************************/
00279 /* ROTATING FILE OUTPUT                                                      */
00280 /*****************************************************************************/
00281 
00282 RotatingFileOutput::
00283 RotatingFileOutput()
00284     : RotatingOutputAdaptor(std::bind(&RotatingFileOutput::createFile,
00285                                       this,
00286                                       std::placeholders::_1))
00287 {
00288 }
00289 
00290 RotatingFileOutput::
00291 ~RotatingFileOutput()
00292 {
00293     close();
00294 }
00295     
00296 void
00297 RotatingFileOutput::
00298 open(const std::string & filenamePattern,
00299      const std::string & periodPattern,
00300      const std::string & compression,
00301      int level)
00302 {
00303     this->compression = compression;
00304     this->level = level;
00305 
00306     RotatingOutputAdaptor::open(filenamePattern, periodPattern);
00307 }
00308 
00309 FileOutput *
00310 RotatingFileOutput::
00311 createFile(const std::string & filename)
00312 {
00313     std::unique_ptr<FileOutput> result(new FileOutput());
00314 
00315     result->onPreFileOpen = [=] (const string & fn)
00316         { if (this->onPreFileOpen) this->onPreFileOpen(fn); };
00317     result->onPostFileOpen = [=] (const string & fn)
00318         { if (this->onPostFileOpen) this->onPostFileOpen(fn); };
00319     result->onPreFileClose = [=] (const string & fn)
00320         { if (this->onPreFileClose) this->onPreFileClose(fn); };
00321     result->onPostFileClose = [=] (const string & fn)
00322         { if (this->onPostFileClose) this->onPostFileClose(fn); };
00323     result->onFileWrite = [=] (const string& channel, const std::size_t bytes)
00324     { if (this->onFileWrite) this->onFileWrite(channel, bytes); };
00325 
00326     result->open(filename, compression, level);
00327 
00328     return result.release();
00329 }
00330 
00331 
00332 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator