![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* carbon_connector.h -*- C++ -*- 00002 Jeremy Barnes, 3 August 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Class to accumulate operational stats and connect to carbon (directly). 00006 */ 00007 00008 #pragma once 00009 00010 #include "soa/service/stat_aggregator.h" 00011 #include "soa/service/stats_events.h" 00012 #include "ace/INET_Addr.h" 00013 #include "jml/stats/distribution.h" 00014 #include "soa/types/date.h" 00015 #include <unordered_map> 00016 #include <map> 00017 #include <memory> 00018 #include <thread> 00019 #include <condition_variable> 00020 #include <boost/thread.hpp> 00021 00022 00023 namespace Datacratic { 00024 00025 00026 /*****************************************************************************/ 00027 /* MULTI AGGREGATOR */ 00028 /*****************************************************************************/ 00029 00032 struct MultiAggregator { 00033 MultiAggregator(); 00034 00035 typedef std::function<void (const std::vector<StatReading> &)> 00036 OutputFn; 00037 00038 MultiAggregator(const std::string & path, 00039 const OutputFn & output = OutputFn(), 00040 double dumpInterval = 1.0, 00041 std::function<void ()> onStop 00042 = std::function<void ()>()); 00043 00044 ~MultiAggregator(); 00045 00046 void open(const std::string & path, 00047 const OutputFn & output = OutputFn(), 00048 double dumpInterval = 1.0, 00049 std::function<void ()> onStop 00050 = std::function<void ()>()); 00051 00052 OutputFn outputFn; 00053 00057 virtual void doStat(const std::vector<StatReading> & value) const; 00058 00060 void record(const std::string & stat, 00061 EventType type = ET_COUNT, 00062 float value = 1.0); 00063 00070 void recordLevel(const std::string & stat, 00071 float level); 00072 00077 void recordOccurrence(const std::string & stat); 00078 00082 void recordQuantity(const std::string & stat, 00083 float quantity); 00084 00088 void recordOutcome(const std::string & stat, 00089 float measure); 00090 00094 void dumpSync(std::ostream & stream) const; 00095 00099 void dump(); 00100 00102 void stop(); 00103 00104 protected: 00105 // Prefix to add to each stat to put it in its namespace 00106 std::string prefix; 00107 00108 // Function to call when it's stopped/shutdown 00109 std::function<void ()> onStop; 00110 00111 // Functions to implement the shutdown 00112 std::function<void ()> onPreShutdown, onPostShutdown; 00113 00114 private: 00115 // This map can only have things removed from it, never added to 00116 typedef std::map<std::string, std::shared_ptr<StatAggregator> > Stats; 00117 Stats stats; 00118 00119 // R/W mutex for reading/writing stats. Read to look up, write to 00120 // add a new stat. 00121 typedef std::mutex Lock; 00122 mutable Lock lock; 00123 00124 typedef std::unordered_map<std::string, Stats::iterator> LookupCache; 00125 00126 // Cache of lookups for each thread to avoid needing to acquire a lock 00127 // very much. 00128 boost::thread_specific_ptr<LookupCache> lookupCache; 00129 00131 void runDumpingThread(); 00132 00134 void shutdown(); 00135 00139 StatAggregator & getAggregator(const std::string & stat, 00140 StatAggregator * (*createFn) ()); 00141 00142 std::unique_ptr<std::thread> dumpingThread; 00143 00144 std::condition_variable cond; // to wake up dumping thread 00145 std::mutex m; 00146 bool doShutdown; // thread woken up to shutdown 00147 bool doDump; // thread woken up to dump 00148 00152 double dumpInterval; 00153 }; 00154 00155 00156 /*****************************************************************************/ 00157 /* CARBON CONNECTOR */ 00158 /*****************************************************************************/ 00159 00164 struct CarbonConnector : public MultiAggregator { 00165 CarbonConnector(); 00166 00167 CarbonConnector(const std::string & carbonAddr, 00168 const std::string & path, 00169 std::function<void ()> onStop 00170 = std::function<void ()>()); 00171 00172 CarbonConnector(const std::vector<std::string> & carbonAddrs, 00173 const std::string & path, 00174 std::function<void ()> onStop 00175 = std::function<void ()>()); 00176 00177 ~CarbonConnector(); 00178 00179 void open(const std::string & carbonAddr, 00180 const std::string & path, 00181 std::function<void ()> onStop 00182 = std::function<void ()>()); 00183 00184 void open(const std::vector<std::string> & carbonAddrs, 00185 const std::string & path, 00186 std::function<void ()> onStop 00187 = std::function<void ()>()); 00188 00190 virtual void doStat(const std::vector<StatReading> & value) const; 00191 00192 private: 00193 // Do our own internal shutdown 00194 void doShutdown(); 00195 00196 struct Connection { 00197 00198 Connection(const std::string & addr) 00199 : fd(-1), addr(addr), shutdown(0) 00200 { 00201 } 00202 00203 ~Connection(); 00204 00209 std::string connect(); 00210 void reconnect(); 00211 00212 void send(const std::string & message); 00213 00214 void close(); 00215 00217 void runReconnectionThread(); 00218 00219 // Networky things 00220 int fd; 00221 std::string addr; 00222 ACE_INET_Addr ip; 00223 00224 // Reconnection thread if we are trying to reconnect 00225 std::shared_ptr<std::thread> reconnectionThread; 00226 bool reconnectionThreadActive; 00227 bool reconnectionThreadJoinable; 00228 int shutdown; 00229 }; 00230 00231 std::vector<std::shared_ptr<Connection> > connections; 00232 }; 00233 00234 00235 } // namespace Datacratic