RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* service_base.cc 00002 Jeremy Barnes, 29 May 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Service base. 00006 */ 00007 00008 #include "service_base.h" 00009 #include <iostream> 00010 #include "soa/service/carbon_connector.h" 00011 #include "zookeeper_configuration_service.h" 00012 #include "jml/arch/demangle.h" 00013 #include "jml/utils/exc_assert.h" 00014 #include "jml/utils/environment.h" 00015 #include "jml/utils/file_functions.h" 00016 #include <unistd.h> 00017 #include <sys/utsname.h> 00018 #include <boost/make_shared.hpp> 00019 #include "zmq.hpp" 00020 #include "soa/jsoncpp/reader.h" 00021 #include "soa/jsoncpp/value.h" 00022 #include <fstream> 00023 #include <sys/utsname.h> 00024 00025 using namespace std; 00026 00027 extern const char * __progname; 00028 00029 namespace Datacratic { 00030 00031 00032 /*****************************************************************************/ 00033 /* NULL EVENT SERVICE */ 00034 /*****************************************************************************/ 00035 00036 NullEventService:: 00037 NullEventService() 00038 : stats(new MultiAggregator()) 00039 { 00040 } 00041 00042 NullEventService:: 00043 ~NullEventService() 00044 { 00045 } 00046 00047 void 00048 NullEventService:: 00049 onEvent(const std::string & name, 00050 const char * event, 00051 EventType type, 00052 float value) 00053 { 00054 stats->record(name + "." + event, type, value); 00055 } 00056 00057 void 00058 NullEventService:: 00059 dump(std::ostream & stream) const 00060 { 00061 stats->dumpSync(stream); 00062 } 00063 00064 00065 /*****************************************************************************/ 00066 /* CARBON EVENT SERVICE */ 00067 /*****************************************************************************/ 00068 00069 CarbonEventService:: 00070 CarbonEventService(std::shared_ptr<CarbonConnector> conn) : 00071 connector(conn) 00072 { 00073 } 00074 00075 CarbonEventService:: 00076 CarbonEventService(const std::string & carbonAddress, 00077 const std::string & prefix) 00078 : connector(new CarbonConnector(carbonAddress, prefix)) 00079 { 00080 } 00081 00082 CarbonEventService:: 00083 CarbonEventService(const std::vector<std::string> & carbonAddresses, 00084 const std::string & prefix) 00085 : connector(new CarbonConnector(carbonAddresses, prefix)) 00086 { 00087 } 00088 00089 void 00090 CarbonEventService:: 00091 onEvent(const std::string & name, 00092 const char * event, 00093 EventType type, 00094 float value) 00095 { 00096 if (name.empty()) { 00097 connector->record(event, type, value); 00098 } 00099 else { 00100 size_t l = strlen(event); 00101 string s; 00102 s.reserve(name.length() + l + 2); 00103 s.append(name); 00104 s.push_back('.'); 00105 s.append(event, event + l); 00106 connector->record(s, type, value); 00107 } 00108 } 00109 00110 00111 /*****************************************************************************/ 00112 /* CONFIGURATION SERVICE */ 00113 /*****************************************************************************/ 00114 00115 void 00116 ConfigurationService:: 00117 dump(std::ostream & stream) const 00118 { 00119 auto onValue = [&] (string key, const Json::Value & val) 00120 { 00121 stream << key << " --> " << val.toString(); 00122 return true; 00123 }; 00124 00125 forEachEntry(onValue, ""); 00126 } 00127 00128 Json::Value 00129 ConfigurationService:: 00130 jsonDump() const 00131 { 00132 Json::Value allEntries; 00133 00134 auto onValue = [&] (string key, const Json::Value & val) 00135 { 00136 allEntries[key] = val; 00137 return true; 00138 }; 00139 forEachEntry(onValue, ""); 00140 00141 return allEntries; 00142 } 00143 00144 std::pair<std::string, std::string> 00145 ConfigurationService:: 00146 splitPath(const std::string & path) 00147 { 00148 string::size_type pos = path.find('/'); 00149 00150 string root(path, 0, pos); 00151 string leaf; 00152 if (pos != string::npos) 00153 leaf = string(path, pos + 1); 00154 00155 return make_pair(root, leaf); 00156 } 00157 00158 /*****************************************************************************/ 00159 /* INTERNAL CONFIGURATION SERVICE */ 00160 /*****************************************************************************/ 00161 00162 Json::Value 00163 InternalConfigurationService:: 00164 getJson(const std::string & key, Watch watch) 00165 { 00166 Guard guard(lock); 00167 00168 Entry * entry = getNode(root, key); 00169 if (!entry) 00170 return Json::Value(); 00171 00172 entry->watch = watch; 00173 return entry->value; 00174 } 00175 00176 void 00177 InternalConfigurationService:: 00178 set(const std::string & key, 00179 const Json::Value & value) 00180 { 00181 Guard guard(lock); 00182 Entry & node = createNode(root, key); 00183 node.hasValue = true; 00184 node.value = value; 00185 if (node.watch) { 00186 node.watch.trigger(key, VALUE_CHANGED); 00187 node.watch = Watch(); 00188 } 00189 } 00190 00191 std::string 00192 InternalConfigurationService:: 00193 setUnique(const std::string & key_, 00194 const Json::Value & value) 00195 { 00196 Guard guard(lock); 00197 00198 int r = 0; 00199 00200 for (;; r = random()) { 00201 string key = key_; 00202 if (r != 0) 00203 key += ":" + to_string(r); 00204 00205 Entry & node = createNode(root, key); 00206 if (node.hasValue) 00207 continue; 00208 00209 node.hasValue = true; 00210 node.value = value; 00211 00212 return key; 00213 } 00214 } 00215 00216 std::vector<std::string> 00217 InternalConfigurationService:: 00218 getChildren(const std::string & key, 00219 Watch watch) 00220 { 00221 Guard guard(lock); 00222 00223 auto node = getNode(root, key); 00224 vector<string> result; 00225 if (node) { 00226 for (auto & ch: node->children) 00227 result.push_back(ch.first); 00228 node->watch = watch; 00229 } 00230 return result; 00231 } 00232 00233 bool 00234 InternalConfigurationService:: 00235 forEachEntry(const OnEntry & onEntry, 00236 const std::string & startPrefix) const 00237 { 00238 Guard guard(lock); 00239 00240 std::function<bool (const Entry *, string)> doNode 00241 = [&] (const Entry * node, string prefix) 00242 { 00243 if (!node) 00244 return true; 00245 00246 if (node->hasValue) 00247 if (!onEntry(prefix, node->value)) 00248 return false; 00249 00250 for (auto ch: node->children) 00251 if (!doNode(ch.second.get(), prefix + "/" + ch.first)) 00252 return false; 00253 return true; 00254 }; 00255 00256 return doNode(getNode(root, startPrefix), startPrefix); 00257 } 00258 00259 InternalConfigurationService::Entry & 00260 InternalConfigurationService:: 00261 createNode(Entry & node, const std::string & key) 00262 { 00263 if (key == "") 00264 return node; 00265 00266 string root, leaf; 00267 std::tie(root, leaf) = splitPath(key); 00268 00269 bool existed = node.children.count(root); 00270 00271 shared_ptr<Entry> & entryPtr = node.children[root]; 00272 if (!entryPtr) 00273 entryPtr.reset(new Entry()); 00274 else if (entryPtr->watch && !existed) { 00275 entryPtr->watch.trigger(key, VALUE_CHANGED); 00276 entryPtr->watch = Watch(); 00277 } 00278 00279 return createNode(*entryPtr, leaf); 00280 } 00281 00282 const InternalConfigurationService::Entry * 00283 InternalConfigurationService:: 00284 getNode(const Entry & node, const std::string & key) const 00285 { 00286 if (key == "") 00287 return &node; 00288 00289 string root, leaf; 00290 std::tie(root, leaf) = splitPath(key); 00291 00292 auto it = node.children.find(root); 00293 if (it == node.children.end()) 00294 return 0; 00295 00296 ExcAssert(it->second); 00297 00298 return getNode(*it->second, leaf); 00299 } 00300 00301 InternalConfigurationService::Entry * 00302 InternalConfigurationService:: 00303 getNode(Entry & node, const std::string & key) 00304 { 00305 if (key == "") 00306 return &node; 00307 00308 string root, leaf; 00309 std::tie(root, leaf) = splitPath(key); 00310 00311 auto it = node.children.find(root); 00312 if (it == node.children.end()) 00313 return 0; 00314 00315 ExcAssert(it->second); 00316 00317 return getNode(*it->second, leaf); 00318 } 00319 00320 void 00321 InternalConfigurationService:: 00322 removePath(const std::string & key) 00323 { 00324 string path, leaf; 00325 std::tie(path, leaf) = splitPath(key); 00326 00327 Guard guard(lock); 00328 Entry * parent = getNode(root, path); 00329 if (!parent) 00330 return; 00331 00332 // TODO: listeners for onChange, both here and children 00333 00334 parent->children.erase(leaf); 00335 } 00336 00337 00338 /*****************************************************************************/ 00339 /* SERVICE PROXIES */ 00340 /*****************************************************************************/ 00341 00342 namespace { 00343 00344 std::string bootstrapConfigPath() 00345 { 00346 ML::Env_Option<string> env("RTBKIT_BOOTSTRAP", ""); 00347 if (!env.get().empty()) return env.get(); 00348 00349 const string cwdPath = "./bootstrap.json"; 00350 if (ML::fileExists(cwdPath)) return cwdPath; 00351 00352 return ""; 00353 } 00354 00355 } // namespace anonymous 00356 00357 ServiceProxies:: 00358 ServiceProxies() 00359 : events(new NullEventService()), 00360 config(new InternalConfigurationService()), 00361 ports(new DefaultPortRangeService()), 00362 zmqContext(new zmq::context_t(1 /* num worker threads */)) 00363 { 00364 bootstrap(bootstrapConfigPath()); 00365 } 00366 00367 void 00368 ServiceProxies:: 00369 logToCarbon(const std::string & carbonConnection, 00370 const std::string & prefix) 00371 { 00372 events.reset(new CarbonEventService(carbonConnection, prefix)); 00373 } 00374 00375 void 00376 ServiceProxies:: 00377 logToCarbon(const std::vector<std::string> & carbonConnections, 00378 const std::string & prefix) 00379 { 00380 events.reset(new CarbonEventService(carbonConnections, prefix)); 00381 } 00382 00383 void 00384 ServiceProxies:: 00385 logToCarbon(std::shared_ptr<CarbonConnector> conn) 00386 { 00387 events.reset(new CarbonEventService(conn)); 00388 } 00389 00390 void 00391 ServiceProxies:: 00392 useZookeeper(std::string hostname, 00393 std::string prefix) 00394 { 00395 if (prefix == "CWD") { 00396 char buf[1024]; 00397 if (!getcwd(buf, 1024)) 00398 throw ML::Exception(errno, "getcwd"); 00399 string cwd = buf; 00400 00401 utsname name; 00402 if (uname(&name)) 00403 throw ML::Exception(errno, "uname"); 00404 string node = name.nodename; 00405 00406 prefix = "/dev/" + node + cwd + "_" + __progname + "/"; 00407 } 00408 00409 config.reset(new ZookeeperConfigurationService(hostname, prefix)); 00410 } 00411 00412 void 00413 ServiceProxies:: 00414 usePortRanges(const std::string& path) 00415 { 00416 ports.reset(new JsonPortRangeService(path)); 00417 } 00418 00419 void 00420 ServiceProxies:: 00421 usePortRanges(const Json::Value& config) 00422 { 00423 ports.reset(new JsonPortRangeService(config)); 00424 } 00425 00426 std::vector<std::string> 00427 ServiceProxies::getServiceClassInstances(std::string const & name, 00428 std::string const & protocol) 00429 { 00430 std::vector<std::string> result; 00431 if(config) { 00432 std::string root = "serviceClass/" + name; 00433 vector<string> children = config->getChildren(root); 00434 for(auto & item : children) { 00435 Json::Value json = config->getJson(root + "/" + item); 00436 auto items = getEndpointInstances(json["servicePath"].asString(), protocol); 00437 result.insert(result.begin(), items.begin(), items.end()); 00438 } 00439 } 00440 00441 return result; 00442 } 00443 00444 std::vector<std::string> 00445 ServiceProxies::getEndpointInstances(std::string const & name, 00446 std::string const & protocol) 00447 { 00448 std::vector<std::string> result; 00449 if(config) { 00450 std::string path = name + "/" + protocol; 00451 vector<string> children = config->getChildren(path); 00452 for(auto & item : children) { 00453 Json::Value json = config->getJson(path + "/" + item); 00454 for(auto & entry: json) { 00455 std::string key; 00456 if(protocol == "http") key = "httpUri"; 00457 if(protocol == "zeromq") key = "zmqConnectUri"; 00458 00459 if(key.empty() || !entry.isMember(key)) 00460 continue; 00461 00462 auto hs = entry["transports"][0]["hostScope"]; 00463 if(!hs) 00464 continue; 00465 00466 string hostScope = hs.asString(); 00467 if(hs != "*") { 00468 utsname name; 00469 if(uname(&name)) 00470 throw ML::Exception(errno, "uname"); 00471 if(hostScope != name.nodename) 00472 continue; 00473 } 00474 00475 result.push_back(entry[key].asString()); 00476 } 00477 } 00478 } 00479 00480 return result; 00481 } 00482 00483 void 00484 ServiceProxies:: 00485 bootstrap(const std::string& path) 00486 { 00487 if (path.empty()) return; 00488 ExcCheck(ML::fileExists(path), path + " doesn't exist"); 00489 00490 ifstream stream(path); 00491 ExcCheckErrno(stream, "Unable to open the Json port range file."); 00492 00493 string file; 00494 while(stream) { 00495 string line; 00496 getline(stream, line); 00497 file += line + "\n"; 00498 } 00499 00500 bootstrap(Json::parse(file)); 00501 } 00502 00503 void 00504 ServiceProxies:: 00505 bootstrap(const Json::Value& config) 00506 { 00507 string install = config["installation"].asString(); 00508 ExcCheck(!install.empty(), "installation is not specified in bootstrap.json"); 00509 00510 string node = config["node-name"].asString(); 00511 if (node.empty()) { 00512 struct utsname s; 00513 int ret = uname(&s); 00514 ExcCheckErrno(!ret, "Unable to call uname"); 00515 00516 node = string(s.nodename); 00517 } 00518 00519 if (config.isMember("carbon-uri")) { 00520 const Json::Value& entry = config["carbon-uri"]; 00521 vector<string> uris; 00522 00523 if (entry.isArray()) { 00524 for (size_t j = 0; j < entry.size(); ++j) 00525 uris.push_back(entry[j].asString()); 00526 } 00527 else uris.push_back(entry.asString()); 00528 00529 logToCarbon(uris, install + "." + node); 00530 } 00531 00532 00533 if (config.isMember("zookeeper-uri")) 00534 useZookeeper(config["zookeeper-uri"].asString(), install); 00535 00536 if (config.isMember("portRanges")) 00537 usePortRanges(config["portRanges"]); 00538 } 00539 00540 /*****************************************************************************/ 00541 /* EVENT RECORDER */ 00542 /*****************************************************************************/ 00543 00544 EventRecorder:: 00545 EventRecorder(const std::string & eventPrefix, 00546 const std::shared_ptr<EventService> & events) 00547 : eventPrefix_(eventPrefix), 00548 events_(events) 00549 { 00550 } 00551 00552 EventRecorder:: 00553 EventRecorder(const std::string & eventPrefix, 00554 const std::shared_ptr<ServiceProxies> & services) 00555 : eventPrefix_(eventPrefix), 00556 services_(services) 00557 { 00558 } 00559 00560 void 00561 EventRecorder:: 00562 recordEventFmt(EventType type, 00563 float value, 00564 const char * fmt, ...) const 00565 { 00566 if (!events_ && (!services_ || !services_->events)) return; 00567 00568 char buf[2048]; 00569 00570 va_list ap; 00571 va_start(ap, fmt); 00572 try { 00573 int res = vsnprintf(buf, 2048, fmt, ap); 00574 if (res < 0) 00575 throw ML::Exception("unable to record hit with fmt"); 00576 if (res >= 2048) 00577 throw ML::Exception("key is too long"); 00578 00579 recordEvent(buf, type, value); 00580 va_end(ap); 00581 return; 00582 } 00583 catch (...) { 00584 va_end(ap); 00585 throw; 00586 } 00587 } 00588 00589 00590 00591 /*****************************************************************************/ 00592 /* SERVICE BASE */ 00593 /*****************************************************************************/ 00594 00595 ServiceBase:: 00596 ServiceBase(const std::string & serviceName, 00597 std::shared_ptr<ServiceProxies> services) 00598 : EventRecorder(serviceName, services), 00599 services_(services), serviceName_(serviceName), parent_(0) 00600 { 00601 if (!services_) 00602 setServices(std::make_shared<ServiceProxies>()); 00603 00604 // Clear out any old entries 00605 getServices()->config->removePath(serviceName); 00606 } 00607 00608 ServiceBase:: 00609 ServiceBase(const std::string & subServiceName, 00610 ServiceBase & parent) 00611 : EventRecorder(parent.serviceName() + "." + subServiceName, 00612 parent.getServices()), 00613 services_(parent.getServices()), 00614 serviceName_(parent.serviceName() + "." + subServiceName), 00615 parent_(&parent) 00616 { 00617 // Clear out any old entries 00618 getServices()->config->removePath(serviceName()); 00619 } 00620 00621 ServiceBase:: 00622 ~ServiceBase() 00623 { 00624 // Clear out our config entries 00625 if (services_) 00626 getServices()->config->removePath(serviceName()); 00627 } 00628 00629 void 00630 ServiceBase:: 00631 registerServiceProvider(const std::string & name, 00632 const std::vector<std::string> & serviceClasses) 00633 { 00634 for (auto cl: serviceClasses) { 00635 Json::Value json; 00636 json["serviceName"] = name; 00637 json["servicePath"] = name; 00638 services_->config->setUnique("serviceClass/" + cl + "/" + name, json); 00639 } 00640 } 00641 00642 void 00643 ServiceBase:: 00644 unregisterServiceProvider(const std::string & name, 00645 const std::vector<std::string> & serviceClasses) 00646 { 00647 for (auto cl: serviceClasses) { 00648 services_->config->removePath("serviceClass/" + cl + "/" + name); 00649 } 00650 } 00651 00652 Json::Value 00653 ServiceBase:: 00654 getServiceStatus() const 00655 { 00656 Json::Value result; 00657 result["type"] = ML::type_name(*this); 00658 result["status"] = "Type has not implemented getStatus"; 00659 addChildServiceStatus(result); 00660 return result; 00661 } 00662 00663 void 00664 ServiceBase:: 00665 addChildServiceStatus(Json::Value & result) const 00666 { 00667 00668 } 00669 00670 } // namespace Datacratic