RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* agent_configuration_service.cc 00002 Jeremy Barnes, 26 November 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Implementation of the configuration service. 00006 */ 00007 00008 #include "jml/utils/string_functions.h" 00009 #include "agent_configuration_service.h" 00010 #include "soa/service/rest_request_binding.h" 00011 00012 using namespace std; 00013 using namespace ML; 00014 00015 namespace RTBKIT { 00016 00017 00018 /*****************************************************************************/ 00019 /* AGENT CONFIGURATION SERVICE */ 00020 /*****************************************************************************/ 00021 00022 AgentConfigurationService:: 00023 AgentConfigurationService(std::shared_ptr<ServiceProxies> services, 00024 const std::string & serviceName) 00025 : RestServiceEndpoint(services->zmqContext), 00026 ServiceBase(serviceName, services), 00027 agents(services->zmqContext), 00028 listeners(services->zmqContext), 00029 monitorProviderClient(services->zmqContext, *this) 00030 { 00031 } 00032 00033 void 00034 AgentConfigurationService:: 00035 init() 00036 { 00037 getServices()->config->removePath(serviceName()); 00038 00039 registerServiceProvider(serviceName(), { "rtbAgentConfiguration" }); 00040 00041 00042 //registerService(); 00043 00044 agents.onConnection = [=] (const std::string & agent) 00045 { 00046 cerr << "got new agent " << hexify_string(agent) << endl; 00047 }; 00048 00049 agents.onDisconnection = [=] (const std::string & agent) 00050 { 00051 cerr << "lost agent " << hexify_string(agent) << endl; 00052 // Broadcast the disconnection to all listeners 00053 for (auto & l: listenerInfo) 00054 listeners.sendMessage(l.first, "CONFIG", agent, ""); 00055 }; 00056 00057 listeners.onConnection = [=] (const std::string & listener) 00058 { 00059 cerr << "got new listener " << hexify_string(listener) << endl; 00060 00061 listenerInfo.insert(make_pair(listener, ListenerInfo())); 00062 00063 // we got a new listener... 00064 for (auto & a: agentInfo) { 00065 if (!a.second.config.isNull()) 00066 listeners.sendMessage(listener, "CONFIG", a.first, 00067 a.second.config.toString()); 00068 } 00069 }; 00070 00071 listeners.onDisconnection = [=] (const std::string & listener) 00072 { 00073 cerr << "lost listener " << hexify_string(listener) << endl; 00074 00075 listenerInfo.erase(listener); 00076 }; 00077 00078 listeners.clientMessageHandler = [=] (const std::vector<std::string> & message) 00079 { 00080 cerr << "listeners got client message " << message << endl; 00081 throw ML::Exception("unexpected listener message"); 00082 //const std::string & agent = message.at(2); 00083 //Json::Value j = Json::parse(message.at(3)); 00084 00085 //handleAgentConfig(agent, j); 00086 }; 00087 00088 agents.clientMessageHandler = [=] (const std::vector<std::string> & message) 00089 { 00090 //cerr << "agent message " << message << endl; 00091 const std::string & agent = message.at(2); 00092 Json::Value j = Json::parse(message.at(3)); 00093 00094 handleAgentConfig(agent, j); 00095 }; 00096 00097 RestServiceEndpoint::init(getServices()->config, serviceName() + "/rest"); 00098 listeners.init(getServices()->config, serviceName() + "/listen"); 00099 agents.init(getServices()->config, serviceName() + "/agents"); 00100 00101 onHandleRequest = router.requestHandler(); 00102 00103 router.description = "API for the Datacratic Bidding Agent Configuration Service"; 00104 00105 router.addHelpRoute("/", "GET"); 00106 00107 auto & versionNode = router.addSubRouter("/v1", "version 1 of API"); 00108 auto & agentsNode 00109 = versionNode.addSubRouter("/agents", 00110 "Operations on agents"); 00111 00112 addRouteSyncReturn(agentsNode, 00113 "/", 00114 {"GET"}, 00115 "List all agents that are configured", 00116 "Array of names", 00117 [] (const std::vector<std::string> & v) { return jsonEncode(v); }, 00118 &AgentConfigurationService::handleGetAgentList, 00119 this); 00120 00121 addRouteSyncReturn(agentsNode, 00122 "/all", 00123 {"GET"}, 00124 "Return the configuration of the all agents", 00125 "Dictionary of configuration of all agents", 00126 [] (const Json::Value & config) { return config; }, 00127 &AgentConfigurationService::handleGetAllAgents, 00128 this); 00129 00130 00131 auto & agent 00132 = agentsNode.addSubRouter(Rx("/([^/]*)", "/<agentName>"), 00133 "operations on an individual agent"); 00134 00135 RequestParam<std::string> agentKeyParam(-2, "<agentName>", "agent to operate on"); 00136 00137 addRouteSyncReturn(agent, 00138 "/config", 00139 {"GET"}, 00140 "Return the configuration of the given agent", 00141 "Representation of the named agent", 00142 [] (const Json::Value & config) { return config; }, 00143 &AgentConfigurationService::handleGetAgent, 00144 this, 00145 agentKeyParam); 00146 00147 addRouteSync(agent, 00148 "/config", 00149 {"POST"}, 00150 "Set the configuration of the given agent", 00151 &AgentConfigurationService::handleAgentConfig, 00152 this, 00153 agentKeyParam, 00154 JsonParam<Json::Value>("", "Configuration block for agent")); 00155 00156 addRouteSync(agent, 00157 "/heartbeat", 00158 {"POST"}, 00159 "Send a heartbeat for the agent", 00160 &AgentConfigurationService::handleAgentHeartbeat, 00161 this, 00162 agentKeyParam); 00163 00164 00165 addSource("AgentConfigurationService::agents", agents); 00166 addSource("AgentConfigurationService::listeners", listeners); 00167 00168 monitorProviderClient.init(getServices()->config); 00169 } 00170 00171 void 00172 AgentConfigurationService:: 00173 bindTcp() 00174 { 00175 RestServiceEndpoint::bindTcp( 00176 getServices()->ports->getRange("agentConfiguration.zmq"), 00177 getServices()->ports->getRange("agentConfiguration.http")); 00178 listeners.bindTcp(getServices()->ports->getRange("configuration")); 00179 agents.bindTcp(); 00180 } 00181 00182 Json::Value 00183 AgentConfigurationService:: 00184 handleGetAgent(const std::string & agent) const 00185 { 00186 auto it = agentInfo.find(agent); 00187 if (it == agentInfo.end()) 00188 return Json::Value(); 00189 return it->second.config; 00190 } 00191 00192 Json::Value 00193 AgentConfigurationService:: 00194 handleGetAllAgents() const 00195 { 00196 Json::Value result; 00197 for (auto & c: agentInfo) 00198 result[c.first] = c.second.config; 00199 return result; 00200 } 00201 00202 std::vector<std::string> 00203 AgentConfigurationService:: 00204 handleGetAgentList() const 00205 { 00206 vector<string> result; 00207 for (auto & c: agentInfo) 00208 result.push_back(c.first); 00209 return result; 00210 } 00211 00212 void 00213 AgentConfigurationService:: 00214 handleAgentConfig(const std::string & agent, 00215 const Json::Value & config) 00216 { 00217 auto & info = agentInfo[agent]; 00218 info.lastHeartbeat = Date::now(); 00219 00220 // If the configuration didn't change, we don't need to broadcast it 00221 if (info.config == config) 00222 return; 00223 00224 info.config = config; 00225 00226 // Broadcast the configuration to all listeners 00227 for (auto & l: listenerInfo) 00228 listeners.sendMessage(l.first, "CONFIG", agent, config.toString()); 00229 } 00230 00231 void 00232 AgentConfigurationService:: 00233 handleAgentHeartbeat(const std::string & agent) 00234 { 00235 } 00236 00238 string 00239 AgentConfigurationService:: 00240 getProviderName() 00241 const 00242 { 00243 return serviceName(); 00244 } 00245 00246 Json::Value 00247 AgentConfigurationService:: 00248 getProviderIndicators() 00249 const 00250 { 00251 Json::Value value; 00252 00253 /* MB health check: 00254 - no error occurred in last save (implying Redis conn is alive) */ 00255 Date now = Date::now(); 00256 bool status(true); 00257 value["status"] = status ? "ok" : "failure"; 00258 00259 return value; 00260 } 00261 00262 } // namespace RTBKIT