![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* cloud_output.h -*- C++ -*- 00002 Jeremy Barnes, 18 September 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Output that puts files into... the CLOUD! 00006 */ 00007 00008 #include "file_output.h" 00009 00010 00011 #include "logger.h" 00012 #include "jml/arch/timers.h" 00013 #include "soa/types/date.h" 00014 #include <boost/thread/recursive_mutex.hpp> 00015 #include "soa/types/periodic_utils.h" 00016 #include "compressor.h" 00017 00018 00019 namespace Datacratic { 00020 00021 00022 /*****************************************************************************/ 00023 /* CLOUD SINK */ 00024 /*****************************************************************************/ 00025 00028 struct CloudSink : public CompressingOutput::Sink { 00029 CloudSink(const std::string & uri = "", 00030 bool append = true, 00031 bool disambiguate = true); 00032 00033 virtual ~CloudSink(); 00034 00035 void open(const std::string & uri, 00036 bool append, 00037 bool disambiguate); 00038 00039 virtual void close(); 00040 00041 virtual size_t write(const char * data, size_t size); 00042 00043 virtual size_t flush(FileFlushLevel flushLevel); 00044 00046 std::string currentUri_; 00047 std::string tmpFileDir_; 00048 00050 ML::filter_ostream cloudStream; 00051 // we write to a temporary file on local disk which we delete when 00052 // the corresponding cloud stream is closed. 00053 ML::filter_ostream fileStream; 00054 00056 //int fd; 00057 }; 00058 00059 00060 /*****************************************************************************/ 00061 /* CLOUD OUTPUT */ 00062 /*****************************************************************************/ 00063 00078 struct CloudOutput : public NamedOutput { 00079 00080 CloudOutput(const std::string & uri = "", 00081 size_t ringBufferSize = 65536); 00082 00083 virtual ~CloudOutput(); 00084 00085 virtual std::shared_ptr<Sink> 00086 createSink(const std::string & uri, bool append); 00087 }; 00088 00089 00090 /*****************************************************************************/ 00091 /* ROTATING CLOUD OUTPUT */ 00092 /*****************************************************************************/ 00093 00096 struct RotatingCloudOutput : public RotatingOutputAdaptor { 00097 00098 RotatingCloudOutput(); 00099 00100 virtual ~RotatingCloudOutput(); 00101 00102 /* Open the cloud for rotation. */ 00103 /* uriPattern: The URI of the s3 bucket 00104 * periodPattern: The frequency of rotation i.e "2s" means "rotate 00105 * every 2 seconds." The accepted symbols are 00106 * x: milliseconds 00107 * s: seconds 00108 * m: minutes 00109 * h: hours 00110 * d: days 00111 * w: weeks 00112 * M: months 00113 * y: years 00114 */ 00115 void open(const std::string & uriPattern, 00116 const std::string & periodPattern, 00117 const std::string & compression = "", 00118 int level = -1); 00119 00120 private: 00121 CloudOutput * createFile(const std::string & filename); 00122 00123 std::string compression; 00124 int level; 00125 }; 00126 00127 } // namespace Datacratic 00128 00129
1.7.6.1