RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* file_output.cc 00002 Jeremy Barnes, 29 May 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 */ 00006 00007 #include "file_output.h" 00008 #include "jml/utils/parse_context.h" 00009 #include <boost/tuple/tuple.hpp> 00010 00011 #define BOOST_SYSTEM_NO_DEPRECATED 00012 00013 #include <boost/filesystem.hpp> 00014 00015 00016 using namespace std; 00017 using namespace boost::filesystem; 00018 using namespace ML; 00019 00020 00021 namespace Datacratic { 00022 00023 00024 /*****************************************************************************/ 00025 /* FILE OUTPUT */ 00026 /*****************************************************************************/ 00027 00028 NamedOutput:: 00029 NamedOutput(size_t ringBufferSize) 00030 : CompressingOutput(ringBufferSize) 00031 { 00032 } 00033 00034 NamedOutput:: 00035 ~NamedOutput() 00036 { 00037 close(); 00038 } 00039 00040 void 00041 NamedOutput:: 00042 open(const std::string & filename, 00043 const std::string & compression, 00044 int level) 00045 { 00046 close(); 00047 switchFile(filename, compression, level); 00048 startWorkerThread(); 00049 } 00050 00051 void 00052 NamedOutput:: 00053 closeFile() 00054 { 00055 if (!sink) return; 00056 00057 //if (onPreFileClose) 00058 // onPreFileClose(currentFilename); 00059 00060 closeCompressor(); 00061 sink->close(); 00062 00063 //if (onPostFileClose) 00064 // onPostFileClose(currentFilename); 00065 } 00066 00067 void 00068 NamedOutput:: 00069 rotate(const std::string & newFilename, 00070 const std::string & newCompression, 00071 int newLevel) 00072 { 00073 auto op = [=] () 00074 { 00075 this->switchFile(newFilename, newCompression, newLevel); 00076 }; 00077 00078 pushOperation(op); 00079 } 00080 00081 void 00082 NamedOutput:: 00083 close() 00084 { 00085 WorkerThreadOutput::stopWorkerThread(); 00086 closeFile(); 00087 } 00088 00089 void 00090 NamedOutput:: 00091 switchFile(const std::string & filename, 00092 const std::string & compression_, 00093 int level) 00094 { 00095 closeCompressor(); 00096 closeFile(); 00097 00098 string compression = compression_; 00099 if (compression == "") 00100 compression = Compressor::filenameToCompression(filename); 00101 00102 string fn = filename; 00103 00104 bool append = false; 00105 if (filename.length() > 0 && filename[filename.length() - 1] == '+') { 00106 fn = string(filename, 0, filename.length() - 2); 00107 append = true; 00108 } 00109 00110 if (onPreFileOpen) 00111 onPreFileOpen(fn); 00112 00113 CompressingOutput::open(createSink(fn, append), compression, level); 00114 00115 if (onPostFileOpen) 00116 onPostFileOpen(fn); 00117 } 00118 00119 00120 /*****************************************************************************/ 00121 /* FILE SINK */ 00122 /*****************************************************************************/ 00123 00124 FileSink:: 00125 FileSink(const std::string & filename, bool append, bool disambiguate) 00126 : fd(-1) 00127 { 00128 if (filename != "") 00129 open(filename, append, disambiguate); 00130 } 00131 00132 FileSink:: 00133 ~FileSink() 00134 { 00135 close(); 00136 } 00137 00138 void 00139 FileSink:: 00140 open(const std::string & filename, bool append, bool disambiguate) 00141 { 00142 close(); 00143 00144 string fn = filename; 00145 00146 path p = filename; 00147 path d = p; d.remove_filename(); 00148 00149 string e = p.extension().string(); 00150 path s = p.stem(); 00151 while (s.extension() != "") { 00152 e = s.extension().string() + e; 00153 s = s.stem(); 00154 } 00155 00156 //cerr << "dir " << d << " ext " << e << " stem " << s << endl; 00157 00158 if (!exists(d)) { 00159 //cerr << "creating directory " << d << endl; 00160 create_directories(d); 00161 } 00162 00163 if (disambiguate) { 00164 string base = (d/s).string(); 00165 string disamb = ""; 00166 int n = 0; 00167 00168 while (exists((fn = base + disamb + e))) { 00169 cerr << "file " + (base + disamb + e) + " exists; disambiguating" 00170 << endl; 00171 disamb = ML::format(".%d", ++n); 00172 } 00173 00174 } 00175 00176 cerr << "opening file " << fn << " with append " << append << endl; 00177 00178 fd = ::open(fn.c_str(), 00179 O_WRONLY | O_CREAT | O_EXCL | (append ? O_APPEND : 0), 00180 00664); 00181 00182 if (fd == -1) 00183 throw ML::Exception(errno, "open of " + filename); 00184 00185 currentFilename = fn; 00186 00187 } 00188 00189 void 00190 FileSink:: 00191 close() 00192 { 00193 //cerr << "closing file " << currentFilename << endl; 00194 00195 if (fd != -1) { 00196 int r = fdatasync(fd); 00197 if (r == -1) 00198 throw ML::Exception(errno, "fdatasync " + currentFilename); 00199 00200 int res = ::close(fd); 00201 if (res == -1) 00202 throw ML::Exception(errno, "close " + currentFilename); 00203 fd = -1; 00204 00205 currentFilename = ""; 00206 } 00207 } 00208 00209 size_t 00210 FileSink:: 00211 write(const char * data, size_t size) 00212 { 00213 size_t done = 0; 00214 00215 while (done < size) { 00216 ssize_t res = ::write(fd, data + done, size - done); 00217 if (res == -1) 00218 throw ML::Exception(errno, "write to FileSink for " 00219 + currentFilename); 00220 done += res; 00221 } 00222 00223 return done; 00224 } 00225 00226 size_t 00227 FileSink:: 00228 flush(FileFlushLevel flushLevel) 00229 { 00230 switch (flushLevel) { 00231 00232 case FLUSH_NONE: 00233 return 0; 00234 00235 case FLUSH_TO_OS: 00236 return 0; // we call write() straight away, so ther is no internal 00237 // buffering and nothing to do here 00238 00239 case FLUSH_TO_DISK: { 00240 int r = fdatasync(fd); 00241 if (r == -1) 00242 throw ML::Exception(errno, "fdatasync for " + currentFilename); 00243 return 0; 00244 } 00245 00246 default: 00247 throw ML::Exception("FileSink::flush(): unknown flush level"); 00248 } 00249 } 00250 00251 00252 /*****************************************************************************/ 00253 /* FILE OUTPUT */ 00254 /*****************************************************************************/ 00255 00256 FileOutput:: 00257 FileOutput(const std::string & filename, size_t ringBufferSize) 00258 : NamedOutput(ringBufferSize) 00259 { 00260 if (filename != "") 00261 open(filename); 00262 } 00263 00264 FileOutput:: 00265 ~FileOutput() 00266 { 00267 close(); 00268 } 00269 00270 std::shared_ptr<CompressingOutput::Sink> 00271 FileOutput:: 00272 createSink(const std::string & filename, bool append) 00273 { 00274 return std::make_shared<FileSink>(filename, append); 00275 } 00276 00277 00278 /*****************************************************************************/ 00279 /* ROTATING FILE OUTPUT */ 00280 /*****************************************************************************/ 00281 00282 RotatingFileOutput:: 00283 RotatingFileOutput() 00284 : RotatingOutputAdaptor(std::bind(&RotatingFileOutput::createFile, 00285 this, 00286 std::placeholders::_1)) 00287 { 00288 } 00289 00290 RotatingFileOutput:: 00291 ~RotatingFileOutput() 00292 { 00293 close(); 00294 } 00295 00296 void 00297 RotatingFileOutput:: 00298 open(const std::string & filenamePattern, 00299 const std::string & periodPattern, 00300 const std::string & compression, 00301 int level) 00302 { 00303 this->compression = compression; 00304 this->level = level; 00305 00306 RotatingOutputAdaptor::open(filenamePattern, periodPattern); 00307 } 00308 00309 FileOutput * 00310 RotatingFileOutput:: 00311 createFile(const std::string & filename) 00312 { 00313 std::unique_ptr<FileOutput> result(new FileOutput()); 00314 00315 result->onPreFileOpen = [=] (const string & fn) 00316 { if (this->onPreFileOpen) this->onPreFileOpen(fn); }; 00317 result->onPostFileOpen = [=] (const string & fn) 00318 { if (this->onPostFileOpen) this->onPostFileOpen(fn); }; 00319 result->onPreFileClose = [=] (const string & fn) 00320 { if (this->onPreFileClose) this->onPreFileClose(fn); }; 00321 result->onPostFileClose = [=] (const string & fn) 00322 { if (this->onPostFileClose) this->onPostFileClose(fn); }; 00323 result->onFileWrite = [=] (const string& channel, const std::size_t bytes) 00324 { if (this->onFileWrite) this->onFileWrite(channel, bytes); }; 00325 00326 result->open(filename, compression, level); 00327 00328 return result.release(); 00329 } 00330 00331 00332 } // namespace Datacratic