RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* router_runner.cc 00002 Jeremy Barnes, 13 December 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Tool to run the router. 00006 */ 00007 00008 #include "router_runner.h" 00009 00010 #include <boost/program_options/cmdline.hpp> 00011 #include <boost/program_options/options_description.hpp> 00012 #include <boost/program_options/positional_options.hpp> 00013 #include <boost/program_options/parsers.hpp> 00014 #include <boost/program_options/variables_map.hpp> 00015 #include <boost/algorithm/string.hpp> 00016 #include <boost/thread/thread.hpp> 00017 00018 #include "rtbkit/core/router/router.h" 00019 #include "rtbkit/core/banker/slave_banker.h" 00020 #include "jml/arch/timers.h" 00021 #include "jml/utils/file_functions.h" 00022 00023 using namespace std; 00024 using namespace ML; 00025 using namespace Datacratic; 00026 using namespace RTBKIT; 00027 00028 static inline Json::Value loadJsonFromFile(const std::string & filename) 00029 { 00030 ML::File_Read_Buffer buf(filename); 00031 return Json::parse(std::string(buf.start(), buf.end())); 00032 } 00033 00034 00035 /*****************************************************************************/ 00036 /* ROUTER RUNNER */ 00037 /*****************************************************************************/ 00038 00039 00040 RouterRunner:: 00041 RouterRunner() 00042 : lossSeconds(15.0) 00043 { 00044 } 00045 00046 void 00047 RouterRunner:: 00048 doOptions(int argc, char ** argv, 00049 const boost::program_options::options_description & opts) 00050 { 00051 using namespace boost::program_options; 00052 00053 options_description router_options("Router options"); 00054 router_options.add_options() 00055 ("loss-seconds,l", value<float>(&lossSeconds), 00056 "number of seconds after which a loss is assumed") 00057 ("log-uri", value<vector<string> >(&logUris), 00058 "URI to publish logs to") 00059 ("exchange-configuration,x", value<string>(&exchangeConfigurationFile), 00060 "configuration file with exchange data"); 00061 00062 options_description all_opt = opts; 00063 all_opt 00064 .add(serviceArgs.makeProgramOptions()) 00065 .add(router_options); 00066 all_opt.add_options() 00067 ("help,h", "print this message"); 00068 00069 variables_map vm; 00070 store(command_line_parser(argc, argv) 00071 .options(all_opt) 00072 //.positional(p) 00073 .run(), 00074 vm); 00075 notify(vm); 00076 00077 if (vm.count("help")) { 00078 cerr << all_opt << endl; 00079 exit(1); 00080 } 00081 } 00082 00083 void 00084 RouterRunner:: 00085 init() 00086 { 00087 auto proxies = serviceArgs.makeServiceProxies(); 00088 00089 string servicePrefix("router"); 00090 banker = std::make_shared<SlaveBanker>(proxies->zmqContext, 00091 proxies->config, 00092 servicePrefix + ".slaveBanker"); 00093 00094 exchangeConfig = loadJsonFromFile(exchangeConfigurationFile); 00095 00096 router = std::make_shared<Router>(proxies, servicePrefix); 00097 router->init(); 00098 router->setBanker(banker); 00099 router->bindTcp(); 00100 } 00101 00102 void 00103 RouterRunner:: 00104 start() 00105 { 00106 banker->start(); 00107 router->start(); 00108 00109 // Start all exchanges 00110 for (auto & exchange: exchangeConfig) 00111 router->startExchange(exchange); 00112 } 00113 00114 void 00115 RouterRunner:: 00116 shutdown() 00117 { 00118 router->shutdown(); 00119 banker->shutdown(); 00120 }