RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/carbon_connector.h
00001 /* carbon_connector.h                                            -*- C++ -*-
00002    Jeremy Barnes, 3 August 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Class to accumulate operational stats and connect to carbon (directly).
00006 */
00007 
00008 #pragma once
00009 
00010 #include "soa/service/stat_aggregator.h"
00011 #include "soa/service/stats_events.h"
00012 #include "ace/INET_Addr.h"
00013 #include "jml/stats/distribution.h"
00014 #include "soa/types/date.h"
00015 #include <unordered_map>
00016 #include <map>
00017 #include <memory>
00018 #include <thread>
00019 #include <condition_variable>
00020 #include <boost/thread.hpp>
00021 
00022 
00023 namespace Datacratic {
00024 
00025 
00026 /*****************************************************************************/
00027 /* MULTI AGGREGATOR                                                          */
00028 /*****************************************************************************/
00029 
00032 struct MultiAggregator {
00033     MultiAggregator();
00034 
00035     typedef std::function<void (const std::vector<StatReading> &)>
00036         OutputFn;
00037 
00038     MultiAggregator(const std::string & path,
00039                     const OutputFn & output = OutputFn(),
00040                     double dumpInterval = 1.0,
00041                     std::function<void ()> onStop
00042                         = std::function<void ()>());
00043     
00044     ~MultiAggregator();
00045 
00046     void open(const std::string & path,
00047               const OutputFn & output = OutputFn(),
00048               double dumpInterval = 1.0,
00049               std::function<void ()> onStop
00050                   = std::function<void ()>());
00051 
00052     OutputFn outputFn;
00053 
00057     virtual void doStat(const std::vector<StatReading> & value) const;
00058 
00060     void record(const std::string & stat,
00061                 EventType type = ET_COUNT,
00062                 float value = 1.0);
00063 
00070     void recordLevel(const std::string & stat,
00071                      float level);
00072 
00077     void recordOccurrence(const std::string & stat);
00078 
00082     void recordQuantity(const std::string & stat,
00083                         float quantity);
00084     
00088     void recordOutcome(const std::string & stat,
00089                        float measure);
00090 
00094     void dumpSync(std::ostream & stream) const;
00095 
00099     void dump();
00100 
00102     void stop();
00103     
00104 protected:
00105     // Prefix to add to each stat to put it in its namespace
00106     std::string prefix;
00107 
00108     // Function to call when it's stopped/shutdown
00109     std::function<void ()> onStop;
00110 
00111     // Functions to implement the shutdown
00112     std::function<void ()> onPreShutdown, onPostShutdown;
00113 
00114 private:
00115     // This map can only have things removed from it, never added to
00116     typedef std::map<std::string, std::shared_ptr<StatAggregator> > Stats;
00117     Stats stats;
00118 
00119     // R/W mutex for reading/writing stats.  Read to look up, write to
00120     // add a new stat.
00121     typedef std::mutex Lock;
00122     mutable Lock lock;
00123 
00124     typedef std::unordered_map<std::string, Stats::iterator> LookupCache;
00125 
00126     // Cache of lookups for each thread to avoid needing to acquire a lock
00127     // very much.
00128     boost::thread_specific_ptr<LookupCache> lookupCache;
00129 
00131     void runDumpingThread();
00132 
00134     void shutdown();
00135 
00139     StatAggregator & getAggregator(const std::string & stat,
00140                                    StatAggregator * (*createFn) ());
00141     
00142     std::unique_ptr<std::thread> dumpingThread;
00143 
00144     std::condition_variable cond;  // to wake up dumping thread
00145     std::mutex m;
00146     bool doShutdown;                 // thread woken up to shutdown
00147     bool doDump;                     // thread woken up to dump
00148 
00152     double dumpInterval;
00153 };
00154 
00155 
00156 /*****************************************************************************/
00157 /* CARBON CONNECTOR                                                        */
00158 /*****************************************************************************/
00159 
00164 struct CarbonConnector : public MultiAggregator {
00165     CarbonConnector();
00166 
00167     CarbonConnector(const std::string & carbonAddr,
00168                     const std::string & path,
00169                     std::function<void ()> onStop
00170                         = std::function<void ()>());
00171 
00172     CarbonConnector(const std::vector<std::string> & carbonAddrs,
00173                     const std::string & path,
00174                     std::function<void ()> onStop
00175                         = std::function<void ()>());
00176 
00177     ~CarbonConnector();
00178 
00179     void open(const std::string & carbonAddr,
00180               const std::string & path,
00181               std::function<void ()> onStop
00182                   = std::function<void ()>());
00183 
00184     void open(const std::vector<std::string> & carbonAddrs,
00185               const std::string & path,
00186               std::function<void ()> onStop
00187                   = std::function<void ()>());
00188 
00190     virtual void doStat(const std::vector<StatReading> & value) const;
00191 
00192 private:
00193     // Do our own internal shutdown
00194     void doShutdown();
00195 
00196     struct Connection {
00197 
00198         Connection(const std::string & addr)
00199             : fd(-1), addr(addr), shutdown(0)
00200         {
00201         }
00202 
00203         ~Connection();
00204 
00209         std::string connect();
00210         void reconnect();
00211 
00212         void send(const std::string & message);
00213 
00214         void close();
00215 
00217         void runReconnectionThread();
00218 
00219         // Networky things
00220         int fd;
00221         std::string addr;
00222         ACE_INET_Addr ip;
00223 
00224         // Reconnection thread if we are trying to reconnect
00225         std::shared_ptr<std::thread> reconnectionThread;
00226         bool reconnectionThreadActive;
00227         bool reconnectionThreadJoinable;
00228         int shutdown;
00229     };
00230 
00231     std::vector<std::shared_ptr<Connection> > connections;
00232 };
00233 
00234 
00235 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator