RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* configuration_listener.h -*- C++ -*- 00002 Jeremy Barnes, 26 November 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 Agent that listens for configuration. 00006 */ 00007 00008 #pragma once 00009 00010 #include "soa/service/zmq_endpoint.h" 00011 #include "rtbkit/core/router/router_types.h" 00012 #include "soa/gc/rcu_protected.h" 00013 00014 00015 namespace RTBKIT { 00016 00017 00018 /*****************************************************************************/ 00019 /* AGENT CONFIG ENTRY */ 00020 /*****************************************************************************/ 00021 00023 struct AgentConfigEntry { 00024 std::string name; 00025 std::shared_ptr<const AgentConfig> config; 00026 00027 bool valid() const { return !!config; } 00028 00030 Json::Value toJson() const; 00031 }; 00032 00033 00038 struct AllAgentConfig : public std::vector<AgentConfigEntry> { 00039 std::unordered_map<std::string, int> agentIndex; 00040 std::unordered_map<AccountKey, std::vector<int> > accountIndex; 00041 }; 00042 00043 00044 /*****************************************************************************/ 00045 /* AGENT CONFIGURATION LISTENER */ 00046 /*****************************************************************************/ 00047 00048 struct AgentConfigurationListener: public MessageLoop { 00049 00050 AgentConfigurationListener(std::shared_ptr<zmq::context_t> context) 00051 : allAgents(new AllAgentConfig()), configEndpoint(context) 00052 { 00053 } 00054 00055 ~AgentConfigurationListener() 00056 { 00057 shutdown(); 00058 } 00059 00063 typedef std::function<void (std::string, std::shared_ptr<const AgentConfig>)> 00064 OnConfigChange; 00065 00066 OnConfigChange onConfigChange; 00067 00068 void init(std::shared_ptr<ConfigurationService> config) 00069 { 00070 configEndpoint.init(config); 00071 configEndpoint.connectToServiceClass("rtbAgentConfiguration", 00072 "listen"); 00073 configEndpoint.messageHandler 00074 = [=] (const std::vector<std::string> & message) 00075 { 00076 if (message.size() > 1) { 00077 using namespace std; 00078 cerr << "got configuration message " << message[1] << endl; 00079 } 00080 this->onMessage(message); 00081 }; 00082 addSource("AgentConfigurationListener::configEndpoint", configEndpoint); 00083 } 00084 00085 void start() 00086 { 00087 MessageLoop::start(); 00088 } 00089 00090 void shutdown() 00091 { 00092 MessageLoop::shutdown(); 00093 configEndpoint.shutdown(); 00094 } 00095 00096 /*************************************************************************/ 00097 /* AGENT INTERACTIONS */ 00098 /*************************************************************************/ 00099 00100 typedef std::function<void (const AgentConfigEntry & info)> OnAgentFn; 00102 void forEachAgent(const OnAgentFn & onAgent) const; 00103 00107 void forEachAccountAgent(const AccountKey & account, 00108 const OnAgentFn & onAgent) const; 00109 00113 AgentConfigEntry getAgentEntry(const std::string & agent) const; 00114 00115 private: 00116 void onMessage(const std::vector<std::string> & message); 00117 00118 AllAgentConfig * allAgents; 00119 mutable GcLock allAgentsGc; 00120 00121 ZmqNamedClientBusProxy configEndpoint; 00122 }; 00123 00124 00125 } // namespace RTBKIT 00126