RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* agent_configuration_listener.cc 00002 Jeremy Barnes, 26 November 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Listener for configuration. 00006 */ 00007 00008 #include "agent_configuration_listener.h" 00009 #include "agent_config.h" 00010 00011 namespace RTBKIT { 00012 00013 00014 /*****************************************************************************/ 00015 /* AGENT CONFIGURATION LISTENER */ 00016 /*****************************************************************************/ 00017 00018 00019 void 00020 AgentConfigurationListener:: 00021 forEachAgent(const OnAgentFn & onAgent) const 00022 { 00023 GcLock::SharedGuard guard(allAgentsGc); 00024 const AllAgentConfig * ac = allAgents; 00025 if (!ac) return; 00026 00027 std::for_each(ac->begin(), ac->end(), onAgent); 00028 } 00029 00030 void 00031 AgentConfigurationListener:: 00032 forEachAccountAgent(const AccountKey & account, 00033 const OnAgentFn & onAgent) const 00034 { 00035 GcLock::SharedGuard guard(allAgentsGc); 00036 const AllAgentConfig * ac = allAgents; 00037 if (!ac) return; 00038 00039 auto it = ac->accountIndex.find(account); 00040 if (it == ac->accountIndex.end()) 00041 return; 00042 00043 for (auto jt = it->second.begin(), jend = it->second.end(); 00044 jt != jend; ++jt) 00045 onAgent(ac->at(*jt)); 00046 } 00047 00048 AgentConfigEntry 00049 AgentConfigurationListener:: 00050 getAgentEntry(const std::string & agent) const 00051 { 00052 GcLock::SharedGuard guard(allAgentsGc); 00053 const AllAgentConfig * ac = allAgents; 00054 if (!ac) return AgentConfigEntry(); 00055 00056 auto it = ac->agentIndex.find(agent); 00057 if (it == ac->agentIndex.end()) 00058 return AgentConfigEntry(); 00059 return ac->at(it->second); 00060 } 00061 00062 void 00063 AgentConfigurationListener:: 00064 onMessage(const std::vector<std::string> & message) 00065 { 00066 using namespace std; 00067 00068 const std::string & topic = message.at(0); 00069 if (topic != "CONFIG") { 00070 cerr << "unknown message for agent configuration listener" << endl; 00071 cerr << message; 00072 return; 00073 } 00074 const std::string & agent = message.at(1); 00075 const std::string & configStr = message.at(2); 00076 00077 std::shared_ptr<AgentConfig> config; 00078 00079 if (!configStr.empty()) { 00080 Json::Value j = Json::parse(configStr); 00081 config.reset(new AgentConfig(AgentConfig::createFromJson(j))); 00082 } 00083 00084 /* Now, update the current configuration list */ 00085 00086 GcLock::SharedGuard guard(allAgentsGc); 00087 AllAgentConfig * ac = allAgents; 00088 00089 /* Create a new object and copy the old ones in */ 00090 std::unique_ptr<AllAgentConfig> newConfig(new AllAgentConfig()); 00091 bool found = false; 00092 for (auto & c: *ac) { 00093 if (c.name == agent) { 00094 found = true; 00095 if (config) { 00096 AgentConfigEntry ce = c; 00097 ce.config = config; 00098 newConfig->emplace_back(ce); 00099 } 00100 else continue; 00101 } 00102 else newConfig->emplace_back(c); 00103 00104 int i = newConfig->size() - 1; 00105 newConfig->agentIndex[c.name] = i; 00106 newConfig->accountIndex[newConfig->back().config->account].push_back(i); 00107 } 00108 if (!found && config) { 00109 AgentConfigEntry ce; 00110 ce.name = agent; 00111 ce.config = config; 00112 newConfig->emplace_back(ce); 00113 00114 int i = newConfig->size() - 1; 00115 newConfig->agentIndex[agent] = i; 00116 newConfig->accountIndex[newConfig->back().config->account].push_back(i); 00117 } 00118 00119 if (ML::cmp_xchg(allAgents, ac, (AllAgentConfig *)newConfig.get())) { 00120 newConfig.release(); 00121 ExcAssertNotEqual(ac, allAgents); 00122 if (ac) 00123 allAgentsGc.defer([=] () { delete ac; }); 00124 } 00125 else { 00126 throw ML::Exception("cmp_exch failed for AgentConfigurationListener"); 00127 } 00128 00129 if (onConfigChange) 00130 onConfigChange(agent, config); 00131 } 00132 00133 00134 } // namespace RTBKIT