![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* cloud_output.cc 00002 Jeremy Barnes, 18 September 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 */ 00006 00007 #include "cloud_output.h" 00008 #include <memory> 00009 #include <boost/filesystem.hpp> 00010 00011 namespace Datacratic { 00012 using namespace std; 00013 namespace fs = boost::filesystem; 00014 00015 CloudSink:: 00016 CloudSink(const std::string & uri, bool append, bool disambiguate): 00017 currentUri_(uri),tmpFileDir_("./cloudfiles/") 00018 { 00019 if (uri != "") 00020 open(uri, append, disambiguate); 00021 } 00022 00023 CloudSink:: 00024 ~CloudSink() 00025 { 00026 //cerr << "CloudSink::~CloudSink::was called with uri " << currentUri_ << endl; 00027 close(); 00028 } 00029 00030 void 00031 CloudSink:: 00032 open(const std::string & uri, bool append, bool disambiguate) 00033 { 00034 cloudStream.close(); 00035 cloudStream.open(uri, std::ios_base::out | 00036 (append ? std::ios_base::app : std::ios::openmode())); 00037 // Get the file name from the s3 uri. We want to preserve the path since 00038 // if we only get the filename we could overwrite files with the same name 00039 // but in a different directory. uri format is s3:// 00040 fs::path filePath(tmpFileDir_ + uri.substr(5)); 00041 // Get the path and create the directories 00042 fs::create_directories(filePath.parent_path()); 00043 // create the local file and directory 00044 fileStream.open(filePath.string(), std::ios_base::out | 00045 (append ? std::ios_base::app : std::ios::openmode())); 00046 } 00047 00048 void 00049 CloudSink:: 00050 close() 00051 { 00052 cloudStream.close(); 00053 fileStream.close(); 00054 fs::path filePath(tmpFileDir_ + currentUri_.substr(5)); 00055 cerr << "Erasing local file " << filePath.string() << endl; 00056 fs::remove(filePath); 00057 } 00058 00059 size_t 00060 CloudSink:: 00061 write(const char * data, size_t size) 00062 { 00063 //cerr << "CloudSink::write was called " << endl; 00064 fileStream.write(data, size); 00065 cloudStream.write(data, size); 00066 return size ; 00067 } 00068 00069 size_t 00070 CloudSink:: 00071 flush(FileFlushLevel flushLevel) 00072 { 00073 return 0; 00074 } 00075 std::shared_ptr<CompressingOutput::Sink> 00076 CloudOutput::createSink(const string & uri, bool append) 00077 { 00078 //cerr << "CloudOutput::createSink was called with uri " << uri << endl; 00079 return make_shared<CloudSink>(uri, append); 00080 } 00081 00082 RotatingCloudOutput::RotatingCloudOutput() 00083 : RotatingOutputAdaptor(std::bind(&RotatingCloudOutput::createFile, 00084 this, 00085 std::placeholders::_1)) 00086 00087 { 00088 00089 } 00090 00091 00092 void 00093 RotatingCloudOutput:: 00094 open(const std::string & filenamePattern, 00095 const std::string & periodPattern, 00096 const std::string & compression, 00097 int level) 00098 { 00099 this->compression = compression; 00100 this->level = level; 00101 00102 RotatingOutputAdaptor::open(filenamePattern, periodPattern); 00103 } 00104 00105 RotatingCloudOutput:: 00106 ~RotatingCloudOutput() 00107 { 00108 close(); 00109 } 00110 00111 CloudOutput * 00112 RotatingCloudOutput:: 00113 createFile(const string & filename) 00114 { 00115 //cerr << "RotatingCloudOutput::createFile. Entering..." << endl; 00116 std::unique_ptr<CloudOutput> result(new CloudOutput()); 00117 00118 result->onPreFileOpen = [=] (const string & fn) 00119 { 00120 if (this->onPreFileOpen) 00121 { 00122 this->onPreFileOpen(fn); 00123 } 00124 }; 00125 result->onPostFileOpen = [=] (const string & fn) 00126 { if (this->onPostFileOpen) this->onPostFileOpen(fn); }; 00127 result->onPreFileClose = [=] (const string & fn) 00128 { if (this->onPreFileClose) this->onPreFileClose(fn); }; 00129 result->onPostFileClose = [=] (const string & fn) 00130 { if (this->onPostFileClose) this->onPostFileClose(fn); }; 00131 result->onFileWrite = [=] (const string& channel, const std::size_t bytes) 00132 { if (this->onFileWrite) this->onFileWrite(channel, bytes); }; 00133 00134 result->open(filename, compression, level); 00135 return result.release(); 00136 } 00137 00138 00139 /*****************************************************************************/ 00140 /* CLOUD OUTPUT */ 00141 /*****************************************************************************/ 00142 00143 CloudOutput:: 00144 CloudOutput(const std::string & uri, 00145 size_t ringBufferSize) 00146 : NamedOutput(ringBufferSize) 00147 { 00148 } 00149 00150 CloudOutput:: 00151 ~CloudOutput() 00152 { 00153 } 00154 00155 } // namespace Datacratic
1.7.6.1