RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* json_feeder.cc 00002 Wolfgang Sourdeau, March 2013 00003 Copyright (c) 2013 Datacratic. All rights reserved. 00004 00005 A utility that listens to JSON requests on a specific port and logs them to 00006 disk 00007 */ 00008 00009 00010 #include <stdio.h> 00011 00012 #include <boost/program_options/cmdline.hpp> 00013 #include <boost/program_options/options_description.hpp> 00014 #include <boost/program_options/positional_options.hpp> 00015 #include <boost/program_options/parsers.hpp> 00016 #include <boost/program_options/variables_map.hpp> 00017 00018 #include <jml/utils/ring_buffer.h> 00019 00020 #include <soa/service/rest_service_endpoint.h> 00021 00022 00023 using namespace std; 00024 using namespace boost::program_options; 00025 using namespace Datacratic; 00026 00027 const int BufferSizePower = 4; 00028 00029 const int BufferSize = (1 << BufferSizePower); 00030 const int BufferMask = BufferSize - 1; 00031 00032 struct JsonListener : public RestServiceEndpoint { 00033 JsonListener(const std::shared_ptr<zmq::context_t> & context) 00034 : RestServiceEndpoint(context), 00035 logFile_(NULL), 00036 history_(10), position_(0) 00037 { 00038 history_.resize(BufferSize); 00039 } 00040 00041 ~JsonListener() 00042 { 00043 shutdown(); 00044 } 00045 00046 void init(std::shared_ptr<ConfigurationService> config, 00047 const std::string & endpointName, 00048 const std::string & filename) 00049 { 00050 logRequest = bind(&JsonListener::doLogRequest, 00051 this, placeholders::_1, placeholders::_2); 00052 00053 if (!logFile_) { 00054 logFile_ = fopen(filename.c_str(), "w"); 00055 if (!logFile_) 00056 throw ML::Exception(errno, 00057 "could not open log file '" + filename 00058 + "'", "init"); 00059 } 00060 RestServiceEndpoint::init(config, endpointName); 00061 00062 addPeriodic("JsonListener::updatePosition", 1.0, 00063 bind(&JsonListener::updatePosition, this), 00064 true /* single threaded */); 00065 } 00066 00067 void startSync() 00068 { 00069 startTime_ = Date::now(); 00070 MessageLoop::startSync(); 00071 } 00072 00073 bool isStatsRequest(const RestRequest & req) 00074 const 00075 { 00076 return (req.verb == "GET" && req.resource == "/stats"); 00077 } 00078 00079 void updatePosition() 00080 { 00081 uint32_t newPosition((position_ + 1) & BufferMask); 00082 uint32_t & counter_ = history_[newPosition]; 00083 counter_ = 0; 00084 position_ = newPosition; 00085 fflush (logFile_); 00086 } 00087 00088 void doLogRequest(const ConnectionId & conn, const RestRequest & req) 00089 { 00090 if (!isStatsRequest(req)) { 00091 Guard lock(historyLock); 00092 00093 /* request accounting */ 00094 uint32_t & counter_ = history_[position_]; 00095 counter_++; 00096 00097 Date now = Date::now(); 00098 fprintf (logFile_, "%f\n%s\n", 00099 now.secondsSinceEpoch(), req.payload.c_str()); 00100 } 00101 } 00102 00103 void handleRequest(const ConnectionId & conn, const RestRequest & req) 00104 const 00105 { 00106 Json::Value nothing; 00107 00108 if (isStatsRequest(req)) { 00109 uint32_t totalReq(0); 00110 float meanReqPerSec; 00111 00112 for (const uint32_t & reqPerSec: history_) { 00113 totalReq += reqPerSec; 00114 } 00115 meanReqPerSec = float(totalReq) / BufferSize; 00116 00117 string response("total (" + to_string(BufferSize) + " secs):" 00118 + to_string(totalReq) + "\n" 00119 + "mean: " + to_string(meanReqPerSec) + " req./sec\n"); 00120 00121 conn.sendResponse(200, response.c_str(), "text/plain"); 00122 } 00123 else { 00124 conn.sendResponse(204, ""); 00125 } 00126 } 00127 00128 void shutdown() 00129 { 00130 if (logFile_) { 00131 fflush(logFile_); 00132 fclose(logFile_); 00133 logFile_ = NULL; 00134 } 00135 } 00136 00137 Date startTime_; 00138 00139 std::FILE *logFile_; 00140 00141 /* request accounting */ 00142 typedef unique_lock<mutex> Guard; 00143 mutable mutex historyLock; 00144 00145 vector<uint32_t> history_; 00146 int position_; 00147 }; 00148 00149 00150 int main(int argc, char *argv[]) 00151 { 00152 int port(0); 00153 string filename; 00154 00155 { 00156 using namespace boost::program_options; 00157 00158 options_description configuration_options("Configuration options"); 00159 00160 configuration_options.add_options() 00161 ("port,p", value(&port), 00162 "port to listen on") 00163 ("filename,f", value(&filename), 00164 "filename"); 00165 00166 options_description all_opt; 00167 all_opt.add(configuration_options); 00168 all_opt.add_options() 00169 ("help,h", "print this message"); 00170 00171 variables_map vm; 00172 store(command_line_parser(argc, argv) 00173 .options(all_opt) 00174 .run(), 00175 vm); 00176 notify(vm); 00177 00178 if (vm.count("help")) { 00179 cerr << all_opt << endl; 00180 exit(1); 00181 } 00182 00183 if (!port) { 00184 cerr << "'port' parameter is required and must be > 0" << endl; 00185 exit(1); 00186 } 00187 00188 if (filename.empty()) { 00189 cerr << "'filename' parameter is required" << endl; 00190 exit(1); 00191 } 00192 } 00193 00194 auto proxies = make_shared<ServiceProxies>(); 00195 00196 JsonListener listener(proxies->zmqContext); 00197 listener.init(proxies->config, "listener", filename); 00198 listener.bindFixedHttpAddress("*", port); 00199 00200 listener.startSync(); 00201 00202 return 0; 00203 }