RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/cloud_output.h
00001 /* cloud_output.h                                                  -*- C++ -*-
00002    Jeremy Barnes, 18 September 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    Output that puts files into... the CLOUD!
00006 */
00007 
00008 #include "file_output.h"
00009 
00010 
00011 #include "logger.h"
00012 #include "jml/arch/timers.h"
00013 #include "soa/types/date.h"
00014 #include <boost/thread/recursive_mutex.hpp>
00015 #include "soa/types/periodic_utils.h"
00016 #include "compressor.h"
00017 
00018 
00019 namespace Datacratic {
00020 
00021 
00022 /*****************************************************************************/
00023 /* CLOUD SINK                                                                 */
00024 /*****************************************************************************/
00025 
/* Sink for a CompressingOutput (it derives from CompressingOutput::Sink)
 * used by the cloud outputs declared in this header.
 *
 * Only the interface is declared here; the method bodies live in the
 * implementation file.  The sink holds two streams (cloudStream and
 * fileStream); see the note on fileStream for the role of the local file.
 */
00028 struct CloudSink : public CompressingOutput::Sink {
          /* Construct a sink.  All three arguments are optional: uri defaults
           * to "", append and disambiguate default to true.
           * NOTE(review): presumably a non-empty uri opens the sink
           * immediately via open() -- confirm in the implementation.
           */
00029     CloudSink(const std::string & uri = "",
00030               bool append = true,
00031               bool disambiguate = true);
00032 
00033     virtual ~CloudSink();
00034 
          /* Open the sink on the given uri.  Takes the same three values as
           * the constructor, but without defaults.
           * NOTE(review): the exact meaning of append and disambiguate is
           * not visible from this header -- confirm in the implementation.
           */
00035     void open(const std::string & uri,
00036               bool append,
00037               bool disambiguate);
00038 
          // Close the sink.
00039     virtual void close();
00040 
          // Write size bytes starting at data.
          // NOTE(review): the size_t result is presumably the number of
          // bytes accepted -- confirm against CompressingOutput::Sink.
00041     virtual size_t write(const char * data, size_t size);
00042 
          // Flush at the requested level (FileFlushLevel is declared
          // elsewhere).  NOTE(review): meaning of the size_t result is not
          // visible here -- confirm against CompressingOutput::Sink.
00043     virtual size_t flush(FileFlushLevel flushLevel);
00044 
          // NOTE(review): presumably the uri most recently passed to
          // open() -- confirm.
00046     std::string currentUri_;
          // NOTE(review): presumably the directory holding the temporary
          // local file described below -- confirm.
00047     std::string tmpFileDir_;
00048 
          // Stream for the cloud-side copy of the output.
00050     ML::filter_ostream cloudStream;
00051     // we write to a temporary file on local disk which we delete when
00052     // the corresponding cloud stream is closed.
00053     ML::filter_ostream fileStream;
00054 
          // Disabled member kept from the original source; no file
          // descriptor is declared by this struct.
00056     //int fd;
00057 };
00058 
00059 
00060 /*****************************************************************************/
00061 /* CLOUD OUTPUT                                                              */
00062 /*****************************************************************************/
00063 
/* Output that derives from NamedOutput and supplies its own createSink().
 *
 * Only the interface is declared here; the method bodies live in the
 * implementation file.
 */
00078 struct CloudOutput : public NamedOutput {
00079 
          /* Construct the output.  uri defaults to "" and ringBufferSize to
           * 65536.
           * NOTE(review): the unit of ringBufferSize (bytes vs. entries) and
           * what an empty uri means are not visible here -- confirm against
           * NamedOutput.
           */
00080     CloudOutput(const std::string & uri = "",
00081                size_t ringBufferSize = 65536);
00082 
00083     virtual ~CloudOutput();
00084 
          /* Create the sink used to write to the given uri; the result is
           * shared-owned (std::shared_ptr<Sink>).
           * NOTE(review): presumably this implements a NamedOutput hook and
           * returns a CloudSink -- confirm in the implementation.
           */
00085     virtual std::shared_ptr<Sink>
00086     createSink(const std::string & uri, bool append);
00087 };
00088 
00089 
00090 /*****************************************************************************/
00091 /* ROTATING CLOUD OUTPUT                                                      */
00092 /*****************************************************************************/
00093 
/* Rotating variant of the cloud output: derives from RotatingOutputAdaptor
 * and is configured through open() with a uri pattern and a rotation
 * period.
 *
 * Only the interface is declared here; the method bodies live in the
 * implementation file.
 */
00096 struct RotatingCloudOutput : public RotatingOutputAdaptor {
00097 
00098     RotatingCloudOutput();
00099 
00100     virtual ~RotatingCloudOutput();
00101 
00102     /* Open the cloud for rotation. */
00103     /* uriPattern: The URI of the s3 bucket
00104      * periodPattern: The frequency of rotation, e.g. "2s" means "rotate
00105      * every 2 seconds." The accepted symbols are
00106      * x: milliseconds
00107      * s: seconds
00108      * m: minutes
00109      * h: hours
00110      * d: days
00111      * w: weeks
00112      * M: months
00113      * y: years
00114      */
          /* compression defaults to "" and level to -1.
           * NOTE(review): presumably "" selects no explicit compression and
           * -1 the compressor's default level -- confirm in the
           * implementation.
           */
00115     void open(const std::string & uriPattern,
00116               const std::string & periodPattern,
00117               const std::string & compression = "",
00118               int level = -1);
00119 
00120 private:
          // Create the output for one file name.
          // NOTE(review): a raw pointer is returned, so ownership is not
          // expressed in the type -- confirm who deletes the result.
00121     CloudOutput * createFile(const std::string & filename);
00122 
          // NOTE(review): presumably the values last passed to open() --
          // confirm.  level has no in-class initializer, so its value before
          // open() depends on the constructor, which is not visible here.
00123     std::string compression;
00124     int level;
00125 };
00126 
00127 } // namespace Datacratic
00128 
00129 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator