RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/s3tee.cc
00001 
00009 #include "soa/service/s3.h"
00010 #include "jml/utils/filter_streams.h"
00011 #include <boost/program_options/cmdline.hpp>
00012 #include <boost/program_options/options_description.hpp>
00013 #include <boost/program_options/positional_options.hpp> 
00014 #include <boost/program_options/parsers.hpp>
00015 #include <boost/program_options/variables_map.hpp>
00016 
00017 #include <poll.h>
00018 
00019 namespace po = boost::program_options;
00020 
00021 using namespace std;
00022 using namespace Datacratic;
00023 using namespace ML;
00024 
00025 int main(int argc, char* argv[])
00026 {
00027     ios::sync_with_stdio(true);
00028 
00029     vector<string> outputFiles;
00030     string localFile;
00031     string s3KeyId;
00032     string s3Key;
00033     
00034     po::options_description desc("Main options");
00035     desc.add_options()
00036         ("output-uri,o", po::value(&outputFiles), "Output files/uris (can have multiple file/s3://bucket/object)")
00037         ("s3-key-id,I", po::value<string>(&s3KeyId)->required(), "S3 key id")
00038         ("s3-key,K", po::value<string>(&s3Key)->required(), "S3 key")
00039         ("help,h", "Produce help message");
00040     
00041     po::positional_options_description pos;
00042     pos.add("output-uri", -1);
00043     po::variables_map vm;
00044     bool showHelp = false;
00045 
00046     try{
00047         po::parsed_options parsed = po::command_line_parser(argc, argv)
00048             .options(desc)
00049             .positional(pos)
00050             .run();
00051         po::store(parsed, vm);
00052         po::notify(vm);
00053     }catch(const std::exception & exc){
00054         //invalid command line param
00055         cerr << "command line parsing error: " << exc.what() << endl;
00056         showHelp = true;
00057     }
00058 
00059     //If one of the options is set to 'help'...
00060     if (showHelp || vm.count("help")){
00061         //Display the options_description
00062         cout << desc << "\n";
00063         return showHelp ? 1 : 0;
00064     }
00065 
00066     registerS3Buckets(s3KeyId, s3Key);
00067 
00068     std::vector<filter_ostream> streams;
00069     streams.reserve(outputFiles.size() + 1);
00070 
00071     streams.emplace_back("-");
00072 
00073     for (auto f: outputFiles)
00074         streams.emplace_back(f);
00075 
00076     size_t bufSize = 4096 * 16;
00077     char buf[bufSize];
00078 
00079     for (;;) {
00080         ssize_t res = read(0, buf, bufSize);
00081         if (res == 0)
00082             break;
00083         if (res == -1 && errno == EINTR)
00084             continue;
00085         if (res == -1)
00086             throw ML::Exception(errno, "read");
00087         for (unsigned s = 0;  s < streams.size();  ++s)
00088             streams[s].write(buf, res);
00089         streams[0] << std::flush;
00090     }
00091 
00092     return 0;
00093 }
00094 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator