RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/stats_output.cc
00001 /* stats_output.cc
00002    Jeremy Barnes, 8 February 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    Various outputs to write stats.
00006 */
00007 
00008 #include "stats_output.h"
00009 #include <boost/make_shared.hpp>
00010 
00011 
00012 using namespace std;
00013 using namespace ML;
00014 
00015 
00016 namespace Datacratic {
00017 
00018 
00019 /*****************************************************************************/
00020 /* CONSOLE STATS OUTPUT                                                      */
00021 /*****************************************************************************/
00022 
00023 ConsoleStatsOutput::
00024 ConsoleStatsOutput (bool debug)
00025     : debug(debug),
00026       lastSeconds(Date::now())
00027 {
00028 }
00029 
00030 void
00031 ConsoleStatsOutput::
00032 logMessage(const string & channel, const string & message)
00033 {
00034     if (debug) {
00035         cerr << channel << " => " << message << endl;
00036     }
00037 
00038     lock_guard<mutex> guard (lock);
00039 
00040     auto it = logStats.find(channel);
00041     if (it == logStats.end()) {
00042         it = logStats.insert(make_pair(channel, StatItem(channel))).first;
00043     }
00044 
00045     it->second.messages++;
00046     it->second.bytes += message.size();
00047 }
00048 
00049 void
00050 ConsoleStatsOutput::
00051 dumpStats ()
00052 {
00053     vector<StatItem> stats;
00054 
00055     {
00056         lock_guard<mutex> guard (lock);
00057         for (auto it = logStats.begin(), end = logStats.end(); it != end; ++it) {
00058             stats.push_back(it->second);
00059         }
00060         logStats.clear();
00061     }
00062 
00063     double seconds = Date::now().secondsSince(lastSeconds);
00064     sort(stats.begin(), stats.end(), [](const StatItem& lhs, const StatItem& rhs) -> bool {
00065             return lhs.bytes > rhs.bytes;
00066         });
00067 
00068     cerr << Date::now() << endl;
00069     for (auto it = stats.begin(), end = stats.end(); it != end; ++it) {
00070         uint64_t count = it->messages;
00071         double kb = it->bytes / 1024.0;
00072 
00073         cerr << ML::format("%-20s: %6lld msgs %8.2fkb\trate: %8.2f/s %8.2fkb/s",
00074                            it->channel.c_str(), (long long)count,
00075                            kb, count/seconds, kb/seconds) << endl;
00076     }
00077 
00078     lastSeconds = Date::now();
00079 }
00080 
00081 
00082 /*****************************************************************************/
00083 /* CARBON STATS OUTPUT                                                       */
00084 /*****************************************************************************/
00085 
00086 CarbonStatsOutput::
00087 CarbonStatsOutput(const string& carbonConnection,
00088                   const string& carbonPrefix)
00089     : EventRecorder("",
00090                     std::make_shared<CarbonEventService>
00091                     (carbonConnection, carbonPrefix))
00092 {
00093     recordHit("loggerUp");        
00094 }
00095 
00096 CarbonStatsOutput::
00097 CarbonStatsOutput(std::shared_ptr<EventService> events,
00098                   string prefix)
00099     : EventRecorder(prefix, events)
00100 {
00101     recordHit("loggerUp");        
00102 }
00103 
00104 void
00105 CarbonStatsOutput::
00106 logMessage(const string & channel, const string & message)
00107 {
00108     recordHit(channel);
00109 }
00110 
00111 void
00112 CarbonStatsOutput::
00113 recordBytesWrittenToFile (const string& file, size_t bytes)
00114 {
00115     static const string prefix ("bytesWrittenToDisk.");
00116     EventRecorder::recordCount(bytes, prefix + file);
00117 }
00118 
00119 void
00120 CarbonStatsOutput::
00121 recordLevel (const string& name, double val)
00122 {
00123     EventRecorder::recordLevel(val, name);
00124 }
00125 
00126 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator