RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/service_base.h
00001 /* service_base.h                                                  -*- C++ -*-
00002    Jeremy Barnes, 29 May 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005 */
00006 
00007 #ifndef __service__service_base_h__
00008 #define __service__service_base_h__
00009 
#include "stdarg.h"

#include <exception>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include <boost/shared_ptr.hpp>

#include "port_range_service.h"
#include "soa/service/stats_events.h"
#include "soa/jsoncpp/json.h"
#include "jml/compiler/compiler.h"
#include "jml/arch/exception.h"
#include "jml/arch/format.h"
#include "jml/arch/spinlock.h"
#include "jml/utils/exc_assert.h"
#include "jml/utils/unnamed_bool.h"
00027 
00028 
00029 namespace zmq {
00030 struct context_t;
00031 } // namespace zmq
00032 
00033 namespace Datacratic {
00034 
00035 
00036 class MultiAggregator;
00037 class CarbonConnector;
00038 
00039 /*****************************************************************************/
00040 /* EVENT SERVICE                                                             */
00041 /*****************************************************************************/
00042 
00043 struct EventService {
00044     virtual ~EventService()
00045     {
00046     }
00047     
00048     virtual void onEvent(const std::string & name,
00049                          const char * event,
00050                          EventType type,
00051                          float value) = 0;
00052 
00053     virtual void dump(std::ostream & stream) const
00054     {
00055     }
00056 };
00057 
00058 
00059 /*****************************************************************************/
00060 /* NULL EVENT SERVICE                                                        */
00061 /*****************************************************************************/
00062 
00063 struct NullEventService : public EventService {
00064 
00065     NullEventService();
00066     ~NullEventService();
00067     
00068     virtual void onEvent(const std::string & name,
00069                          const char * event,
00070                          EventType type,
00071                          float value);
00072 
00073     virtual void dump(std::ostream & stream) const;
00074 
00075     std::unique_ptr<MultiAggregator> stats;
00076 };
00077 
00078 
00079 /*****************************************************************************/
00080 /* CARBON EVENT SERVICE                                                      */
00081 /*****************************************************************************/
00082 
00083 struct CarbonEventService : public EventService {
00084 
00085     CarbonEventService(std::shared_ptr<CarbonConnector> conn);
00086     CarbonEventService(const std::string & connection,
00087                        const std::string & prefix = "");
00088     CarbonEventService(const std::vector<std::string> & connections,
00089                        const std::string & prefix = "");
00090 
00091     virtual void onEvent(const std::string & name,
00092                          const char * event,
00093                          EventType type,
00094                          float value);
00095 
00096     std::shared_ptr<CarbonConnector> connector;
00097 };
00098 
00099 
00100 /*****************************************************************************/
00101 /* CONFIGURATION SERVICE                                                     */
00102 /*****************************************************************************/
00103 
00104 struct ConfigurationService {
00105 
00106     virtual ~ConfigurationService()
00107     {
00108     }
00109 
00110     enum ChangeType {
00111         VALUE_CHANGED,   
00112         DELETED,         
00113         CREATED,         
00114         NEW_CHILD        
00115 
00116     };
00117     
00119     typedef std::function<void (std::string path,
00120                                 ChangeType changeType)> OnChange;
00121 
00123     struct Watch {
00124         struct Data {
00125             Data(const OnChange & onChange)
00126                 : onChange(onChange),
00127                   watchReferences(1)
00128             {
00129             }
00130 
00131             OnChange onChange;
00132             int watchReferences;  
00133         };
00134         
00135         Watch()
00136             : data(0)
00137         {
00138         }
00139 
00143         Watch(const OnChange & onChange)
00144             : data(new Data(onChange))
00145         {
00146         }
00147 
00148         Watch(const Watch & other)
00149             : data(other.data)
00150         {
00151             if (data)
00152                 ++data->watchReferences;
00153         }
00154 
00155         Watch(Watch && other)
00156             : data(other.data)
00157         {
00158             other.data = 0;
00159         }
00160 
00161         void swap(Watch & other)
00162         {
00163             using std::swap;
00164             swap(data, other.data);
00165         }
00166 
00167         Watch & operator = (const Watch & other)
00168         {
00169             Watch newMe(other);
00170             swap(newMe);
00171             return *this;
00172         }
00173 
00174         Watch & operator = (Watch && other)
00175         {
00176             Watch newMe(other);
00177             swap(newMe);
00178             return *this;
00179         }
00180 
00181         void init(const OnChange & onChange)
00182         {
00183             if (data)
00184                 throw ML::Exception("double initializing watch");
00185             data.reset(new Data(onChange));
00186         }
00187 
00188         void trigger(const std::string & path, ChangeType change)
00189         {
00190             if (!data)
00191                 throw ML::Exception("triggered unused watch");
00192             if (!data->watchReferences <= 0)
00193                 return;
00194             ExcAssert(data);
00195             data->onChange(path, change);
00196         }
00197 
00198         void disable()
00199         {
00200             if (!data)
00201                 return;
00202             data->watchReferences = 0;
00203         }
00204 
00205         std::shared_ptr<Data> * get()
00206         {
00207             if (!data)
00208                 return nullptr;
00209             return new std::shared_ptr<Data>(data);
00210         }
00211 
00212         ~Watch()
00213         {
00214             if (data) {
00215                 __sync_add_and_fetch(&data->watchReferences, -1);
00216             }
00217             // TODO: we need to unregister ourselves here so that we don't get a
00218             // callback to something that doesn't exist
00219         }
00220 
00221         JML_IMPLEMENT_OPERATOR_BOOL(data.get());
00222 
00223     private:
00224         std::shared_ptr<Data> data;
00225     };
00226 
00232     virtual void set(const std::string & key,
00233                      const Json::Value & value) = 0;
00234 
00239     virtual std::string setUnique(const std::string & key,
00240                                   const Json::Value & value) = 0;
00241 
00243     virtual Json::Value
00244     getJson(const std::string & key, Watch watch = Watch()) = 0;
00245     
00249     typedef std::function<bool (std::string key, Json::Value value)>
00250     OnEntry;
00251 
00256     virtual std::vector<std::string>
00257     getChildren(const std::string & key,
00258                 Watch watch = Watch()) = 0;
00259 
00264     virtual bool forEachEntry(const OnEntry & onEntry,
00265                               const std::string & startPrefix = "") const = 0;
00266 
00268     virtual void removePath(const std::string & path) = 0;
00269 
00273     void dump(std::ostream & stream) const;
00274 
00277     Json::Value jsonDump() const;
00278 
00279     static std::pair<std::string, std::string>
00280     splitPath(const std::string & path);
00281 };
00282 
00283 
00284 /*****************************************************************************/
00285 /* INTERNAL CONFIGURATION SERVICE                                            */
00286 /*****************************************************************************/
00287 
00290 struct InternalConfigurationService : public ConfigurationService {
00291 
00292     virtual ~InternalConfigurationService()
00293     {
00294     }
00295 
00299     virtual Json::Value getJson(const std::string & value,
00300                                 Watch watch = Watch());
00301     
00303     virtual void set(const std::string & key,
00304                      const Json::Value & value);
00305 
00310     virtual std::string setUnique(const std::string & key,
00311                                   const Json::Value & value);
00312 
00313     virtual std::vector<std::string>
00314     getChildren(const std::string & key,
00315                 Watch watch = Watch());
00316 
00317     virtual bool forEachEntry(const OnEntry & onEntry,
00318                               const std::string & startPrefix = "") const;
00319 
00321     virtual void removePath(const std::string & path);
00322 
00323 private:
00324     struct Entry {
00325         Entry()
00326             : hasValue(false)
00327         {
00328         }
00329 
00330         bool hasValue;
00331         Json::Value value;
00332         Watch watch;
00333         std::unordered_map<std::string, std::shared_ptr<Entry> > children;
00334     };
00335 
00336     Entry & createNode(Entry & node, const std::string & key);
00337     const Entry * getNode(const Entry & node, const std::string & key) const;
00338     Entry * getNode(Entry & node, const std::string & key);
00339 
00341     Entry root;
00342     
00343     typedef std::recursive_mutex Lock;
00344     typedef std::unique_lock<Lock> Guard;
00345     mutable Lock lock;
00346 };
00347 
00348 
00349 /*****************************************************************************/
00350 /* SERVICE PROXIES                                                           */
00351 /*****************************************************************************/
00352 
00353 struct ServiceProxies {
00354     ServiceProxies();
00355 
00356     std::shared_ptr<EventService> events;
00357     std::shared_ptr<ConfigurationService> config;
00358     std::shared_ptr<PortRangeService> ports;
00359 
00361     std::shared_ptr<zmq::context_t> zmqContext;
00362 
00363     void logToCarbon(std::shared_ptr<CarbonConnector> conn);
00364     void logToCarbon(const std::string & carbonConnection,
00365                      const std::string & prefix = "");
00366     void logToCarbon(const std::vector<std::string> & carbonConnections,
00367                      const std::string & prefix = "");
00368 
00369     void useZookeeper(std::string hostname = "localhost:2181",
00370                       std::string prefix = "CWD");
00371 
00372     void usePortRanges(const std::string& path);
00373     void usePortRanges(const Json::Value& config);
00374 
00375     std::vector<std::string>
00376     getServiceClassInstances(std::string const & name,
00377                              std::string const & protocol = "http");
00378 
00379     std::vector<std::string>
00380     getEndpointInstances(std::string const & name,
00381                          std::string const & protocol = "http");
00382 
00383     // Bootstrap the proxies services using a json configuration.
00384     void bootstrap(const std::string& path);
00385     void bootstrap(const Json::Value& config);
00386 };
00387 
00388 
00389 /*****************************************************************************/
00390 /* EVENT RECORDER                                                            */
00391 /*****************************************************************************/
00392 
00395 struct EventRecorder {
00396 
00397     EventRecorder(const std::string & eventPrefix,
00398                   const std::shared_ptr<EventService> & events);
00399 
00400     EventRecorder(const std::string & eventPrefix,
00401                   const std::shared_ptr<ServiceProxies> & services);
00402 
00403 
00404     /*************************************************************************/
00405     /* EVENT RECORDING                                                       */
00406     /*************************************************************************/
00407 
00416     void recordEvent(const char * eventName,
00417                      EventType type = ET_COUNT,
00418                      float value = 1.0) const
00419     {
00420         EventService * es = 0;
00421         if (events_)
00422             es = events_.get();
00423         if (!es && services_)
00424             es = services_->events.get();
00425         if (!es)
00426         {
00427             std::cerr << "no services configured!!!!" << std::endl;
00428             return;
00429         }
00430         es->onEvent(eventPrefix_, eventName, type, value);
00431     }
00432 
00433     void recordEventFmt(EventType type,
00434                         float value,
00435                         const char * fmt, ...) const JML_FORMAT_STRING(4, 5);
00436 
00437     template<typename... Args>
00438     void recordHit(const std::string & event, Args... args) const
00439     {
00440         return recordEventFmt(ET_COUNT, 1.0, event.c_str(),
00441                               ML::forwardForPrintf(args)...);
00442     }
00443 
00444     template<typename... Args>
00445     JML_ALWAYS_INLINE
00446     void recordHit(const char * event, Args... args) const
00447     {
00448         return recordEventFmt(ET_COUNT, 1.0, event,
00449                               ML::forwardForPrintf(args)...);
00450     }
00451 
00452     void recordHit(const char * event) const
00453     {
00454         recordEvent(event, ET_COUNT, 1.0);
00455     }
00456 
00457     void recordHit(const std::string & event) const
00458     {
00459         recordEvent(event.c_str(), ET_COUNT, 1.0);
00460     }
00461 
00462     template<typename... Args>
00463     void recordCount(float count, const std::string & event, Args... args) const
00464     {
00465         return recordEventFmt(ET_COUNT, count, event.c_str(),
00466                               ML::forwardForPrintf(args)...);
00467     }
00468     
00469     template<typename... Args>
00470     JML_ALWAYS_INLINE
00471     void recordCount(float count, const char * event, Args... args) const
00472     {
00473         return recordEventFmt(ET_COUNT, count, event,
00474                               ML::forwardForPrintf(args)...);
00475     }
00476 
00477     void recordCount(float count, const char * event) const
00478     {
00479         recordEvent(event, ET_COUNT, count);
00480     }
00481 
00482     void recordCount(float count, const std::string & event) const
00483     {
00484         recordEvent(event.c_str(), ET_COUNT, count);
00485     }
00486 
00487     template<typename... Args>
00488     void recordOutcome(float outcome, const std::string & event, Args... args) const
00489     {
00490         return recordEventmt(ET_OUTCOME, outcome, event.c_str(),
00491                              ML::forwardForPrintf(args)...);
00492     }
00493     
00494     template<typename... Args>
00495     void recordOutcome(float outcome, const char * event, Args... args) const
00496     {
00497         return recordEventFmt(ET_OUTCOME, outcome, event,
00498                               ML::forwardForPrintf(args)...);
00499     }
00500 
00501     void recordOutcome(float outcome, const char * event) const
00502     {
00503         recordEvent(event, ET_OUTCOME, outcome);
00504     }
00505 
00506     void recordOutcome(float outcome, const std::string & event) const
00507     {
00508         recordEvent(event.c_str(), ET_OUTCOME, outcome);
00509     }
00510     
00511     template<typename... Args>
00512     void recordLevel(float level, const std::string & event, Args... args) const
00513     {
00514         return recordEventmt(ET_LEVEL, level, event.c_str(),
00515                              ML::forwardForPrintf(args)...);
00516     }
00517     
00518     template<typename... Args>
00519     void recordLevel(float level, const char * event, Args... args) const
00520     {
00521         return recordEventFmt(ET_LEVEL, level, event,
00522                               ML::forwardForPrintf(args)...);
00523     }
00524 
00525     void recordLevel(float level, const char * event) const
00526     {
00527         recordEvent(event, ET_LEVEL, level);
00528     }
00529 
00530     void recordLevel(float level, const std::string & event) const
00531     {
00532         recordEvent(event.c_str(), ET_LEVEL, level);
00533     }
00534 
00535 protected:
00536     std::string eventPrefix_;
00537     std::shared_ptr<EventService> events_;
00538     std::shared_ptr<ServiceProxies> services_;
00539 };
00540 
00541 
00542 /*****************************************************************************/
00543 /* SERVICE BASE                                                              */
00544 /*****************************************************************************/
00545 
00546 struct ServiceBase: public EventRecorder {
00548     ServiceBase(const std::string & serviceName,
00549                 std::shared_ptr<ServiceProxies>
00550                     = std::shared_ptr<ServiceProxies>());
00551 
00553     ServiceBase(const std::string & subServiceName,
00554                 ServiceBase & parent);
00555 
00556     virtual ~ServiceBase();
00557 
00558     void setServices(std::shared_ptr<ServiceProxies> services)
00559     {
00560         services_ = services;
00561     }
00562 
00563     std::shared_ptr<ServiceProxies> getServices() const
00564     {
00565         return services_;
00566     }
00567 
00568     std::string serviceName() const
00569     {
00570         return serviceName_;
00571     }
00572 
00573     /*************************************************************************/
00574     /* REGISTRATION                                                          */
00575     /*************************************************************************/
00576 
00584     void registerServiceProvider(const std::string & name,
00585                                  const std::vector<std::string> & serviceClasses);
00586 
00589     void unregisterServiceProvider(const std::string & name,
00590                                    const std::vector<std::string> & serviceClasses);
00591 
00592 
00593     /*************************************************************************/
00594     /* ZEROMQ CONTEXT                                                        */
00595     /*************************************************************************/
00596 
00597     std::shared_ptr<zmq::context_t> getZmqContext() const
00598     {
00599         return services_->zmqContext;
00600     }
00601 
00602     /*************************************************************************/
00603     /* EXCEPTION LOGGING                                                     */
00604     /*************************************************************************/
00605 
00606     void logException(std::exception_ptr exc,
00607                       const std::string & context)
00608     {
00609         std::cerr << "error: exception in context " << context << std::endl;
00610         std::rethrow_exception(exc);
00611     }
00612 
00613 
00614     /*************************************************************************/
00615     /* STATUS                                                                */
00616     /*************************************************************************/
00617     
00622     virtual Json::Value getServiceStatus() const;
00623 
00629     virtual void addChildServiceStatus(Json::Value & result) const;
00630 
00631 protected:
00632     std::shared_ptr<ServiceProxies> services_;
00633     std::string serviceName_;
00634     ServiceBase * parent_;
00635     std::vector<ServiceBase *> children_;
00636 };
00637 
00638 
00639 /*****************************************************************************/
00640 /* SUB SERVICE BASE                                                          */
00641 /*****************************************************************************/
00642 
00643 struct SubServiceBase : public ServiceBase {
00644 };
00645 
00646 } // namespace Datacratic
00647 
00648 
00649 #endif /* __service__service_base_h__ */
00650    
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator