RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* stat_aggregator.cc 00002 Jeremy Banres, 3 August 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 */ 00006 00007 #include "soa/service/stat_aggregator.h" 00008 #include "ace/INET_Addr.h" 00009 #include "jml/arch/exception.h" 00010 #include "jml/arch/format.h" 00011 #include <iostream> 00012 #include "jml/arch/cmp_xchg.h" 00013 #include "jml/arch/atomic_ops.h" 00014 #include "jml/utils/floating_point.h" 00015 #include "jml/utils/smart_ptr_utils.h" 00016 #include <boost/tuple/tuple.hpp> 00017 #include <algorithm> 00018 00019 00020 using namespace std; 00021 using namespace ML; 00022 00023 namespace Datacratic { 00024 00025 00026 /*****************************************************************************/ 00027 /* COUNTER AGGREGATOR */ 00028 /*****************************************************************************/ 00029 00030 CounterAggregator:: 00031 CounterAggregator() 00032 : start(Date::now()), total(0.0), 00033 totalsBuffer() // Keep 10sec of data. 00034 { 00035 } 00036 00037 CounterAggregator:: 00038 ~CounterAggregator() 00039 { 00040 } 00041 00042 void 00043 CounterAggregator:: 00044 record(float value) 00045 { 00046 double oldval = total; 00047 00048 while (!ML::cmp_xchg(total, oldval, oldval + value)); 00049 } 00050 00051 std::pair<double, Date> 00052 CounterAggregator:: 00053 reset() 00054 { 00055 double oldval = total; 00056 00057 while (!ML::cmp_xchg(total, oldval, 0.0)); 00058 00059 Date oldStart = start; 00060 start = Date::now(); 00061 00062 return make_pair(oldval, oldStart); 00063 } 00064 00065 std::vector<StatReading> 00066 CounterAggregator:: 00067 read(const std::string & prefix) 00068 { 00069 double current; 00070 Date oldStart; 00071 00072 boost::tie(current, oldStart) = reset(); 00073 00074 if (totalsBuffer.size() >= 10) 00075 totalsBuffer.pop_front(); 00076 totalsBuffer.push_back(current); 00077 00078 // Grab the average of the last x seconds to make sure that sparse values 00079 // show up in at least one of the x second carbon window. (x = 10 for now). 00080 double value = accumulate(totalsBuffer.begin(), totalsBuffer.end(), 0.0); 00081 value /= totalsBuffer.size(); 00082 00083 return vector<StatReading>(1, StatReading(prefix, value, start)); 00084 } 00085 00086 00087 /*****************************************************************************/ 00088 /* GAUGE AGGREGATOR */ 00089 /*****************************************************************************/ 00090 00091 GaugeAggregator:: 00092 GaugeAggregator() 00093 : values(new ML::distribution<float>()) 00094 { 00095 values->reserve(100); 00096 } 00097 00098 GaugeAggregator:: 00099 ~GaugeAggregator() 00100 { 00101 delete values; 00102 } 00103 00104 void 00105 GaugeAggregator:: 00106 record(float value) 00107 { 00108 ML::distribution<float> * current = values; 00109 while ((current = values) == 0 || !cmp_xchg(values, current, 00110 (ML::distribution<float>*)0)); 00111 00112 current->push_back(value); 00113 00114 memory_barrier(); 00115 00116 values = current; 00117 } 00118 00119 std::pair<ML::distribution<float> *, Date> 00120 GaugeAggregator:: 00121 reset() 00122 { 00123 ML::distribution<float> * current = values; 00124 ML::distribution<float> * new_current = new ML::distribution<float>(); 00125 00126 // TODO: reserve memory for new_current 00127 00128 while ((current = values) == 0 || !cmp_xchg(values, current, new_current)); 00129 00130 Date oldStart = start; 00131 start = Date::now(); 00132 00133 return make_pair(current, start); 00134 } 00135 00136 std::vector<StatReading> 00137 GaugeAggregator:: 00138 read(const std::string & prefix) 00139 { 00140 ML::distribution<float> * values; 00141 Date oldStart; 00142 00143 boost::tie(values, oldStart) = reset(); 00144 00145 std::auto_ptr<ML::distribution<float> > vptr(values); 00146 00147 if (values->empty()) 00148 return vector<StatReading>(); 00149 00150 vector<StatReading> result; 00151 00152 auto addMetric = [&] (const char * name, double value) 00153 { 00154 result.push_back(StatReading(prefix + "." + name, 00155 value, start)); 00156 }; 00157 00158 auto percentile = [&] (float outOf100) -> double 00159 { 00160 int element 00161 = std::max(0, 00162 std::min<int>(values->size() - 1, 00163 outOf100 / 100.0 * values->size())); 00164 return (*values)[element]; 00165 }; 00166 00167 std::sort(values->begin(), values->end(), 00168 ML::safe_less<float>()); 00169 00170 addMetric("mean", values->mean()); 00171 addMetric("std", values->std()); 00172 addMetric("upper", values->back()); 00173 addMetric("lower", values->front()); 00174 addMetric("count", values->size()); 00175 addMetric("upper_90", percentile(90)); 00176 addMetric("upper_95", percentile(95)); 00177 addMetric("upper_98", percentile(98)); 00178 00179 return result; 00180 } 00181 00182 } // namespace Datacratic