![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 00008 #include "zookeeper_configuration_service.h" 00009 #include "soa/service/zookeeper.h" 00010 #include "jml/utils/exc_assert.h" 00011 #include <boost/algorithm/string.hpp> 00012 00013 00014 using namespace std; 00015 using namespace ML; 00016 00017 namespace Datacratic { 00018 00019 std::string printZookeeperEventType(int type) 00020 { 00021 00022 if (type == ZOO_CREATED_EVENT) 00023 return "CREATED"; 00024 if (type == ZOO_DELETED_EVENT) 00025 return "DELETED"; 00026 if (type == ZOO_CHANGED_EVENT) 00027 return "CHANGED"; 00028 if (type == ZOO_CHILD_EVENT) 00029 return "CHILD"; 00030 if (type == ZOO_SESSION_EVENT) 00031 return "SESSION"; 00032 if (type == ZOO_NOTWATCHING_EVENT) 00033 return "NOTWATCHING"; 00034 return ML::format("UNKNOWN(%d)", type); 00035 } 00036 00037 std::string printZookeeperState(int state) 00038 { 00039 if (state == ZOO_EXPIRED_SESSION_STATE) 00040 return "ZOO_EXPIRED_SESSION_STATE"; 00041 if (state == ZOO_AUTH_FAILED_STATE) 00042 return "ZOO_AUTH_FAILED_STATE"; 00043 if (state == ZOO_CONNECTING_STATE) 00044 return "ZOO_CONNECTING_STATE"; 00045 if (state == ZOO_ASSOCIATING_STATE) 00046 return "ZOO_ASSOCIATING_STATE"; 00047 if (state == ZOO_CONNECTED_STATE) 00048 return "ZOO_CONNECTED_STATE"; 00049 return ML::format("ZOO_UNKNOWN_STATE(%d)", state); 00050 } 00051 00052 void 00053 watcherFn(int type, std::string const & path, void * watcherCtx) 00054 { 00055 typedef std::shared_ptr<ConfigurationService::Watch::Data> SharedPtr; 00056 std::unique_ptr<SharedPtr> data(reinterpret_cast<SharedPtr *>(watcherCtx)); 00057 #if 0 00058 cerr << "type = " << printZookeeperEventType(type) 00059 << " state = " << printZookeeperState(state) 00060 << " path = " << path << " context " 00061 << watcherCtx << " data " << data->get() << endl; 00062 #endif 00063 00064 ConfigurationService::ChangeType change; 00065 if (type == ZOO_CREATED_EVENT) 00066 change = ConfigurationService::CREATED; 00067 if (type == ZOO_DELETED_EVENT) 00068 change = ConfigurationService::DELETED; 00069 if (type == ZOO_CHANGED_EVENT) 00070 change = ConfigurationService::VALUE_CHANGED; 00071 if (type == ZOO_CHILD_EVENT) 00072 change = ConfigurationService::NEW_CHILD; 00073 00074 auto & item = *data; 00075 if (item->watchReferences > 0) { 00076 item->onChange(path, change); 00077 } 00078 } 00079 00080 ZookeeperConnection::Callback::Type 00081 getWatcherFn(const ConfigurationService::Watch & watch) 00082 { 00083 if (!watch) 00084 return nullptr; 00085 return watcherFn; 00086 } 00087 00088 00089 00090 /*****************************************************************************/ 00091 /* ZOOKEEPER CONFIGURATION SERVICE */ 00092 /*****************************************************************************/ 00093 00094 ZookeeperConfigurationService:: 00095 ZookeeperConfigurationService() 00096 { 00097 } 00098 00099 ZookeeperConfigurationService:: 00100 ZookeeperConfigurationService(const std::string & host, 00101 const std::string & prefix, 00102 int timeout) 00103 { 00104 init(host, prefix, timeout); 00105 } 00106 00107 ZookeeperConfigurationService:: 00108 ~ZookeeperConfigurationService() 00109 { 00110 } 00111 00112 void 00113 ZookeeperConfigurationService:: 00114 init(const std::string & host, 00115 const std::string & prefix, 00116 int timeout) 00117 { 00118 zoo.reset(new ZookeeperConnection()); 00119 zoo->connect(host, timeout); 00120 this->prefix = prefix; 00121 00122 if (!this->prefix.empty() 00123 && this->prefix[this->prefix.size() - 1] != '/') 00124 this->prefix = this->prefix + "/"; 00125 00126 if (!this->prefix.empty() 00127 && this->prefix[0] != '/') 00128 this->prefix = "/" + this->prefix; 00129 00130 zoo->createPath(this->prefix); 00131 00132 #if 0 00133 for (unsigned i = 1; i < prefix.size(); ++i) { 00134 if (prefix[i] == '/') { 00135 zoo->createNode(string(prefix, 0, i), 00136 "", 00137 false, 00138 false, 00139 false /* must succeed */); 00140 } 00141 } 00142 #endif 00143 } 00144 00145 Json::Value 00146 ZookeeperConfigurationService:: 00147 getJson(const std::string & key, Watch watch) 00148 { 00149 ExcAssert(zoo); 00150 auto val = zoo->readNode(prefix + key, getWatcherFn(watch), 00151 watch.get()); 00152 try { 00153 if (val == "") 00154 return Json::Value(); 00155 return Json::parse(val); 00156 } catch (...) { 00157 cerr << "error parsing JSON entry '" << val << "'" << endl; 00158 throw; 00159 } 00160 } 00161 00162 void 00163 ZookeeperConfigurationService:: 00164 set(const std::string & key, 00165 const Json::Value & value) 00166 { 00167 //cerr << "setting " << key << " to " << value << endl; 00168 // TODO: race condition 00169 if (!zoo->createNode(prefix + key, boost::trim_copy(value.toString()), 00170 false, false, 00171 false /* must succeed */, 00172 true /* create path */).second) 00173 zoo->writeNode(prefix + key, boost::trim_copy(value.toString())); 00174 ExcAssert(zoo); 00175 } 00176 00177 std::string 00178 ZookeeperConfigurationService:: 00179 setUnique(const std::string & key, 00180 const Json::Value & value) 00181 { 00182 //cerr << "setting unique " << key << " to " << value << endl; 00183 ExcAssert(zoo); 00184 return zoo->createNode(prefix + key, boost::trim_copy(value.toString()), 00185 true /* ephemeral */, 00186 false /* sequential */, 00187 true /* mustSucceed */, 00188 true /* create path */) 00189 .first; 00190 } 00191 00192 std::vector<std::string> 00193 ZookeeperConfigurationService:: 00194 getChildren(const std::string & key, 00195 Watch watch) 00196 { 00197 //cerr << "getChildren " << key << " watch " << watch << endl; 00198 return zoo->getChildren(prefix + key, 00199 false /* fail if not there */, 00200 getWatcherFn(watch), 00201 watch.get()); 00202 } 00203 00204 bool 00205 ZookeeperConfigurationService:: 00206 forEachEntry(const OnEntry & onEntry, 00207 const std::string & startPrefix) const 00208 { 00209 //cerr << "forEachEntry: startPrefix = " << startPrefix << endl; 00210 00211 ExcAssert(zoo); 00212 00213 std::function<bool (const std::string &)> doNode 00214 = [&] (const std::string & currentPrefix) 00215 { 00216 //cerr << "doNode " << currentPrefix << endl; 00217 00218 string r = zoo->readNode(prefix + currentPrefix); 00219 00220 //cerr << "r = " << r << endl; 00221 00222 if (r != "") { 00223 if (!onEntry(currentPrefix, Json::parse(r))) 00224 return false; 00225 } 00226 00227 vector<string> children = zoo->getChildren(prefix + currentPrefix, 00228 false); 00229 00230 for (auto child: children) { 00231 //cerr << "child = " << child << endl; 00232 string newPrefix = currentPrefix + "/" + child; 00233 if (currentPrefix.empty()) 00234 newPrefix = child; 00235 00236 if (!doNode(newPrefix)) 00237 return false; 00238 } 00239 00240 return true; 00241 }; 00242 00243 if (!zoo->nodeExists(prefix + startPrefix)) { 00244 00245 00246 return true; 00247 } 00248 00249 return doNode(startPrefix); 00250 } 00251 00252 void 00253 ZookeeperConfigurationService:: 00254 removePath(const std::string & path) 00255 { 00256 ExcAssert(zoo); 00257 zoo->removePath(prefix + path); 00258 } 00259 00260 00261 } // namespace Datacratic