RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/stat_aggregator.cc
00001 /* stat_aggregator.cc
00002    Jeremy Barnes, 3 August 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005 */
00006 
00007 #include "soa/service/stat_aggregator.h"
00008 #include "ace/INET_Addr.h"
00009 #include "jml/arch/exception.h"
00010 #include "jml/arch/format.h"
00011 #include <iostream>
00012 #include "jml/arch/cmp_xchg.h"
00013 #include "jml/arch/atomic_ops.h"
00014 #include "jml/utils/floating_point.h"
00015 #include "jml/utils/smart_ptr_utils.h"
00016 #include <boost/tuple/tuple.hpp>
00017 #include <algorithm>
00018 
00019 
00020 using namespace std;
00021 using namespace ML;
00022 
00023 namespace Datacratic {
00024 
00025 
00026 /*****************************************************************************/
00027 /* COUNTER AGGREGATOR                                                        */
00028 /*****************************************************************************/
00029 
00030 CounterAggregator::
00031 CounterAggregator()
00032     : start(Date::now()), total(0.0),
00033       totalsBuffer() // Keep 10sec of data.
00034 {
00035 }
00036 
00037 CounterAggregator::
00038 ~CounterAggregator()
00039 {
00040 }
00041 
00042 void
00043 CounterAggregator::
00044 record(float value)
00045 {
00046     double oldval = total;
00047 
00048     while (!ML::cmp_xchg(total, oldval, oldval + value));
00049 }
00050 
00051 std::pair<double, Date>
00052 CounterAggregator::
00053 reset()
00054 {
00055     double oldval = total;
00056 
00057     while (!ML::cmp_xchg(total, oldval, 0.0));
00058 
00059     Date oldStart = start;
00060     start = Date::now();
00061 
00062     return make_pair(oldval, oldStart);
00063 }
00064 
00065 std::vector<StatReading>
00066 CounterAggregator::
00067 read(const std::string & prefix)
00068 {
00069     double current;
00070     Date oldStart;
00071 
00072     boost::tie(current, oldStart) = reset();
00073 
00074     if (totalsBuffer.size() >= 10)
00075         totalsBuffer.pop_front();
00076     totalsBuffer.push_back(current);
00077 
00078     // Grab the average of the last x seconds to make sure that sparse values
00079     // show up in at least one of the x second carbon window. (x = 10 for now).
00080     double value = accumulate(totalsBuffer.begin(), totalsBuffer.end(), 0.0);
00081     value /= totalsBuffer.size();
00082 
00083     return vector<StatReading>(1, StatReading(prefix, value, start));
00084 }
00085 
00086 
00087 /*****************************************************************************/
00088 /* GAUGE AGGREGATOR                                                          */
00089 /*****************************************************************************/
00090 
00091 GaugeAggregator::
00092 GaugeAggregator()
00093     : values(new ML::distribution<float>())
00094 {
00095     values->reserve(100);
00096 }
00097 
00098 GaugeAggregator::
00099 ~GaugeAggregator()
00100 {
00101     delete values;
00102 }
00103 
00104 void
00105 GaugeAggregator::
00106 record(float value)
00107 {
00108     ML::distribution<float> * current = values;
00109     while ((current = values) == 0 || !cmp_xchg(values, current,
00110                                      (ML::distribution<float>*)0));
00111     
00112     current->push_back(value);
00113 
00114     memory_barrier();
00115 
00116     values = current;
00117 }
00118 
00119 std::pair<ML::distribution<float> *, Date>
00120 GaugeAggregator::
00121 reset()
00122 {
00123     ML::distribution<float> * current = values;
00124     ML::distribution<float> * new_current = new ML::distribution<float>();
00125 
00126     // TODO: reserve memory for new_current
00127 
00128     while ((current = values) == 0 || !cmp_xchg(values, current, new_current));
00129 
00130     Date oldStart = start;
00131     start = Date::now();
00132 
00133     return make_pair(current, start);
00134 }
00135 
00136 std::vector<StatReading>
00137 GaugeAggregator::
00138 read(const std::string & prefix)
00139 {
00140     ML::distribution<float> * values;
00141     Date oldStart;
00142 
00143     boost::tie(values, oldStart) = reset();
00144 
00145     std::auto_ptr<ML::distribution<float> > vptr(values);
00146 
00147     if (values->empty())
00148         return vector<StatReading>();
00149     
00150     vector<StatReading> result;
00151 
00152     auto addMetric = [&] (const char * name, double value)
00153         {
00154             result.push_back(StatReading(prefix + "." + name,
00155                                          value, start));
00156         };
00157     
00158     auto percentile = [&] (float outOf100) -> double
00159         {
00160             int element
00161                 = std::max(0,
00162                            std::min<int>(values->size() - 1,
00163                                          outOf100 / 100.0 * values->size()));
00164             return (*values)[element];
00165         };
00166     
00167     std::sort(values->begin(), values->end(),
00168               ML::safe_less<float>());
00169     
00170     addMetric("mean", values->mean());
00171     addMetric("std", values->std());
00172     addMetric("upper", values->back());
00173     addMetric("lower", values->front());
00174     addMetric("count", values->size());
00175     addMetric("upper_90", percentile(90));
00176     addMetric("upper_95", percentile(95));
00177     addMetric("upper_98", percentile(98));
00178 
00179     return result;
00180 }
00181 
00182 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator