RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/zookeeper_configuration_service.cc
00001 
00008 #include "zookeeper_configuration_service.h"
00009 #include "soa/service/zookeeper.h"
00010 #include "jml/utils/exc_assert.h"
00011 #include <boost/algorithm/string.hpp>
00012 
00013 
00014 using namespace std;
00015 using namespace ML;
00016 
00017 namespace Datacratic {
00018 
00019 std::string printZookeeperEventType(int type)
00020 {
00021 
00022     if (type == ZOO_CREATED_EVENT)
00023         return "CREATED";
00024     if (type == ZOO_DELETED_EVENT)
00025         return "DELETED";
00026     if (type == ZOO_CHANGED_EVENT)
00027         return "CHANGED";
00028     if (type == ZOO_CHILD_EVENT)
00029         return "CHILD";
00030     if (type == ZOO_SESSION_EVENT)
00031         return "SESSION";
00032     if (type == ZOO_NOTWATCHING_EVENT)
00033         return "NOTWATCHING";
00034     return ML::format("UNKNOWN(%d)", type);
00035 }
00036 
00037 std::string printZookeeperState(int state)
00038 {
00039     if (state == ZOO_EXPIRED_SESSION_STATE)
00040         return "ZOO_EXPIRED_SESSION_STATE";
00041     if (state == ZOO_AUTH_FAILED_STATE)
00042         return "ZOO_AUTH_FAILED_STATE";
00043     if (state == ZOO_CONNECTING_STATE)
00044         return "ZOO_CONNECTING_STATE";
00045     if (state == ZOO_ASSOCIATING_STATE)
00046         return "ZOO_ASSOCIATING_STATE";
00047     if (state == ZOO_CONNECTED_STATE)
00048         return "ZOO_CONNECTED_STATE";
00049     return ML::format("ZOO_UNKNOWN_STATE(%d)", state);
00050 }
00051 
00052 void
00053 watcherFn(int type, std::string const & path, void * watcherCtx)
00054 {
00055     typedef std::shared_ptr<ConfigurationService::Watch::Data> SharedPtr;
00056     std::unique_ptr<SharedPtr> data(reinterpret_cast<SharedPtr *>(watcherCtx));
00057 #if 0
00058     cerr << "type = " << printZookeeperEventType(type)
00059          << " state = " << printZookeeperState(state)
00060          << " path = " << path << " context "
00061          << watcherCtx << " data " << data->get() << endl;
00062 #endif
00063 
00064     ConfigurationService::ChangeType change;
00065     if (type == ZOO_CREATED_EVENT)
00066         change = ConfigurationService::CREATED;
00067     if (type == ZOO_DELETED_EVENT)
00068         change = ConfigurationService::DELETED;
00069     if (type == ZOO_CHANGED_EVENT)
00070         change = ConfigurationService::VALUE_CHANGED;
00071     if (type == ZOO_CHILD_EVENT)
00072         change = ConfigurationService::NEW_CHILD;
00073 
00074     auto & item = *data;
00075     if (item->watchReferences > 0) {
00076         item->onChange(path, change);
00077     }
00078 }
00079 
00080 ZookeeperConnection::Callback::Type
00081 getWatcherFn(const ConfigurationService::Watch & watch)
00082 {
00083     if (!watch)
00084         return nullptr;
00085     return watcherFn;
00086 }
00087 
00088 
00089 
00090 /*****************************************************************************/
00091 /* ZOOKEEPER CONFIGURATION SERVICE                                           */
00092 /*****************************************************************************/
00093 
00094 ZookeeperConfigurationService::
00095 ZookeeperConfigurationService()
00096 {
00097 }
00098 
00099 ZookeeperConfigurationService::
00100 ZookeeperConfigurationService(const std::string & host,
00101                               const std::string & prefix,
00102                               int timeout)
00103 {
00104     init(host, prefix, timeout);
00105 }
00106     
00107 ZookeeperConfigurationService::
00108 ~ZookeeperConfigurationService()
00109 {
00110 }
00111 
00112 void
00113 ZookeeperConfigurationService::
00114 init(const std::string & host,
00115      const std::string & prefix,
00116      int timeout)
00117 {
00118     zoo.reset(new ZookeeperConnection());
00119     zoo->connect(host, timeout);
00120     this->prefix = prefix;
00121 
00122     if (!this->prefix.empty()
00123         && this->prefix[this->prefix.size() - 1] != '/')
00124         this->prefix = this->prefix + "/";
00125 
00126     if (!this->prefix.empty()
00127         && this->prefix[0] != '/')
00128         this->prefix = "/" + this->prefix;
00129     
00130     zoo->createPath(this->prefix);
00131 
00132 #if 0
00133     for (unsigned i = 1;  i < prefix.size();  ++i) {
00134         if (prefix[i] == '/') {
00135             zoo->createNode(string(prefix, 0, i),
00136                             "",
00137                             false,
00138                             false,
00139                             false /* must succeed */);
00140         }
00141     }
00142 #endif
00143 }
00144 
00145 Json::Value
00146 ZookeeperConfigurationService::
00147 getJson(const std::string & key, Watch watch)
00148 {
00149     ExcAssert(zoo);
00150     auto val = zoo->readNode(prefix + key, getWatcherFn(watch),
00151                              watch.get());
00152     try {
00153         if (val == "")
00154             return Json::Value();
00155         return Json::parse(val);
00156     } catch (...) {
00157         cerr << "error parsing JSON entry '" << val << "'" << endl;
00158         throw;
00159     }
00160 }
00161     
00162 void
00163 ZookeeperConfigurationService::
00164 set(const std::string & key,
00165     const Json::Value & value)
00166 {
00167     //cerr << "setting " << key << " to " << value << endl;
00168     // TODO: race condition
00169     if (!zoo->createNode(prefix + key, boost::trim_copy(value.toString()),
00170                          false, false,
00171                          false /* must succeed */,
00172                          true /* create path */).second)
00173         zoo->writeNode(prefix + key, boost::trim_copy(value.toString()));
00174     ExcAssert(zoo);
00175 }
00176 
00177 std::string
00178 ZookeeperConfigurationService::
00179 setUnique(const std::string & key,
00180           const Json::Value & value)
00181 {
00182     //cerr << "setting unique " << key << " to " << value << endl;
00183     ExcAssert(zoo);
00184     return zoo->createNode(prefix + key, boost::trim_copy(value.toString()),
00185                            true /* ephemeral */,
00186                            false /* sequential */,
00187                            true /* mustSucceed */,
00188                            true /* create path */)
00189         .first;
00190 }
00191 
00192 std::vector<std::string>
00193 ZookeeperConfigurationService::
00194 getChildren(const std::string & key,
00195             Watch watch)
00196 {
00197     //cerr << "getChildren " << key << " watch " << watch << endl;
00198     return zoo->getChildren(prefix + key,
00199                             false /* fail if not there */,
00200                             getWatcherFn(watch),
00201                             watch.get());
00202 }
00203 
00204 bool
00205 ZookeeperConfigurationService::
00206 forEachEntry(const OnEntry & onEntry,
00207                       const std::string & startPrefix) const
00208 {
00209     //cerr << "forEachEntry: startPrefix = " << startPrefix << endl;
00210 
00211     ExcAssert(zoo);
00212 
00213     std::function<bool (const std::string &)> doNode
00214         = [&] (const std::string & currentPrefix)
00215         {
00216             //cerr << "doNode " << currentPrefix << endl;
00217 
00218             string r = zoo->readNode(prefix + currentPrefix);
00219 
00220             //cerr << "r = " << r << endl;
00221             
00222             if (r != "") {
00223                 if (!onEntry(currentPrefix, Json::parse(r)))
00224                     return false;
00225             }
00226 
00227             vector<string> children = zoo->getChildren(prefix + currentPrefix,
00228                                                        false);
00229             
00230             for (auto child: children) {
00231                 //cerr << "child = " << child << endl;
00232                 string newPrefix = currentPrefix + "/" + child;
00233                 if (currentPrefix.empty())
00234                     newPrefix = child;
00235                 
00236                 if (!doNode(newPrefix))
00237                     return false;
00238             }
00239             
00240             return true;
00241         };
00242 
00243     if (!zoo->nodeExists(prefix + startPrefix)) {
00244         
00245 
00246         return true;
00247     }
00248     
00249     return doNode(startPrefix);
00250 }
00251 
00252 void
00253 ZookeeperConfigurationService::
00254 removePath(const std::string & path)
00255 {
00256     ExcAssert(zoo);
00257     zoo->removePath(prefix + path);
00258 }
00259 
00260 
00261 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator