RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
testing/json_listener.cc
00001 /* json_listener.cc
00002    Wolfgang Sourdeau, March 2013
00003    Copyright (c) 2013 Datacratic.  All rights reserved.
00004 
00005    A utility that listens to JSON requests on a specific port and logs them to
00006    disk
00007  */
00008 
00009 
00010 #include <stdio.h>
00011 
00012 #include <boost/program_options/cmdline.hpp>
00013 #include <boost/program_options/options_description.hpp>
00014 #include <boost/program_options/positional_options.hpp>
00015 #include <boost/program_options/parsers.hpp>
00016 #include <boost/program_options/variables_map.hpp>
00017 
00018 #include <jml/utils/ring_buffer.h>
00019 
00020 #include <soa/service/rest_service_endpoint.h>
00021 
00022 
00023 using namespace std;
00024 using namespace boost::program_options;
00025 using namespace Datacratic;
00026 
00027 const int BufferSizePower = 4;
00028 
00029 const int BufferSize = (1 << BufferSizePower);
00030 const int BufferMask = BufferSize - 1;
00031 
00032 struct JsonListener : public RestServiceEndpoint {
00033     JsonListener(const std::shared_ptr<zmq::context_t> & context)
00034         : RestServiceEndpoint(context),
00035           logFile_(NULL),
00036           history_(10), position_(0)
00037     {
00038         history_.resize(BufferSize);
00039     }
00040 
00041     ~JsonListener()
00042     {
00043         shutdown();
00044     }
00045 
00046     void init(std::shared_ptr<ConfigurationService> config,
00047               const std::string & endpointName,
00048               const std::string & filename)
00049     {
00050         logRequest = bind(&JsonListener::doLogRequest,
00051                           this, placeholders::_1, placeholders::_2);
00052 
00053         if (!logFile_) {
00054             logFile_ = fopen(filename.c_str(), "w");
00055             if (!logFile_)
00056                 throw ML::Exception(errno,
00057                                     "could not open log file '" + filename
00058                                     + "'", "init");
00059         }
00060         RestServiceEndpoint::init(config, endpointName);
00061 
00062         addPeriodic("JsonListener::updatePosition", 1.0,
00063                     bind(&JsonListener::updatePosition, this),
00064                     true /* single threaded */);
00065     }
00066 
00067     void startSync()
00068     {
00069         startTime_ = Date::now();
00070         MessageLoop::startSync();
00071     }
00072 
00073     bool isStatsRequest(const RestRequest & req)
00074         const
00075     {
00076         return (req.verb == "GET" && req.resource == "/stats");
00077     }
00078 
00079     void updatePosition()
00080     {
00081         uint32_t newPosition((position_ + 1) & BufferMask);
00082         uint32_t & counter_ = history_[newPosition];
00083         counter_ = 0;
00084         position_ = newPosition;
00085         fflush (logFile_);
00086     }
00087 
00088     void doLogRequest(const ConnectionId & conn, const RestRequest & req)
00089     {
00090         if (!isStatsRequest(req)) {
00091             Guard lock(historyLock);
00092 
00093             /* request accounting */
00094             uint32_t & counter_ = history_[position_];
00095             counter_++;
00096 
00097             Date now = Date::now();
00098             fprintf (logFile_, "%f\n%s\n",
00099                      now.secondsSinceEpoch(), req.payload.c_str());
00100         }
00101     }
00102 
00103     void handleRequest(const ConnectionId & conn, const RestRequest & req)
00104         const
00105     {
00106         Json::Value nothing;
00107 
00108         if (isStatsRequest(req)) {
00109             uint32_t totalReq(0);
00110             float meanReqPerSec;
00111 
00112             for (const uint32_t & reqPerSec: history_) {
00113                 totalReq += reqPerSec;
00114             }
00115             meanReqPerSec = float(totalReq) / BufferSize;
00116 
00117             string response("total (" + to_string(BufferSize) + " secs):"
00118                             + to_string(totalReq) + "\n"
00119                             + "mean: " + to_string(meanReqPerSec) + " req./sec\n");
00120 
00121             conn.sendResponse(200, response.c_str(), "text/plain");
00122         }
00123         else {
00124             conn.sendResponse(204, "");
00125         }
00126     }
00127 
00128     void shutdown()
00129     {
00130         if (logFile_) {
00131             fflush(logFile_);
00132             fclose(logFile_);
00133             logFile_ = NULL;
00134         }
00135     }
00136 
00137     Date startTime_;
00138 
00139     std::FILE *logFile_;
00140 
00141     /* request accounting */
00142     typedef unique_lock<mutex> Guard;
00143     mutable mutex historyLock;
00144 
00145     vector<uint32_t> history_;
00146     int position_;
00147 };
00148 
00149 
00150 int main(int argc, char *argv[])
00151 {
00152     int port(0);
00153     string filename;
00154 
00155     {
00156         using namespace boost::program_options;
00157 
00158         options_description configuration_options("Configuration options");
00159 
00160         configuration_options.add_options()
00161             ("port,p", value(&port),
00162              "port to listen on")
00163             ("filename,f", value(&filename),
00164              "filename");
00165  
00166         options_description all_opt;
00167         all_opt.add(configuration_options);
00168         all_opt.add_options()
00169             ("help,h", "print this message");
00170 
00171         variables_map vm;
00172         store(command_line_parser(argc, argv)
00173               .options(all_opt)
00174               .run(),
00175               vm);
00176         notify(vm);
00177 
00178         if (vm.count("help")) {
00179             cerr << all_opt << endl;
00180             exit(1);
00181         }
00182 
00183         if (!port) {
00184             cerr << "'port' parameter is required and must be > 0" << endl;
00185             exit(1);
00186         }
00187 
00188         if (filename.empty()) {
00189             cerr << "'filename' parameter is required" << endl;
00190             exit(1);
00191         }
00192     }
00193 
00194     auto proxies = make_shared<ServiceProxies>();
00195 
00196     JsonListener listener(proxies->zmqContext);
00197     listener.init(proxies->config, "listener", filename);
00198     listener.bindFixedHttpAddress("*", port);
00199 
00200     listener.startSync();
00201 
00202     return 0;
00203 }
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator