RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* monitor.cc 00002 Wolfgang Sourdeau, January 2013 00003 Copyright (c) 2013 Datacratic. All rights reserved. 00004 00005 Main monitor class 00006 */ 00007 00008 #include <boost/algorithm/string/trim.hpp> 00009 #include <jml/arch/exception_handler.h> 00010 00011 #include "soa/service/rest_request_binding.h" 00012 00013 #include "monitor_endpoint.h" 00014 00015 using namespace std; 00016 using namespace ML; 00017 using namespace Datacratic; 00018 using namespace RTBKIT; 00019 00020 00021 /* MONITOR */ 00022 00023 MonitorEndpoint:: 00024 MonitorEndpoint(shared_ptr<ServiceProxies> proxies, 00025 const string & serviceName) 00026 : ServiceBase(serviceName, proxies), 00027 RestServiceEndpoint(proxies->zmqContext), 00028 checkTimeout_(2) 00029 { 00030 } 00031 00032 MonitorEndpoint:: 00033 ~MonitorEndpoint() 00034 { 00035 shutdown(); 00036 } 00037 00038 void 00039 MonitorEndpoint:: 00040 init(const vector<string> & providerNames) 00041 { 00042 addPeriodic("MonitorEndpoint::checkServiceIndicators", 1.0, 00043 bind(&MonitorEndpoint::checkServiceIndicators, this), 00044 true); 00045 00046 providerNames_ = providerNames; 00047 registerServiceProvider(serviceName_, { "monitor" }); 00048 00049 auto config = getServices()->config; 00050 config->removePath(serviceName_); 00051 RestServiceEndpoint::init(config, serviceName_); 00052 00053 /* rest router */ 00054 onHandleRequest = router.requestHandler(); 00055 router.description = "API for the Datacratic Monitor Service"; 00056 router.addHelpRoute("/", "GET"); 00057 00058 auto & versionNode = router.addSubRouter("/v1", "version 1 of API"); 00059 00060 addRouteSyncReturn(versionNode, 00061 "/status", 00062 {"GET"}, 00063 "Return the health status of the system", 00064 "", 00065 [] (bool status) { 00066 Json::Value jsonResponse; 00067 jsonResponse["status"] = status ? "ok" : "failure"; 00068 00069 return jsonResponse; 00070 }, 00071 &MonitorEndpoint::getMonitorStatus, 00072 this); 00073 00074 addRouteSyncReturn(versionNode, 00075 "/service-dump", 00076 {"GET"}, 00077 "Return the service entries from ZooKeeper", 00078 "Dictionary of the the Zookeeper service entries", 00079 [] (Json::Value jsonValue) { 00080 return move(jsonValue); 00081 }, 00082 &ConfigurationService::jsonDump, 00083 &*config); 00084 00085 auto & servicesNode 00086 = versionNode.addSubRouter("/services", 00087 "Operations on service states"); 00088 00089 for (string & providerName: providerNames_) { 00090 auto providerPost = [&,providerName] 00091 (const RestServiceEndpoint::ConnectionId & connection, 00092 const RestRequest & request, 00093 const RestRequestParsingContext & context) { 00094 if (this->postServiceIndicators(providerName, request.payload)) 00095 connection.sendResponse(204, "", "application/json"); 00096 else 00097 connection.sendResponse(403, "Error", "application/json"); 00098 return RestRequestRouter::MR_YES; 00099 }; 00100 00101 servicesNode.addRoute("/" + providerName, "POST", 00102 "service REST url", 00103 providerPost, Json::Value()); 00104 00105 auto & providerStatus = providersStatus_[providerName]; 00106 providerStatus.lastStatus = false; 00107 providerStatus.lastCheck = Date::fromSecondsSinceEpoch(0); 00108 } 00109 } 00110 00111 void 00112 MonitorEndpoint:: 00113 checkServiceIndicators() 00114 const 00115 { 00116 Date now = Date::now(); 00117 00118 for (const auto & it: providersStatus_) { 00119 const MonitorProviderStatus & status = it.second; 00120 if (!status.lastStatus) { 00121 fprintf (stderr, "%s: status of service '%s' is wrong\n", 00122 now.printClassic().c_str(), it.first.c_str()); 00123 } 00124 if (status.lastCheck.plusSeconds((double) checkTimeout_) <= now) { 00125 fprintf (stderr, 00126 "%s: status of service '%s' was last updated at %s\n", 00127 now.printClassic().c_str(), it.first.c_str(), 00128 status.lastCheck.printClassic().c_str()); 00129 } 00130 } 00131 } 00132 00133 bool 00134 MonitorEndpoint:: 00135 getMonitorStatus() 00136 const 00137 { 00138 bool monitorStatus(true); 00139 Date now = Date::now(); 00140 00141 for (const auto & it: providersStatus_) { 00142 const MonitorProviderStatus & status = it.second; 00143 if (!status.lastStatus) { 00144 monitorStatus = false; 00145 } 00146 if (status.lastCheck.plusSeconds((double) checkTimeout_) <= now) { 00147 monitorStatus = false; 00148 } 00149 } 00150 00151 return monitorStatus; 00152 } 00153 00154 bool 00155 MonitorEndpoint:: 00156 postServiceIndicators(const string & providerName, 00157 const string & indicatorsStr) 00158 { 00159 bool rc(false); 00160 bool status; 00161 00162 ML::Set_Trace_Exceptions notrace(false); 00163 try { 00164 Json::Value indicators = Json::parse(indicatorsStr); 00165 if (indicators.type() == Json::objectValue) { 00166 status = (indicators.isMember("status") 00167 && indicators["status"] == "ok"); 00168 rc = true; 00169 } 00170 } 00171 catch (...) { 00172 cerr << "exception during parsing of (supposedly) json response: " 00173 << indicatorsStr << endl; 00174 } 00175 00176 if (rc) { 00177 auto & providerStatus = providersStatus_[providerName]; 00178 providerStatus.lastStatus = status; 00179 providerStatus.lastCheck = Date::now(); 00180 } 00181 00182 return rc; 00183 }