RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/agent_configuration/agent_configuration_listener.cc
00001 /* agent_configuration_listener.cc
00002    Jeremy Barnes, 26 November 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    Listener for configuration.
00006 */
00007 
00008 #include "agent_configuration_listener.h"
00009 #include "agent_config.h"
00010 
00011 namespace RTBKIT {
00012 
00013 
00014 /*****************************************************************************/
00015 /* AGENT CONFIGURATION LISTENER                                              */
00016 /*****************************************************************************/
00017 
00018 
00019 void
00020 AgentConfigurationListener::
00021 forEachAgent(const OnAgentFn & onAgent) const
00022 {
00023     GcLock::SharedGuard guard(allAgentsGc);
00024     const AllAgentConfig * ac = allAgents;
00025     if (!ac) return;
00026 
00027     std::for_each(ac->begin(), ac->end(), onAgent);
00028 }
00029 
00030 void
00031 AgentConfigurationListener::
00032 forEachAccountAgent(const AccountKey & account,
00033                     const OnAgentFn & onAgent) const
00034 {
00035     GcLock::SharedGuard guard(allAgentsGc);
00036     const AllAgentConfig * ac = allAgents;
00037     if (!ac) return;
00038 
00039     auto it = ac->accountIndex.find(account);
00040     if (it == ac->accountIndex.end())
00041         return;
00042 
00043     for (auto jt = it->second.begin(), jend = it->second.end();
00044          jt != jend;  ++jt)
00045         onAgent(ac->at(*jt));
00046 }
00047 
00048 AgentConfigEntry
00049 AgentConfigurationListener::
00050 getAgentEntry(const std::string & agent) const
00051 {
00052     GcLock::SharedGuard guard(allAgentsGc);
00053     const AllAgentConfig * ac = allAgents;
00054     if (!ac) return AgentConfigEntry();
00055 
00056     auto it = ac->agentIndex.find(agent);
00057     if (it == ac->agentIndex.end())
00058         return AgentConfigEntry();
00059     return ac->at(it->second);
00060 }
00061 
00062 void
00063 AgentConfigurationListener::
00064 onMessage(const std::vector<std::string> & message)
00065 {
00066     using namespace std;
00067 
00068     const std::string & topic = message.at(0);
00069     if (topic != "CONFIG") {
00070         cerr << "unknown message for agent configuration listener" << endl;
00071         cerr << message;
00072         return;
00073     }
00074     const std::string & agent = message.at(1);
00075     const std::string & configStr = message.at(2);
00076 
00077     std::shared_ptr<AgentConfig> config;
00078 
00079     if (!configStr.empty()) {
00080         Json::Value j = Json::parse(configStr);
00081         config.reset(new AgentConfig(AgentConfig::createFromJson(j)));
00082     }
00083 
00084     /* Now, update the current configuration list */
00085 
00086     GcLock::SharedGuard guard(allAgentsGc);
00087     AllAgentConfig * ac = allAgents;
00088     
00089     /* Create a new object and copy the old ones in */
00090     std::unique_ptr<AllAgentConfig> newConfig(new AllAgentConfig());
00091     bool found = false;
00092     for (auto & c: *ac) {
00093         if (c.name == agent) {
00094             found = true;
00095             if (config) {
00096                 AgentConfigEntry ce = c;
00097                 ce.config = config;
00098                 newConfig->emplace_back(ce);
00099             }
00100             else continue;
00101         }
00102         else newConfig->emplace_back(c);
00103 
00104         int i = newConfig->size() - 1;
00105         newConfig->agentIndex[c.name] = i;
00106         newConfig->accountIndex[newConfig->back().config->account].push_back(i);
00107     }
00108     if (!found && config) {
00109         AgentConfigEntry ce;
00110         ce.name = agent;
00111         ce.config = config;
00112         newConfig->emplace_back(ce);
00113 
00114         int i = newConfig->size() - 1;
00115         newConfig->agentIndex[agent] = i;
00116         newConfig->accountIndex[newConfig->back().config->account].push_back(i);
00117     }
00118 
00119     if (ML::cmp_xchg(allAgents, ac, (AllAgentConfig *)newConfig.get())) {
00120         newConfig.release();
00121         ExcAssertNotEqual(ac, allAgents);
00122         if (ac)
00123             allAgentsGc.defer([=] () { delete ac; });
00124     }
00125     else {
00126         throw ML::Exception("cmp_exch failed for AgentConfigurationListener");
00127     }
00128 
00129     if (onConfigChange)
00130         onConfigChange(agent, config);
00131 }
00132 
00133 
00134 } // namespace RTBKIT
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator