![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* service_base.h -*- C++ -*- 00002 Jeremy Barnes, 29 May 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 */ 00006 00007 #ifndef __service__service_base_h__ 00008 #define __service__service_base_h__ 00009 00010 #include "port_range_service.h" 00011 #include "soa/service/stats_events.h" 00012 #include "stdarg.h" 00013 #include "jml/compiler/compiler.h" 00014 #include <string> 00015 #include <boost/shared_ptr.hpp> 00016 #include "jml/arch/exception.h" 00017 #include "jml/arch/format.h" 00018 #include "jml/arch/spinlock.h" 00019 #include <map> 00020 #include <mutex> 00021 #include "soa/jsoncpp/json.h" 00022 #include <unordered_map> 00023 #include <mutex> 00024 #include <thread> 00025 #include "jml/utils/exc_assert.h" 00026 #include "jml/utils/unnamed_bool.h" 00027 00028 00029 namespace zmq { 00030 struct context_t; 00031 } // namespace zmq 00032 00033 namespace Datacratic { 00034 00035 00036 class MultiAggregator; 00037 class CarbonConnector; 00038 00039 /*****************************************************************************/ 00040 /* EVENT SERVICE */ 00041 /*****************************************************************************/ 00042 00043 struct EventService { 00044 virtual ~EventService() 00045 { 00046 } 00047 00048 virtual void onEvent(const std::string & name, 00049 const char * event, 00050 EventType type, 00051 float value) = 0; 00052 00053 virtual void dump(std::ostream & stream) const 00054 { 00055 } 00056 }; 00057 00058 00059 /*****************************************************************************/ 00060 /* NULL EVENT SERVICE */ 00061 /*****************************************************************************/ 00062 00063 struct NullEventService : public EventService { 00064 00065 NullEventService(); 00066 ~NullEventService(); 00067 00068 virtual void onEvent(const std::string & name, 00069 const char * event, 00070 EventType type, 00071 float value); 00072 00073 virtual void dump(std::ostream & stream) const; 00074 00075 std::unique_ptr<MultiAggregator> stats; 00076 }; 00077 00078 00079 /*****************************************************************************/ 00080 /* CARBON EVENT SERVICE */ 00081 /*****************************************************************************/ 00082 00083 struct CarbonEventService : public EventService { 00084 00085 CarbonEventService(std::shared_ptr<CarbonConnector> conn); 00086 CarbonEventService(const std::string & connection, 00087 const std::string & prefix = ""); 00088 CarbonEventService(const std::vector<std::string> & connections, 00089 const std::string & prefix = ""); 00090 00091 virtual void onEvent(const std::string & name, 00092 const char * event, 00093 EventType type, 00094 float value); 00095 00096 std::shared_ptr<CarbonConnector> connector; 00097 }; 00098 00099 00100 /*****************************************************************************/ 00101 /* CONFIGURATION SERVICE */ 00102 /*****************************************************************************/ 00103 00104 struct ConfigurationService { 00105 00106 virtual ~ConfigurationService() 00107 { 00108 } 00109 00110 enum ChangeType { 00111 VALUE_CHANGED, 00112 DELETED, 00113 CREATED, 00114 NEW_CHILD 00115 00116 }; 00117 00119 typedef std::function<void (std::string path, 00120 ChangeType changeType)> OnChange; 00121 00123 struct Watch { 00124 struct Data { 00125 Data(const OnChange & onChange) 00126 : onChange(onChange), 00127 watchReferences(1) 00128 { 00129 } 00130 00131 OnChange onChange; 00132 int watchReferences; 00133 }; 00134 00135 Watch() 00136 : data(0) 00137 { 00138 } 00139 00143 Watch(const OnChange & onChange) 00144 : data(new Data(onChange)) 00145 { 00146 } 00147 00148 Watch(const Watch & other) 00149 : data(other.data) 00150 { 00151 if (data) 00152 ++data->watchReferences; 00153 } 00154 00155 Watch(Watch && other) 00156 : data(other.data) 00157 { 00158 other.data = 0; 00159 } 00160 00161 void swap(Watch & other) 00162 { 00163 using std::swap; 00164 swap(data, other.data); 00165 } 00166 00167 Watch & operator = (const Watch & other) 00168 { 00169 Watch newMe(other); 00170 swap(newMe); 00171 return *this; 00172 } 00173 00174 Watch & operator = (Watch && other) 00175 { 00176 Watch newMe(other); 00177 swap(newMe); 00178 return *this; 00179 } 00180 00181 void init(const OnChange & onChange) 00182 { 00183 if (data) 00184 throw ML::Exception("double initializing watch"); 00185 data.reset(new Data(onChange)); 00186 } 00187 00188 void trigger(const std::string & path, ChangeType change) 00189 { 00190 if (!data) 00191 throw ML::Exception("triggered unused watch"); 00192 if (!data->watchReferences <= 0) 00193 return; 00194 ExcAssert(data); 00195 data->onChange(path, change); 00196 } 00197 00198 void disable() 00199 { 00200 if (!data) 00201 return; 00202 data->watchReferences = 0; 00203 } 00204 00205 std::shared_ptr<Data> * get() 00206 { 00207 if (!data) 00208 return nullptr; 00209 return new std::shared_ptr<Data>(data); 00210 } 00211 00212 ~Watch() 00213 { 00214 if (data) { 00215 __sync_add_and_fetch(&data->watchReferences, -1); 00216 } 00217 // TODO: we need to unregister ourselves here so that we don't get a 00218 // callback to something that doesn't exist 00219 } 00220 00221 JML_IMPLEMENT_OPERATOR_BOOL(data.get()); 00222 00223 private: 00224 std::shared_ptr<Data> data; 00225 }; 00226 00232 virtual void set(const std::string & key, 00233 const Json::Value & value) = 0; 00234 00239 virtual std::string setUnique(const std::string & key, 00240 const Json::Value & value) = 0; 00241 00243 virtual Json::Value 00244 getJson(const std::string & key, Watch watch = Watch()) = 0; 00245 00249 typedef std::function<bool (std::string key, Json::Value value)> 00250 OnEntry; 00251 00256 virtual std::vector<std::string> 00257 getChildren(const std::string & key, 00258 Watch watch = Watch()) = 0; 00259 00264 virtual bool forEachEntry(const OnEntry & onEntry, 00265 const std::string & startPrefix = "") const = 0; 00266 00268 virtual void removePath(const std::string & path) = 0; 00269 00273 void dump(std::ostream & stream) const; 00274 00277 Json::Value jsonDump() const; 00278 00279 static std::pair<std::string, std::string> 00280 splitPath(const std::string & path); 00281 }; 00282 00283 00284 /*****************************************************************************/ 00285 /* INTERNAL CONFIGURATION SERVICE */ 00286 /*****************************************************************************/ 00287 00290 struct InternalConfigurationService : public ConfigurationService { 00291 00292 virtual ~InternalConfigurationService() 00293 { 00294 } 00295 00299 virtual Json::Value getJson(const std::string & value, 00300 Watch watch = Watch()); 00301 00303 virtual void set(const std::string & key, 00304 const Json::Value & value); 00305 00310 virtual std::string setUnique(const std::string & key, 00311 const Json::Value & value); 00312 00313 virtual std::vector<std::string> 00314 getChildren(const std::string & key, 00315 Watch watch = Watch()); 00316 00317 virtual bool forEachEntry(const OnEntry & onEntry, 00318 const std::string & startPrefix = "") const; 00319 00321 virtual void removePath(const std::string & path); 00322 00323 private: 00324 struct Entry { 00325 Entry() 00326 : hasValue(false) 00327 { 00328 } 00329 00330 bool hasValue; 00331 Json::Value value; 00332 Watch watch; 00333 std::unordered_map<std::string, std::shared_ptr<Entry> > children; 00334 }; 00335 00336 Entry & createNode(Entry & node, const std::string & key); 00337 const Entry * getNode(const Entry & node, const std::string & key) const; 00338 Entry * getNode(Entry & node, const std::string & key); 00339 00341 Entry root; 00342 00343 typedef std::recursive_mutex Lock; 00344 typedef std::unique_lock<Lock> Guard; 00345 mutable Lock lock; 00346 }; 00347 00348 00349 /*****************************************************************************/ 00350 /* SERVICE PROXIES */ 00351 /*****************************************************************************/ 00352 00353 struct ServiceProxies { 00354 ServiceProxies(); 00355 00356 std::shared_ptr<EventService> events; 00357 std::shared_ptr<ConfigurationService> config; 00358 std::shared_ptr<PortRangeService> ports; 00359 00361 std::shared_ptr<zmq::context_t> zmqContext; 00362 00363 void logToCarbon(std::shared_ptr<CarbonConnector> conn); 00364 void logToCarbon(const std::string & carbonConnection, 00365 const std::string & prefix = ""); 00366 void logToCarbon(const std::vector<std::string> & carbonConnections, 00367 const std::string & prefix = ""); 00368 00369 void useZookeeper(std::string hostname = "localhost:2181", 00370 std::string prefix = "CWD"); 00371 00372 void usePortRanges(const std::string& path); 00373 void usePortRanges(const Json::Value& config); 00374 00375 std::vector<std::string> 00376 getServiceClassInstances(std::string const & name, 00377 std::string const & protocol = "http"); 00378 00379 std::vector<std::string> 00380 getEndpointInstances(std::string const & name, 00381 std::string const & protocol = "http"); 00382 00383 // Bootstrap the proxies services using a json configuration. 00384 void bootstrap(const std::string& path); 00385 void bootstrap(const Json::Value& config); 00386 }; 00387 00388 00389 /*****************************************************************************/ 00390 /* EVENT RECORDER */ 00391 /*****************************************************************************/ 00392 00395 struct EventRecorder { 00396 00397 EventRecorder(const std::string & eventPrefix, 00398 const std::shared_ptr<EventService> & events); 00399 00400 EventRecorder(const std::string & eventPrefix, 00401 const std::shared_ptr<ServiceProxies> & services); 00402 00403 00404 /*************************************************************************/ 00405 /* EVENT RECORDING */ 00406 /*************************************************************************/ 00407 00416 void recordEvent(const char * eventName, 00417 EventType type = ET_COUNT, 00418 float value = 1.0) const 00419 { 00420 EventService * es = 0; 00421 if (events_) 00422 es = events_.get(); 00423 if (!es && services_) 00424 es = services_->events.get(); 00425 if (!es) 00426 { 00427 std::cerr << "no services configured!!!!" << std::endl; 00428 return; 00429 } 00430 es->onEvent(eventPrefix_, eventName, type, value); 00431 } 00432 00433 void recordEventFmt(EventType type, 00434 float value, 00435 const char * fmt, ...) const JML_FORMAT_STRING(4, 5); 00436 00437 template<typename... Args> 00438 void recordHit(const std::string & event, Args... args) const 00439 { 00440 return recordEventFmt(ET_COUNT, 1.0, event.c_str(), 00441 ML::forwardForPrintf(args)...); 00442 } 00443 00444 template<typename... Args> 00445 JML_ALWAYS_INLINE 00446 void recordHit(const char * event, Args... args) const 00447 { 00448 return recordEventFmt(ET_COUNT, 1.0, event, 00449 ML::forwardForPrintf(args)...); 00450 } 00451 00452 void recordHit(const char * event) const 00453 { 00454 recordEvent(event, ET_COUNT, 1.0); 00455 } 00456 00457 void recordHit(const std::string & event) const 00458 { 00459 recordEvent(event.c_str(), ET_COUNT, 1.0); 00460 } 00461 00462 template<typename... Args> 00463 void recordCount(float count, const std::string & event, Args... args) const 00464 { 00465 return recordEventFmt(ET_COUNT, count, event.c_str(), 00466 ML::forwardForPrintf(args)...); 00467 } 00468 00469 template<typename... Args> 00470 JML_ALWAYS_INLINE 00471 void recordCount(float count, const char * event, Args... args) const 00472 { 00473 return recordEventFmt(ET_COUNT, count, event, 00474 ML::forwardForPrintf(args)...); 00475 } 00476 00477 void recordCount(float count, const char * event) const 00478 { 00479 recordEvent(event, ET_COUNT, count); 00480 } 00481 00482 void recordCount(float count, const std::string & event) const 00483 { 00484 recordEvent(event.c_str(), ET_COUNT, count); 00485 } 00486 00487 template<typename... Args> 00488 void recordOutcome(float outcome, const std::string & event, Args... args) const 00489 { 00490 return recordEventmt(ET_OUTCOME, outcome, event.c_str(), 00491 ML::forwardForPrintf(args)...); 00492 } 00493 00494 template<typename... Args> 00495 void recordOutcome(float outcome, const char * event, Args... args) const 00496 { 00497 return recordEventFmt(ET_OUTCOME, outcome, event, 00498 ML::forwardForPrintf(args)...); 00499 } 00500 00501 void recordOutcome(float outcome, const char * event) const 00502 { 00503 recordEvent(event, ET_OUTCOME, outcome); 00504 } 00505 00506 void recordOutcome(float outcome, const std::string & event) const 00507 { 00508 recordEvent(event.c_str(), ET_OUTCOME, outcome); 00509 } 00510 00511 template<typename... Args> 00512 void recordLevel(float level, const std::string & event, Args... args) const 00513 { 00514 return recordEventmt(ET_LEVEL, level, event.c_str(), 00515 ML::forwardForPrintf(args)...); 00516 } 00517 00518 template<typename... Args> 00519 void recordLevel(float level, const char * event, Args... args) const 00520 { 00521 return recordEventFmt(ET_LEVEL, level, event, 00522 ML::forwardForPrintf(args)...); 00523 } 00524 00525 void recordLevel(float level, const char * event) const 00526 { 00527 recordEvent(event, ET_LEVEL, level); 00528 } 00529 00530 void recordLevel(float level, const std::string & event) const 00531 { 00532 recordEvent(event.c_str(), ET_LEVEL, level); 00533 } 00534 00535 protected: 00536 std::string eventPrefix_; 00537 std::shared_ptr<EventService> events_; 00538 std::shared_ptr<ServiceProxies> services_; 00539 }; 00540 00541 00542 /*****************************************************************************/ 00543 /* SERVICE BASE */ 00544 /*****************************************************************************/ 00545 00546 struct ServiceBase: public EventRecorder { 00548 ServiceBase(const std::string & serviceName, 00549 std::shared_ptr<ServiceProxies> 00550 = std::shared_ptr<ServiceProxies>()); 00551 00553 ServiceBase(const std::string & subServiceName, 00554 ServiceBase & parent); 00555 00556 virtual ~ServiceBase(); 00557 00558 void setServices(std::shared_ptr<ServiceProxies> services) 00559 { 00560 services_ = services; 00561 } 00562 00563 std::shared_ptr<ServiceProxies> getServices() const 00564 { 00565 return services_; 00566 } 00567 00568 std::string serviceName() const 00569 { 00570 return serviceName_; 00571 } 00572 00573 /*************************************************************************/ 00574 /* REGISTRATION */ 00575 /*************************************************************************/ 00576 00584 void registerServiceProvider(const std::string & name, 00585 const std::vector<std::string> & serviceClasses); 00586 00589 void unregisterServiceProvider(const std::string & name, 00590 const std::vector<std::string> & serviceClasses); 00591 00592 00593 /*************************************************************************/ 00594 /* ZEROMQ CONTEXT */ 00595 /*************************************************************************/ 00596 00597 std::shared_ptr<zmq::context_t> getZmqContext() const 00598 { 00599 return services_->zmqContext; 00600 } 00601 00602 /*************************************************************************/ 00603 /* EXCEPTION LOGGING */ 00604 /*************************************************************************/ 00605 00606 void logException(std::exception_ptr exc, 00607 const std::string & context) 00608 { 00609 std::cerr << "error: exception in context " << context << std::endl; 00610 std::rethrow_exception(exc); 00611 } 00612 00613 00614 /*************************************************************************/ 00615 /* STATUS */ 00616 /*************************************************************************/ 00617 00622 virtual Json::Value getServiceStatus() const; 00623 00629 virtual void addChildServiceStatus(Json::Value & result) const; 00630 00631 protected: 00632 std::shared_ptr<ServiceProxies> services_; 00633 std::string serviceName_; 00634 ServiceBase * parent_; 00635 std::vector<ServiceBase *> children_; 00636 }; 00637 00638 00639 /*****************************************************************************/ 00640 /* SUB SERVICE BASE */ 00641 /*****************************************************************************/ 00642 00643 struct SubServiceBase : public ServiceBase { 00644 }; 00645 00646 } // namespace Datacratic 00647 00648 00649 #endif /* __service__service_base_h__ */ 00650
1.7.6.1