RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/agent_configuration/agent_configuration_listener.h
00001 /* configuration_listener.h                                        -*- C++ -*-
00002    Jeremy Barnes, 26 November 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005    Agent that listens for configuration.
00006 */
00007 
00008 #pragma once
00009 
00010 #include "soa/service/zmq_endpoint.h"
00011 #include "rtbkit/core/router/router_types.h"
00012 #include "soa/gc/rcu_protected.h"
00013 
00014 
00015 namespace RTBKIT {
00016 
00017 
00018 /*****************************************************************************/
00019 /* AGENT CONFIG ENTRY                                                        */
00020 /*****************************************************************************/
00021 
00023 struct AgentConfigEntry {
00024     std::string name;
00025     std::shared_ptr<const AgentConfig> config;
00026 
00027     bool valid() const { return !!config; }
00028 
00030     Json::Value toJson() const;
00031 };
00032 
00033 
00038 struct AllAgentConfig : public std::vector<AgentConfigEntry> {
00039     std::unordered_map<std::string, int> agentIndex;
00040     std::unordered_map<AccountKey, std::vector<int> > accountIndex;
00041 };
00042 
00043 
00044 /*****************************************************************************/
00045 /* AGENT CONFIGURATION LISTENER                                              */
00046 /*****************************************************************************/
00047 
00048 struct AgentConfigurationListener: public MessageLoop {
00049 
00050     AgentConfigurationListener(std::shared_ptr<zmq::context_t> context)
00051         : allAgents(new AllAgentConfig()), configEndpoint(context)
00052     {
00053     }
00054 
00055     ~AgentConfigurationListener()
00056     {
00057         shutdown();
00058     }
00059 
00063     typedef std::function<void (std::string, std::shared_ptr<const AgentConfig>)>
00064     OnConfigChange;
00065 
00066     OnConfigChange onConfigChange;
00067 
00068     void init(std::shared_ptr<ConfigurationService> config)
00069     {
00070         configEndpoint.init(config);
00071         configEndpoint.connectToServiceClass("rtbAgentConfiguration",
00072                                              "listen");
00073         configEndpoint.messageHandler
00074             = [=] (const std::vector<std::string> & message)
00075             {
00076                 if (message.size() > 1) {
00077                     using namespace std;
00078                     cerr << "got configuration message " << message[1] << endl;
00079                 }
00080                 this->onMessage(message);
00081             };
00082         addSource("AgentConfigurationListener::configEndpoint", configEndpoint);
00083     }
00084 
00085     void start()
00086     {
00087         MessageLoop::start();
00088     }
00089 
00090     void shutdown()
00091     {
00092         MessageLoop::shutdown();
00093         configEndpoint.shutdown();
00094     }
00095 
00096     /*************************************************************************/
00097     /* AGENT INTERACTIONS                                                    */
00098     /*************************************************************************/
00099 
00100     typedef std::function<void (const AgentConfigEntry & info)> OnAgentFn;
00102     void forEachAgent(const OnAgentFn & onAgent) const;
00103 
00107     void forEachAccountAgent(const AccountKey & account,
00108                              const OnAgentFn & onAgent) const;
00109 
00113     AgentConfigEntry getAgentEntry(const std::string & agent) const;
00114 
00115 private:
00116     void onMessage(const std::vector<std::string> & message);
00117 
00118     AllAgentConfig * allAgents;
00119     mutable GcLock allAgentsGc;
00120 
00121     ZmqNamedClientBusProxy configEndpoint;
00122 };
00123 
00124 
00125 } // namespace RTBKIT
00126 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator