RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/cloud_output.cc
00001 /* cloud_output.cc
00002    Jeremy Barnes, 18 September 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005 */
00006 
00007 #include "cloud_output.h"
00008 #include <memory>
00009 #include <boost/filesystem.hpp>
00010 
00011 namespace Datacratic {
00012 using namespace std;
00013 namespace fs = boost::filesystem;
00014 
00015 CloudSink::
00016 CloudSink(const std::string & uri, bool append, bool disambiguate):
00017           currentUri_(uri),tmpFileDir_("./cloudfiles/")
00018 {
00019     if (uri != "")
00020         open(uri, append, disambiguate);
00021 }
00022 
00023 CloudSink::
00024 ~CloudSink()
00025 {
00026     //cerr << "CloudSink::~CloudSink::was called with uri " << currentUri_ << endl;
00027     close();
00028 }
00029 
00030 void
00031 CloudSink::
00032 open(const std::string & uri, bool append, bool disambiguate)
00033 {
00034     cloudStream.close();
00035     cloudStream.open(uri, std::ios_base::out |
00036                           (append ? std::ios_base::app : std::ios::openmode()));
00037     // Get the file name from the s3 uri. We want to preserve the path since
00038     // if we only get the filename we could overwrite files with the same name
00039     // but in a different directory. uri format is s3://
00040     fs::path filePath(tmpFileDir_ + uri.substr(5));
00041     // Get the path and create the directories
00042     fs::create_directories(filePath.parent_path());
00043     // create the local file and directory
00044     fileStream.open(filePath.string(), std::ios_base::out |
00045             (append ? std::ios_base::app : std::ios::openmode()));
00046 }
00047 
00048 void
00049 CloudSink::
00050 close()
00051 {
00052    cloudStream.close();
00053    fileStream.close();
00054    fs::path filePath(tmpFileDir_ + currentUri_.substr(5));
00055    cerr << "Erasing local file " << filePath.string() << endl;
00056    fs::remove(filePath);
00057 }
00058 
00059 size_t
00060 CloudSink::
00061 write(const char * data, size_t size)
00062 {
00063     //cerr << "CloudSink::write was called " << endl;
00064     fileStream.write(data, size);
00065     cloudStream.write(data, size);
00066     return size ;
00067 }
00068 
00069 size_t
00070 CloudSink::
00071 flush(FileFlushLevel flushLevel)
00072 {
00073     return 0;
00074 }
00075 std::shared_ptr<CompressingOutput::Sink>
00076 CloudOutput::createSink(const string & uri, bool append)
00077 {
00078     //cerr << "CloudOutput::createSink was called with uri " << uri << endl;
00079     return make_shared<CloudSink>(uri, append);
00080 }
00081 
00082 RotatingCloudOutput::RotatingCloudOutput()
00083 : RotatingOutputAdaptor(std::bind(&RotatingCloudOutput::createFile,
00084                                       this,
00085                                       std::placeholders::_1))
00086 
00087 {
00088 
00089 }
00090 
00091 
00092 void
00093 RotatingCloudOutput::
00094 open(const std::string & filenamePattern,
00095      const std::string & periodPattern,
00096      const std::string & compression,
00097      int level)
00098 {
00099     this->compression = compression;
00100     this->level = level;
00101 
00102     RotatingOutputAdaptor::open(filenamePattern, periodPattern);
00103 }
00104 
00105 RotatingCloudOutput::
00106 ~RotatingCloudOutput()
00107 {
00108     close();
00109 }
00110 
00111 CloudOutput *
00112 RotatingCloudOutput::
00113 createFile(const string & filename)
00114 {
00115     //cerr << "RotatingCloudOutput::createFile. Entering..." << endl;
00116     std::unique_ptr<CloudOutput> result(new CloudOutput());
00117 
00118     result->onPreFileOpen = [=] (const string & fn)
00119     {
00120         if (this->onPreFileOpen)
00121         {
00122             this->onPreFileOpen(fn);
00123         }
00124     };
00125     result->onPostFileOpen = [=] (const string & fn)
00126         { if (this->onPostFileOpen) this->onPostFileOpen(fn); };
00127     result->onPreFileClose = [=] (const string & fn)
00128         { if (this->onPreFileClose) this->onPreFileClose(fn); };
00129     result->onPostFileClose = [=] (const string & fn)
00130         { if (this->onPostFileClose) this->onPostFileClose(fn); };
00131     result->onFileWrite = [=] (const string& channel, const std::size_t bytes)
00132     { if (this->onFileWrite) this->onFileWrite(channel, bytes); };
00133 
00134     result->open(filename, compression, level);
00135     return result.release();
00136 }
00137 
00138 
00139 /*****************************************************************************/
00140 /* CLOUD OUTPUT                                                              */
00141 /*****************************************************************************/
00142 
00143 CloudOutput::
00144 CloudOutput(const std::string & uri,
00145             size_t ringBufferSize)
00146     : NamedOutput(ringBufferSize)
00147 {
00148 }
00149 
00150 CloudOutput::
00151 ~CloudOutput()
00152 {
00153 }
00154 
00155 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator