RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/agent_configuration/agent_configuration_service.cc
00001 /* agent_configuration_service.cc
00002    Jeremy Barnes, 26 November 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    Implementation of the configuration service.
00006 */
00007 
00008 #include "jml/utils/string_functions.h"
00009 #include "agent_configuration_service.h"
00010 #include "soa/service/rest_request_binding.h"
00011 
00012 using namespace std;
00013 using namespace ML;
00014 
00015 namespace RTBKIT {
00016 
00017 
00018 /*****************************************************************************/
00019 /* AGENT CONFIGURATION SERVICE                                               */
00020 /*****************************************************************************/
00021 
00022 AgentConfigurationService::
00023 AgentConfigurationService(std::shared_ptr<ServiceProxies> services,
00024                           const std::string & serviceName)
00025     : RestServiceEndpoint(services->zmqContext), 
00026       ServiceBase(serviceName, services),
00027       agents(services->zmqContext),
00028       listeners(services->zmqContext),
00029       monitorProviderClient(services->zmqContext, *this)
00030 {
00031 }
00032 
00033 void
00034 AgentConfigurationService::
00035 init()
00036 {
00037     getServices()->config->removePath(serviceName());
00038 
00039     registerServiceProvider(serviceName(), { "rtbAgentConfiguration" });
00040     
00041 
00042     //registerService();
00043 
00044     agents.onConnection = [=] (const std::string & agent)
00045         {
00046             cerr << "got new agent " << hexify_string(agent) << endl;
00047         };
00048 
00049     agents.onDisconnection = [=] (const std::string & agent)
00050         {
00051             cerr << "lost agent " << hexify_string(agent) << endl;
00052             // Broadcast the disconnection to all listeners
00053             for (auto & l: listenerInfo)
00054                 listeners.sendMessage(l.first, "CONFIG", agent, "");
00055         };
00056 
00057     listeners.onConnection = [=] (const std::string & listener)
00058         {
00059             cerr << "got new listener " << hexify_string(listener) << endl;
00060 
00061             listenerInfo.insert(make_pair(listener, ListenerInfo()));
00062 
00063             // we got a new listener...
00064             for (auto & a: agentInfo) {
00065                 if (!a.second.config.isNull())
00066                     listeners.sendMessage(listener, "CONFIG", a.first,
00067                                           a.second.config.toString());
00068             }
00069         };
00070 
00071     listeners.onDisconnection = [=] (const std::string & listener)
00072         {
00073             cerr << "lost listener " << hexify_string(listener) << endl;
00074 
00075             listenerInfo.erase(listener);
00076         };
00077 
00078     listeners.clientMessageHandler = [=] (const std::vector<std::string> & message)
00079         {
00080             cerr << "listeners got client message " << message << endl;
00081             throw ML::Exception("unexpected listener message");
00082             //const std::string & agent = message.at(2);
00083             //Json::Value j = Json::parse(message.at(3));
00084 
00085             //handleAgentConfig(agent, j);
00086         };
00087 
00088     agents.clientMessageHandler = [=] (const std::vector<std::string> & message)
00089         {
00090             //cerr << "agent message " << message << endl;
00091             const std::string & agent = message.at(2);
00092             Json::Value j = Json::parse(message.at(3));
00093 
00094             handleAgentConfig(agent, j);
00095         };
00096 
00097     RestServiceEndpoint::init(getServices()->config, serviceName() + "/rest");
00098     listeners.init(getServices()->config, serviceName() + "/listen");
00099     agents.init(getServices()->config, serviceName() + "/agents");
00100 
00101     onHandleRequest = router.requestHandler();
00102 
00103     router.description = "API for the Datacratic Bidding Agent Configuration Service";
00104 
00105     router.addHelpRoute("/", "GET");
00106 
00107     auto & versionNode = router.addSubRouter("/v1", "version 1 of API");
00108     auto & agentsNode
00109         = versionNode.addSubRouter("/agents",
00110                                    "Operations on agents");
00111     
00112     addRouteSyncReturn(agentsNode,
00113                        "/",
00114                        {"GET"},
00115                        "List all agents that are configured",
00116                        "Array of names",
00117                        [] (const std::vector<std::string> & v) { return jsonEncode(v); },
00118                        &AgentConfigurationService::handleGetAgentList,
00119                        this);
00120     
00121     addRouteSyncReturn(agentsNode,
00122                        "/all",
00123                        {"GET"},
00124                        "Return the configuration of the all agents",
00125                        "Dictionary of configuration of all agents",
00126                        [] (const Json::Value & config) { return config; },
00127                        &AgentConfigurationService::handleGetAllAgents,
00128                        this);
00129 
00130 
00131     auto & agent
00132         = agentsNode.addSubRouter(Rx("/([^/]*)", "/<agentName>"),
00133                                   "operations on an individual agent");
00134     
00135     RequestParam<std::string> agentKeyParam(-2, "<agentName>", "agent to operate on");
00136     
00137     addRouteSyncReturn(agent,
00138                        "/config",
00139                        {"GET"},
00140                        "Return the configuration of the given agent",
00141                        "Representation of the named agent",
00142                        [] (const Json::Value & config) { return config; },
00143                        &AgentConfigurationService::handleGetAgent,
00144                        this,
00145                        agentKeyParam);
00146 
00147     addRouteSync(agent,
00148                  "/config",
00149                  {"POST"},
00150                  "Set the configuration of the given agent",
00151                  &AgentConfigurationService::handleAgentConfig,
00152                  this,
00153                  agentKeyParam,
00154                  JsonParam<Json::Value>("", "Configuration block for agent"));
00155 
00156     addRouteSync(agent,
00157                  "/heartbeat",
00158                  {"POST"},
00159                  "Send a heartbeat for the agent",
00160                  &AgentConfigurationService::handleAgentHeartbeat,
00161                  this,
00162                  agentKeyParam);
00163 
00164 
00165     addSource("AgentConfigurationService::agents", agents);
00166     addSource("AgentConfigurationService::listeners", listeners);
00167 
00168     monitorProviderClient.init(getServices()->config);
00169 }
00170 
00171 void
00172 AgentConfigurationService::
00173 bindTcp()
00174 {
00175     RestServiceEndpoint::bindTcp(
00176             getServices()->ports->getRange("agentConfiguration.zmq"),
00177             getServices()->ports->getRange("agentConfiguration.http"));
00178     listeners.bindTcp(getServices()->ports->getRange("configuration"));
00179     agents.bindTcp();
00180 }
00181 
00182 Json::Value
00183 AgentConfigurationService::
00184 handleGetAgent(const std::string & agent) const
00185 {
00186     auto it = agentInfo.find(agent);
00187     if (it == agentInfo.end())
00188         return Json::Value();
00189     return it->second.config;
00190 }
00191 
00192 Json::Value
00193 AgentConfigurationService::
00194 handleGetAllAgents() const
00195 {
00196     Json::Value result;
00197     for (auto & c: agentInfo)
00198         result[c.first] = c.second.config;
00199     return result;
00200 }
00201 
00202 std::vector<std::string>
00203 AgentConfigurationService::
00204 handleGetAgentList() const
00205 {
00206     vector<string> result;
00207     for (auto & c: agentInfo)
00208         result.push_back(c.first);
00209     return result;
00210 }
00211 
00212 void
00213 AgentConfigurationService::
00214 handleAgentConfig(const std::string & agent,
00215                   const Json::Value & config)
00216 {
00217     auto & info = agentInfo[agent];
00218     info.lastHeartbeat = Date::now();
00219     
00220     // If the configuration didn't change, we don't need to broadcast it
00221     if (info.config == config)
00222         return;
00223 
00224     info.config = config;
00225 
00226     // Broadcast the configuration to all listeners
00227     for (auto & l: listenerInfo)
00228         listeners.sendMessage(l.first, "CONFIG", agent, config.toString());
00229 }
00230 
00231 void
00232 AgentConfigurationService::
00233 handleAgentHeartbeat(const std::string & agent)
00234 {
00235 }
00236 
00238 string
00239 AgentConfigurationService::
00240 getProviderName()
00241     const
00242 {
00243     return serviceName();
00244 }
00245 
00246 Json::Value
00247 AgentConfigurationService::
00248 getProviderIndicators()
00249     const
00250 {
00251     Json::Value value;
00252 
00253     /* MB health check:
00254        - no error occurred in last save (implying Redis conn is alive) */
00255     Date now = Date::now();
00256     bool status(true);
00257     value["status"] = status ? "ok" : "failure";
00258 
00259     return value;
00260 }
00261 
00262 } // namespace RTBKIT
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator