RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/service_base.cc
00001 /* service_base.cc
00002    Jeremy Barnes, 29 May 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    Service base.
00006 */
00007 
00008 #include "service_base.h"
00009 #include <iostream>
00010 #include "soa/service/carbon_connector.h"
00011 #include "zookeeper_configuration_service.h"
00012 #include "jml/arch/demangle.h"
00013 #include "jml/utils/exc_assert.h"
00014 #include "jml/utils/environment.h"
00015 #include "jml/utils/file_functions.h"
00016 #include <unistd.h>
00017 #include <sys/utsname.h>
00018 #include <boost/make_shared.hpp>
00019 #include "zmq.hpp"
00020 #include "soa/jsoncpp/reader.h"
00021 #include "soa/jsoncpp/value.h"
00022 #include <fstream>
00023 #include <sys/utsname.h>
00024 
00025 using namespace std;
00026 
00027 extern const char * __progname;
00028 
00029 namespace Datacratic {
00030 
00031 
00032 /*****************************************************************************/
00033 /* NULL EVENT SERVICE                                                        */
00034 /*****************************************************************************/
00035 
00036 NullEventService::
00037 NullEventService()
00038     : stats(new MultiAggregator())
00039 {
00040 }
00041 
00042 NullEventService::
00043 ~NullEventService()
00044 {
00045 }
00046 
00047 void
00048 NullEventService::
00049 onEvent(const std::string & name,
00050         const char * event,
00051         EventType type,
00052         float value)
00053 {
00054     stats->record(name + "." + event, type, value);
00055 }
00056 
00057 void
00058 NullEventService::
00059 dump(std::ostream & stream) const
00060 {
00061     stats->dumpSync(stream);
00062 }
00063 
00064 
00065 /*****************************************************************************/
00066 /* CARBON EVENT SERVICE                                                      */
00067 /*****************************************************************************/
00068 
00069 CarbonEventService::
00070 CarbonEventService(std::shared_ptr<CarbonConnector> conn) :
00071     connector(conn)
00072 {
00073 }
00074 
00075 CarbonEventService::
00076 CarbonEventService(const std::string & carbonAddress,
00077                    const std::string & prefix)
00078     : connector(new CarbonConnector(carbonAddress, prefix))
00079 {
00080 }
00081 
00082 CarbonEventService::
00083 CarbonEventService(const std::vector<std::string> & carbonAddresses,
00084                    const std::string & prefix)
00085     : connector(new CarbonConnector(carbonAddresses, prefix))
00086 {
00087 }
00088 
00089 void
00090 CarbonEventService::
00091 onEvent(const std::string & name,
00092         const char * event,
00093         EventType type,
00094         float value)
00095 {
00096     if (name.empty()) {
00097         connector->record(event, type, value);
00098     }
00099     else {
00100         size_t l = strlen(event);
00101         string s;
00102         s.reserve(name.length() + l + 2);
00103         s.append(name);
00104         s.push_back('.');
00105         s.append(event, event + l);
00106         connector->record(s, type, value);
00107     }
00108 }
00109 
00110 
00111 /*****************************************************************************/
00112 /* CONFIGURATION SERVICE                                                     */
00113 /*****************************************************************************/
00114 
00115 void
00116 ConfigurationService::
00117 dump(std::ostream & stream) const
00118 {
00119     auto onValue = [&] (string key, const Json::Value & val)
00120         {
00121             stream << key << " --> " << val.toString();
00122             return true;
00123         };
00124 
00125     forEachEntry(onValue, "");
00126 }
00127 
00128 Json::Value
00129 ConfigurationService::
00130 jsonDump() const
00131 {
00132     Json::Value allEntries;
00133 
00134     auto onValue = [&] (string key, const Json::Value & val)
00135         {
00136             allEntries[key] = val;
00137             return true;
00138         };
00139     forEachEntry(onValue, "");
00140 
00141     return allEntries;
00142 }
00143 
00144 std::pair<std::string, std::string>
00145 ConfigurationService::
00146 splitPath(const std::string & path)
00147 {
00148     string::size_type pos = path.find('/');
00149 
00150     string root(path, 0, pos);
00151     string leaf;
00152     if (pos != string::npos)
00153         leaf = string(path, pos + 1);
00154 
00155     return make_pair(root, leaf);
00156 }
00157 
00158 /*****************************************************************************/
00159 /* INTERNAL CONFIGURATION SERVICE                                            */
00160 /*****************************************************************************/
00161 
00162 Json::Value
00163 InternalConfigurationService::
00164 getJson(const std::string & key, Watch watch)
00165 {
00166     Guard guard(lock);
00167 
00168     Entry * entry = getNode(root, key);
00169     if (!entry)
00170         return Json::Value();
00171 
00172     entry->watch = watch;
00173     return entry->value;
00174 }
00175     
00176 void
00177 InternalConfigurationService::
00178 set(const std::string & key,
00179     const Json::Value & value)
00180 {
00181     Guard guard(lock);
00182     Entry & node = createNode(root, key);
00183     node.hasValue = true;
00184     node.value = value;
00185     if (node.watch) {
00186         node.watch.trigger(key, VALUE_CHANGED);
00187         node.watch = Watch();
00188     }
00189 }
00190 
00191 std::string
00192 InternalConfigurationService::
00193 setUnique(const std::string & key_,
00194           const Json::Value & value)
00195 {
00196     Guard guard(lock);
00197 
00198     int r = 0;
00199 
00200     for (;; r = random()) {
00201         string key = key_;
00202         if (r != 0)
00203             key += ":" + to_string(r);
00204         
00205         Entry & node = createNode(root, key);
00206         if (node.hasValue)
00207             continue;
00208         
00209         node.hasValue = true;
00210         node.value = value;
00211         
00212         return key;
00213     }
00214 }
00215 
00216 std::vector<std::string>
00217 InternalConfigurationService::
00218 getChildren(const std::string & key,
00219             Watch watch)
00220 {
00221     Guard guard(lock);
00222 
00223     auto node = getNode(root, key);
00224     vector<string> result;
00225     if (node) {
00226         for (auto & ch: node->children)
00227             result.push_back(ch.first);
00228         node->watch = watch;
00229     }
00230     return result;
00231 }
00232 
00233 bool
00234 InternalConfigurationService::
00235 forEachEntry(const OnEntry & onEntry,
00236              const std::string & startPrefix) const
00237 {
00238     Guard guard(lock);
00239 
00240     std::function<bool (const Entry *, string)> doNode
00241         = [&] (const Entry * node, string prefix)
00242         {
00243             if (!node)
00244                 return true;
00245             
00246             if (node->hasValue)
00247                 if (!onEntry(prefix, node->value))
00248                     return false;
00249 
00250             for (auto ch: node->children)
00251                 if (!doNode(ch.second.get(), prefix + "/" + ch.first))
00252                     return false;
00253             return true;
00254         };
00255 
00256     return doNode(getNode(root, startPrefix), startPrefix);
00257 }
00258 
00259 InternalConfigurationService::Entry &
00260 InternalConfigurationService::
00261 createNode(Entry & node, const std::string & key)
00262 {
00263     if (key == "")
00264         return node;
00265 
00266     string root, leaf;
00267     std::tie(root, leaf) = splitPath(key);
00268 
00269     bool existed = node.children.count(root);
00270 
00271     shared_ptr<Entry> & entryPtr = node.children[root];
00272     if (!entryPtr)
00273         entryPtr.reset(new Entry());
00274     else if (entryPtr->watch && !existed) {
00275         entryPtr->watch.trigger(key, VALUE_CHANGED);
00276         entryPtr->watch = Watch();
00277     }
00278 
00279     return createNode(*entryPtr, leaf);
00280 }
00281 
00282 const InternalConfigurationService::Entry *
00283 InternalConfigurationService::
00284 getNode(const Entry & node, const std::string & key) const
00285 {
00286     if (key == "")
00287         return &node;
00288 
00289     string root, leaf;
00290     std::tie(root, leaf) = splitPath(key);
00291 
00292     auto it = node.children.find(root);
00293     if (it == node.children.end())
00294         return 0;
00295 
00296     ExcAssert(it->second);
00297 
00298     return getNode(*it->second, leaf);
00299 }
00300 
00301 InternalConfigurationService::Entry *
00302 InternalConfigurationService::
00303 getNode(Entry & node, const std::string & key)
00304 {
00305     if (key == "")
00306         return &node;
00307 
00308     string root, leaf;
00309     std::tie(root, leaf) = splitPath(key);
00310 
00311     auto it = node.children.find(root);
00312     if (it == node.children.end())
00313         return 0;
00314 
00315     ExcAssert(it->second);
00316 
00317     return getNode(*it->second, leaf);
00318 }
00319 
00320 void
00321 InternalConfigurationService::
00322 removePath(const std::string & key)
00323 {
00324     string path, leaf;
00325     std::tie(path, leaf) = splitPath(key);
00326 
00327     Guard guard(lock);
00328     Entry * parent = getNode(root, path);
00329     if (!parent)
00330         return;
00331 
00332     // TODO: listeners for onChange, both here and children
00333 
00334     parent->children.erase(leaf);
00335 }
00336 
00337 
00338 /*****************************************************************************/
00339 /* SERVICE PROXIES                                                           */
00340 /*****************************************************************************/
00341 
00342 namespace {
00343 
00344 std::string bootstrapConfigPath()
00345 {
00346     ML::Env_Option<string> env("RTBKIT_BOOTSTRAP", "");
00347     if (!env.get().empty()) return env.get();
00348 
00349     const string cwdPath = "./bootstrap.json";
00350     if (ML::fileExists(cwdPath)) return cwdPath;
00351 
00352     return "";
00353 }
00354 
00355 } // namespace anonymous
00356 
00357 ServiceProxies::
00358 ServiceProxies()
00359     : events(new NullEventService()),
00360       config(new InternalConfigurationService()),
00361       ports(new DefaultPortRangeService()),
00362       zmqContext(new zmq::context_t(1 /* num worker threads */))
00363 {
00364     bootstrap(bootstrapConfigPath());
00365 }
00366 
00367 void
00368 ServiceProxies::
00369 logToCarbon(const std::string & carbonConnection,
00370             const std::string & prefix)
00371 {
00372     events.reset(new CarbonEventService(carbonConnection, prefix));
00373 }
00374 
00375 void
00376 ServiceProxies::
00377 logToCarbon(const std::vector<std::string> & carbonConnections,
00378             const std::string & prefix)
00379 {
00380     events.reset(new CarbonEventService(carbonConnections, prefix));
00381 }
00382 
00383 void
00384 ServiceProxies::
00385 logToCarbon(std::shared_ptr<CarbonConnector> conn)
00386 {
00387     events.reset(new CarbonEventService(conn));
00388 }
00389 
00390 void
00391 ServiceProxies::
00392 useZookeeper(std::string hostname,
00393              std::string prefix)
00394 {
00395     if (prefix == "CWD") {
00396         char buf[1024];
00397         if (!getcwd(buf, 1024))
00398             throw ML::Exception(errno, "getcwd");
00399         string cwd = buf;
00400 
00401         utsname name;
00402         if (uname(&name))
00403             throw ML::Exception(errno, "uname");
00404         string node = name.nodename;
00405         
00406         prefix = "/dev/" + node + cwd + "_" + __progname + "/";
00407     }
00408 
00409     config.reset(new ZookeeperConfigurationService(hostname, prefix));
00410 }
00411 
00412 void
00413 ServiceProxies::
00414 usePortRanges(const std::string& path)
00415 {
00416     ports.reset(new JsonPortRangeService(path));
00417 }
00418 
00419 void
00420 ServiceProxies::
00421 usePortRanges(const Json::Value& config)
00422 {
00423     ports.reset(new JsonPortRangeService(config));
00424 }
00425 
00426 std::vector<std::string>
00427 ServiceProxies::getServiceClassInstances(std::string const & name,
00428                                          std::string const & protocol)
00429 {
00430     std::vector<std::string> result;
00431     if(config) {
00432         std::string root = "serviceClass/" + name;
00433         vector<string> children = config->getChildren(root);
00434         for(auto & item : children) {
00435             Json::Value json = config->getJson(root + "/" + item);
00436             auto items = getEndpointInstances(json["servicePath"].asString(), protocol);
00437             result.insert(result.begin(), items.begin(), items.end());
00438         }
00439     }
00440 
00441     return result;
00442 }
00443 
00444 std::vector<std::string>
00445 ServiceProxies::getEndpointInstances(std::string const & name,
00446                                      std::string const & protocol)
00447 {
00448     std::vector<std::string> result;
00449     if(config) {
00450         std::string path = name + "/" + protocol;
00451         vector<string> children = config->getChildren(path);
00452         for(auto & item : children) {
00453             Json::Value json = config->getJson(path + "/" + item);
00454             for(auto & entry: json) {
00455                 std::string key;
00456                 if(protocol == "http") key = "httpUri";
00457                 if(protocol == "zeromq") key = "zmqConnectUri";
00458 
00459                 if(key.empty() || !entry.isMember(key))
00460                     continue;
00461 
00462                 auto hs = entry["transports"][0]["hostScope"];
00463                 if(!hs)
00464                     continue;
00465 
00466                 string hostScope = hs.asString();
00467                 if(hs != "*") {
00468                     utsname name;
00469                     if(uname(&name))
00470                         throw ML::Exception(errno, "uname");
00471                     if(hostScope != name.nodename)
00472                         continue;
00473                 }
00474 
00475                 result.push_back(entry[key].asString());
00476             }
00477         }
00478     }
00479 
00480     return result;
00481 }
00482 
00483 void
00484 ServiceProxies::
00485 bootstrap(const std::string& path)
00486 {
00487     if (path.empty()) return;
00488     ExcCheck(ML::fileExists(path), path + " doesn't exist");
00489 
00490     ifstream stream(path);
00491     ExcCheckErrno(stream, "Unable to open the Json port range file.");
00492 
00493     string file;
00494     while(stream) {
00495         string line;
00496         getline(stream, line);
00497         file += line + "\n";
00498     }
00499 
00500     bootstrap(Json::parse(file));
00501 }
00502 
00503 void
00504 ServiceProxies::
00505 bootstrap(const Json::Value& config)
00506 {
00507     string install = config["installation"].asString();
00508     ExcCheck(!install.empty(), "installation is not specified in bootstrap.json");
00509 
00510     string node = config["node-name"].asString();
00511     if (node.empty()) {
00512         struct utsname s;
00513         int ret = uname(&s);
00514         ExcCheckErrno(!ret, "Unable to call uname");
00515 
00516         node = string(s.nodename);
00517     }
00518 
00519     if (config.isMember("carbon-uri")) {
00520         const Json::Value& entry = config["carbon-uri"];
00521         vector<string> uris;
00522 
00523         if (entry.isArray()) {
00524             for (size_t j = 0; j < entry.size(); ++j)
00525                 uris.push_back(entry[j].asString());
00526         }
00527         else uris.push_back(entry.asString());
00528 
00529         logToCarbon(uris, install + "." + node);
00530     }
00531 
00532 
00533     if (config.isMember("zookeeper-uri"))
00534         useZookeeper(config["zookeeper-uri"].asString(), install);
00535 
00536     if (config.isMember("portRanges"))
00537         usePortRanges(config["portRanges"]);
00538 }
00539 
00540 /*****************************************************************************/
00541 /* EVENT RECORDER                                                            */
00542 /*****************************************************************************/
00543 
00544 EventRecorder::
00545 EventRecorder(const std::string & eventPrefix,
00546               const std::shared_ptr<EventService> & events)
00547     : eventPrefix_(eventPrefix),
00548       events_(events)
00549 {
00550 }
00551 
00552 EventRecorder::
00553 EventRecorder(const std::string & eventPrefix,
00554               const std::shared_ptr<ServiceProxies> & services)
00555     : eventPrefix_(eventPrefix),
00556       services_(services)
00557 {
00558 }
00559 
00560 void
00561 EventRecorder::
00562 recordEventFmt(EventType type,
00563                float value,
00564                const char * fmt, ...) const
00565 {
00566     if (!events_ && (!services_ || !services_->events))  return;
00567         
00568     char buf[2048];
00569         
00570     va_list ap;
00571     va_start(ap, fmt);
00572     try {
00573         int res = vsnprintf(buf, 2048, fmt, ap);
00574         if (res < 0)
00575             throw ML::Exception("unable to record hit with fmt");
00576         if (res >= 2048)
00577             throw ML::Exception("key is too long");
00578             
00579         recordEvent(buf, type, value);
00580         va_end(ap);
00581         return;
00582     }
00583     catch (...) {
00584         va_end(ap);
00585         throw;
00586     }
00587 }
00588 
00589 
00590 
00591 /*****************************************************************************/
00592 /* SERVICE BASE                                                              */
00593 /*****************************************************************************/
00594 
00595 ServiceBase::
00596 ServiceBase(const std::string & serviceName,
00597             std::shared_ptr<ServiceProxies> services)
00598     : EventRecorder(serviceName, services), 
00599       services_(services), serviceName_(serviceName), parent_(0)
00600 {
00601     if (!services_)
00602         setServices(std::make_shared<ServiceProxies>());
00603 
00604     // Clear out any old entries
00605     getServices()->config->removePath(serviceName);
00606 }
00607 
00608 ServiceBase::
00609 ServiceBase(const std::string & subServiceName,
00610             ServiceBase & parent)
00611     : EventRecorder(parent.serviceName() + "." + subServiceName,
00612                     parent.getServices()),
00613       services_(parent.getServices()),
00614       serviceName_(parent.serviceName() + "." + subServiceName),
00615       parent_(&parent)
00616 {
00617     // Clear out any old entries
00618     getServices()->config->removePath(serviceName());
00619 }
00620 
00621 ServiceBase::
00622 ~ServiceBase()
00623 {
00624     // Clear out our config entries
00625     if (services_)
00626         getServices()->config->removePath(serviceName());
00627 }
00628 
00629 void
00630 ServiceBase::
00631 registerServiceProvider(const std::string & name,
00632                         const std::vector<std::string> & serviceClasses)
00633 {
00634     for (auto cl: serviceClasses) {
00635         Json::Value json;
00636         json["serviceName"] = name;
00637         json["servicePath"] = name;
00638         services_->config->setUnique("serviceClass/" + cl + "/" + name, json);
00639     }
00640 }
00641 
00642 void
00643 ServiceBase::
00644 unregisterServiceProvider(const std::string & name,
00645                           const std::vector<std::string> & serviceClasses)
00646 {
00647     for (auto cl: serviceClasses) {
00648         services_->config->removePath("serviceClass/" + cl + "/" + name);
00649     }
00650 }
00651 
00652 Json::Value
00653 ServiceBase::
00654 getServiceStatus() const
00655 {
00656     Json::Value result;
00657     result["type"] = ML::type_name(*this);
00658     result["status"] = "Type has not implemented getStatus";
00659     addChildServiceStatus(result);
00660     return result;
00661 }
00662 
00663 void
00664 ServiceBase::
00665 addChildServiceStatus(Json::Value & result) const
00666 {
00667     
00668 }
00669 
00670 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator