![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 00009 #include "soa/service/s3.h" 00010 #include "jml/utils/filter_streams.h" 00011 #include <boost/program_options/cmdline.hpp> 00012 #include <boost/program_options/options_description.hpp> 00013 #include <boost/program_options/positional_options.hpp> 00014 #include <boost/program_options/parsers.hpp> 00015 #include <boost/program_options/variables_map.hpp> 00016 00017 #include <poll.h> 00018 00019 namespace po = boost::program_options; 00020 00021 using namespace std; 00022 using namespace Datacratic; 00023 using namespace ML; 00024 00025 int main(int argc, char* argv[]) 00026 { 00027 ios::sync_with_stdio(true); 00028 00029 vector<string> outputFiles; 00030 string localFile; 00031 string s3KeyId; 00032 string s3Key; 00033 00034 po::options_description desc("Main options"); 00035 desc.add_options() 00036 ("output-uri,o", po::value(&outputFiles), "Output files/uris (can have multiple file/s3://bucket/object)") 00037 ("s3-key-id,I", po::value<string>(&s3KeyId)->required(), "S3 key id") 00038 ("s3-key,K", po::value<string>(&s3Key)->required(), "S3 key") 00039 ("help,h", "Produce help message"); 00040 00041 po::positional_options_description pos; 00042 pos.add("output-uri", -1); 00043 po::variables_map vm; 00044 bool showHelp = false; 00045 00046 try{ 00047 po::parsed_options parsed = po::command_line_parser(argc, argv) 00048 .options(desc) 00049 .positional(pos) 00050 .run(); 00051 po::store(parsed, vm); 00052 po::notify(vm); 00053 }catch(const std::exception & exc){ 00054 //invalid command line param 00055 cerr << "command line parsing error: " << exc.what() << endl; 00056 showHelp = true; 00057 } 00058 00059 //If one of the options is set to 'help'... 00060 if (showHelp || vm.count("help")){ 00061 //Display the options_description 00062 cout << desc << "\n"; 00063 return showHelp ? 1 : 0; 00064 } 00065 00066 registerS3Buckets(s3KeyId, s3Key); 00067 00068 std::vector<filter_ostream> streams; 00069 streams.reserve(outputFiles.size() + 1); 00070 00071 streams.emplace_back("-"); 00072 00073 for (auto f: outputFiles) 00074 streams.emplace_back(f); 00075 00076 size_t bufSize = 4096 * 16; 00077 char buf[bufSize]; 00078 00079 for (;;) { 00080 ssize_t res = read(0, buf, bufSize); 00081 if (res == 0) 00082 break; 00083 if (res == -1 && errno == EINTR) 00084 continue; 00085 if (res == -1) 00086 throw ML::Exception(errno, "read"); 00087 for (unsigned s = 0; s < streams.size(); ++s) 00088 streams[s].write(buf, res); 00089 streams[0] << std::flush; 00090 } 00091 00092 return 0; 00093 } 00094