RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/zookeeper.h
00001 /* zookeeper.h                                                     -*- C++ -*-
00002    Jeremy Barnes, 17 August 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005 */
00006 
00007 #pragma once
00008 
00009 #include <zookeeper/zookeeper.h>
00010 
00011 #include "jml/arch/exception.h"
00012 #include "jml/arch/format.h"
00013 #include "jml/utils/guard.h"
00014 
00015 #include <set>
00016 #include <iostream>
00017 #include <vector>
00018 #include <mutex>
00019 #include <thread>
00020 
00021 
00022 namespace Datacratic {
00023 
00024 /*****************************************************************************/
00025 /* ZOOKEEPER CONNECTION                                                      */
00026 /*****************************************************************************/
00027 
00028 struct ZookeeperConnection {
00029 
00030     template<typename T>
00031     struct CallbackNode {
00032         T * next;
00033         T * last;
00034 
00035         CallbackNode() {
00036             next = last = (T *) this;
00037         }
00038 
00039         ~CallbackNode() {
00040             unlink();
00041         }
00042 
00043         void add(T * node) {
00044             node->next = next;
00045             node->last = (T *) this;
00046             next->last = node;
00047             this->next = node;
00048         }
00049 
00050         void unlink() {
00051             next->last = last;
00052             last->next = next;
00053             next = last = (T *) this;
00054         }
00055     };
00056 
00057     struct Callback : public CallbackNode<Callback> {
00058         typedef void (* Type)(int type, std::string const & path, void * data);
00059         Type callback;
00060         std::string path;
00061         void * user;
00062 
00063         Callback(Type callback, std::string path, void * user) : callback(callback), path(path), user(user) {
00064         }
00065 
00066         void call(int type) {
00067             callback(type, path, user);
00068             delete this;
00069         }
00070     };
00071 
00072     Callback * getCallback(Callback::Type watch, std::string const & path, void * data) {
00073         if(!watch) {
00074             return nullptr;
00075         }
00076 
00077         Callback * item = new Callback(watch, path, data);
00078         callbacks.add(item);
00079         return item;
00080     }
00081 
00082     ZookeeperConnection();
00083     ~ZookeeperConnection() { close(); }
00084 
00085     static std::string printEvent(int eventType);
00086 
00087     static std::string printState(int state);
00088 
00090     void connect(const std::string & host,
00091                  double timeoutInSeconds = 5);
00092 
00093     void reconnect();
00094 
00095     void close();
00096 
00097     enum CheckResult {
00098         CR_RETRY,  
00099         CR_DONE    
00100     };
00101 
00107     CheckResult checkRes(int returnCode, int & retries,
00108                          const char * operation, const char * path);
00109 
00110     std::pair<std::string, bool>
00111     createNode(const std::string & path,
00112                const std::string & value,
00113                bool ephemeral,
00114                bool sequence,
00115                bool mustSucceed = true,
00116                bool createPath = false);
00117 
00122     bool deleteNode(const std::string & path, bool throwIfNodeMissing = true);
00123 
00125     void createPath(const std::string & path);
00126 
00128     void removePath(const std::string & path);
00129 
00131     bool nodeExists(const std::string & path,
00132                     Callback::Type watcher = 0,
00133                     void * watcherData = 0);
00134 
00135     std::string readNode(const std::string & path,
00136                          Callback::Type watcher = 0,
00137                          void * watcherData = 0);
00138 
00139     void writeNode(const std::string & path, const std::string & value);
00140 
00141     std::vector<std::string>
00142     getChildren(const std::string & path,
00143                 bool failIfNodeMissing = true,
00144                 Callback::Type watcher = 0,
00145                 void * watcherData = 0);
00146 
00147     static void eventHandlerFn(zhandle_t * handle,
00148                                int event,
00149                                int state,
00150                                const char * path,
00151                                void * context);
00152 
00154     static std::string fixPath(const std::string & path);
00155 
00156     std::timed_mutex connectMutex;
00157     std::string host;
00158     int recvTimeout;
00159     clientid_t clientId;
00160     zhandle_t * handle;
00161 
00162     struct Node {
00163         Node(std::string const & path) : path(path) {
00164         }
00165 
00166         Node(std::string const & path, std::string const & value) : path(path), value(value) {
00167         }
00168 
00169         bool operator<(Node const & other) const {
00170             return path < other.path;
00171         }
00172 
00173         std::string path;
00174         mutable std::string value;
00175     };
00176 
00177     std::set<Node> ephemerals;
00178 
00179     // head of doubly linked list of callbacks
00180     CallbackNode<Callback> callbacks;
00181 };
00182 
00183 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator