RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/zookeeper.cc
00001 /* zookeeper.cc
00002    Jeremy Barnes, 17 August 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005 */
00006 
00007 #include "soa/service/zookeeper.h"
00008 #include "jml/arch/timers.h"
00009 
00010 using namespace std;
00011 
00012 namespace Datacratic {
00013 
00014 namespace {
00015 
00016 struct Init {
00017     Init()
00018     {
00019         zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
00020     }
00021 } init;
00022 
00023 void zk_callback(zhandle_t * ah, int type, int state, const char * path, void * user) {
00024     auto cb = reinterpret_cast<ZookeeperConnection::Callback *>(user);
00025     if(cb) {
00026         cb->unlink();
00027         cb->call(type);
00028     }
00029 }
00030 
00031 } // file scope
00032 
00033 /*****************************************************************************/
00034 /* ZOOKEEPER CONNECTION                                                      */
00035 /*****************************************************************************/
00036 
00037 ZookeeperConnection::
00038 ZookeeperConnection()
00039     : recvTimeout(1000 /* one second */),
00040       handle(0)
00041 {
00042 }
00043     
00044 std::string
00045 ZookeeperConnection::
00046 printEvent(int eventType)
00047 {
00048     if (eventType == ZOO_CREATED_EVENT)
00049         return "CREATED";
00050     else if (eventType == ZOO_DELETED_EVENT)
00051         return "DELETED";
00052     else if (eventType == ZOO_CHANGED_EVENT)
00053         return "CHANGED";
00054     else if (eventType == ZOO_CHILD_EVENT)
00055         return "CHILD";
00056     else if (eventType == ZOO_SESSION_EVENT)
00057         return "SESSION";
00058     else if (eventType == ZOO_NOTWATCHING_EVENT)
00059         return "NOTWATCHING";
00060     else
00061         return ML::format("UNKNOWN(%d)", eventType);
00062 }
00063 
00064 std::string
00065 ZookeeperConnection::
00066 printState(int state)
00067 {
00068     if (state == ZOO_EXPIRED_SESSION_STATE)
00069         return "EXPIRED";
00070     else if (state == ZOO_AUTH_FAILED_STATE)
00071         return "AUTH_FAILED";
00072     else if (state == ZOO_CONNECTING_STATE)
00073         return "CONNECTING";
00074     else if (state == ZOO_CONNECTED_STATE)
00075         return "CONNECTED";
00076     else if (state == ZOO_ASSOCIATING_STATE)
00077         return "ASSOCIATING";
00078     else 
00079         return ML::format("UNKNOWN(%d)", state);
00080 }
00081 
00082 void
00083 ZookeeperConnection::
00084 connect(const std::string & host,
00085         double timeoutInSeconds)
00086 {
00087     if (handle)
00088         throw ML::Exception("can't connect; handle already exists");
00089 
00090     if (!connectMutex.try_lock())
00091         throw ML::Exception("attempting to connect from two threads");
00092 
00093     this->host = host;
00094 
00095     ML::Call_Guard guard([&] () { connectMutex.unlock(); });
00096 
00097     int wait = timeoutInSeconds * 1000;
00098     int timeout = wait;
00099     int times = 3;
00100 
00101     for(int i = 0; i != times; ++i) {
00102         handle = zookeeper_init(host.c_str(), eventHandlerFn, recvTimeout, 0, this, 0);
00103 
00104         if (!handle)
00105             throw ML::Exception(errno, "failed to initialize ZooKeeper at " + host);
00106 
00107         if (connectMutex.try_lock_for(std::chrono::milliseconds(timeout)))
00108             return;
00109 
00110         zookeeper_close(handle);
00111         
00112         int ms = wait + (std::rand() % wait);
00113         wait *= 2;
00114 
00115         ML::sleep(ms / 1000.0);
00116     }
00117 
00118     throw ML::Exception("connection to Zookeeper timed out");
00119 }
00120 
00121 void
00122 ZookeeperConnection::
00123 reconnect()
00124 {
00125     if(handle) {
00126         zookeeper_close(handle);
00127         handle = 0;
00128     }
00129 
00130     connect(host);
00131 
00132     for(auto & item : ephemerals) {
00133         createNode(item.path, item.value, true, false, true, true);
00134     }
00135 
00136     while(callbacks.next != &callbacks) {
00137         auto i = callbacks.next;
00138         i->unlink();
00139         i->call(ZOO_DELETED_EVENT);
00140     }
00141 }
00142 
00143 void
00144 ZookeeperConnection::
00145 close()
00146 {
00147     using namespace std;
00148     if (!handle)
00149         return;
00150     int res = zookeeper_close(handle);
00151     handle = 0;
00152     if (res != ZOK)
00153         cerr << "warning: closing returned error: " << zerror(res) << endl;
00154 
00155     ephemerals.clear();
00156 }
00157 
00158 ZookeeperConnection::CheckResult
00159 ZookeeperConnection::
00160 checkRes(int returnCode, int & retries,
00161          const char * operation, const char * path)
00162 {
00163     int maxRetries = 3;
00164 
00165     if (returnCode == ZOK)
00166         return CR_DONE;
00167 
00168     if (retries < maxRetries
00169         && is_unrecoverable(handle) != ZOK) {
00170         reconnect();
00171         ++retries;
00172         return CR_RETRY;
00173     }
00174 
00175     if (retries < maxRetries
00176         && (returnCode == ZCLOSING
00177             || returnCode == ZSESSIONMOVED
00178             || returnCode == ZSESSIONEXPIRED
00179             || returnCode == ZINVALIDSTATE
00180             || returnCode == ZOPERATIONTIMEOUT
00181             || returnCode == ZCONNECTIONLOSS)) {
00182         reconnect();
00183         ++retries;
00184         return CR_RETRY;
00185     }
00186 
00187     cerr << "zookeeper error on " << operation << ", path "
00188          << path << ": " << zerror(returnCode) << endl;
00189     throw ML::Exception("Zookeeper error on %s, path %s: %s",
00190                         operation, path, zerror(returnCode));
00191 }
00192 
00193 void
00194 ZookeeperConnection::
00195 createPath(const std::string & path)
00196 {
00197     if (path == "/")
00198         return;
00199     string::size_type pos = path.rfind('/');
00200     if (pos == string::npos)
00201         return;
00202 
00203     string prefix(path, 0, pos);
00204 
00205     int retries = 0;
00206     for (;;) {
00207         int res = zoo_create(handle,
00208                              prefix.c_str(),
00209                              0, 0,
00210                              &ZOO_OPEN_ACL_UNSAFE,
00211                              0, 0, 0);
00212 
00213         if (res == ZNODEEXISTS)
00214             return;
00215         
00216         if (res == ZNONODE) {
00217             createPath(prefix);
00218             continue;
00219         }
00220 
00221         if (checkRes(res, retries, "zoo_create", path.c_str()) == CR_DONE)
00222             break;
00223     }
00224 }
00225 
00226 void
00227 ZookeeperConnection::
00228 removePath(const std::string & path_)
00229 {
00230     string path = fixPath(path_);
00231 
00232     std::function<void (const std::string &)> doNode
00233         = [&] (const std::string & currentPath)
00234         {
00235             vector<string> children
00236                 = getChildren(currentPath,
00237                               false /* throwIfNodeMissing */);
00238             
00239             for (auto child: children)
00240                 if (currentPath[currentPath.size() - 1] == '/')
00241                     doNode(currentPath + child);
00242                 else
00243                     doNode(currentPath + "/" + child);
00244             
00245             deleteNode(currentPath, false /* throwIfNodeMissing */);
00246         };
00247 
00248     doNode(path);
00249 }
00250 
00251 std::pair<std::string, bool>
00252 ZookeeperConnection::
00253 createNode(const std::string & path,
00254            const std::string & value,
00255            bool ephemeral,
00256            bool sequence,
00257            bool mustSucceed,
00258            bool createPath)
00259 {
00260     //cerr << "createNode for " << path << endl;
00261 
00262     int flags = 0;
00263     if (ephemeral)
00264         flags |= ZOO_EPHEMERAL;
00265     if (sequence)
00266         flags |= ZOO_SEQUENCE;
00267 
00268     int pathBufLen = path.size() + 256;
00269     char pathBuf[pathBufLen];
00270 
00271     int retries = 0;
00272     for (;;) {
00273         int res = zoo_create(handle,
00274                              path.c_str(),
00275                              value.c_str(),
00276                              value.size(),
00277                              &ZOO_OPEN_ACL_UNSAFE,
00278                              flags,
00279                              pathBuf,
00280                              pathBufLen);
00281 
00282         if (!mustSucceed && res == ZNODEEXISTS)
00283             return make_pair(path, false);
00284         
00285         if (res == ZNONODE && createPath) {
00286             //cerr << "createPath for " << path << endl;
00287             this->createPath(path);
00288             continue;
00289         }
00290 
00291         if (checkRes(res, retries, "zoo_create", path.c_str()) == CR_DONE)
00292             break;
00293     }
00294 
00295     if(ephemeral) {
00296         ephemerals.insert(Node(path, value));
00297     }
00298 
00299     return make_pair<string, bool>(pathBuf, true);
00300 }
00301 
00302 bool
00303 ZookeeperConnection::
00304 deleteNode(const std::string & path, bool throwIfNodeMissing)
00305 {
00306     int retries = 0;
00307     for (;;) {
00308         int res = zoo_delete(handle, path.c_str(), -1);
00309 
00310         if (!throwIfNodeMissing && res == ZNONODE)
00311             return false;
00312         
00313         if (checkRes(res, retries, "zoo_delete", path.c_str()) == CR_DONE)
00314             break;
00315     }
00316 
00317     ephemerals.erase(path);
00318 
00319     return true;
00320 }
00321 
00322 std::string
00323 ZookeeperConnection::
00324 fixPath(const std::string & path)
00325 {
00326     if (path == "/")
00327         return path;
00328     int last = path.size();
00329     while (last > 0 && path[last - 1] == '/')
00330         --last;
00331     return string(path, 0, last);
00332 }
00333 
00334 bool
00335 ZookeeperConnection::
00336 nodeExists(const std::string & path_, Callback::Type watcher, void * watcherData)
00337 {
00338     string path = fixPath(path_);
00339 
00340     int retries = 0;
00341     for (;;) {
00342 
00343         Callback * cb = getCallback(watcher, path, watcherData);
00344 
00345         int res = zoo_wexists(handle, path.c_str(), zk_callback,
00346                               cb, 0 /* stat */);
00347         if (res == ZNONODE)
00348             return false;
00349         if (checkRes(res, retries, "zoo_wexists", path.c_str()) == CR_DONE)
00350             break;
00351     }
00352 
00353     return true;
00354 }
00355 
00356 std::string
00357 ZookeeperConnection::
00358 readNode(const std::string & path_, Callback::Type watcher, void * watcherData)
00359 {
00360     string path = fixPath(path_);
00361 
00362     char buf[16384];
00363     int bufLen = 16384;
00364 
00365     int retries = 0;
00366     for (;;bufLen = 16384) {
00367 
00368         Callback * cb = getCallback(watcher, path, watcherData);
00369 
00370         int res = zoo_wget(handle, path.c_str(), zk_callback, cb,
00371                            buf, &bufLen, 0 /* stat */);
00372         if (res == ZNONODE)
00373             return "";
00374         if (checkRes(res, retries, "zoo_wget", path.c_str()) == CR_DONE)
00375             break;
00376     }
00377 
00378     return string(buf, buf + bufLen);
00379 }
00380 
00381 void
00382 ZookeeperConnection::
00383 writeNode(const std::string & path, const std::string & value)
00384 {
00385     //cerr << "writeNode to " << path << endl;
00386 
00387     int retries = 0;
00388     for (;;) {
00389         int res = zoo_set(handle, path.c_str(), value.c_str(),
00390                           value.size(), -1);
00391         if (checkRes(res, retries, "zoo_set", path.c_str()) == CR_DONE)
00392             break;
00393     }
00394 
00395     auto i = ephemerals.find(path);
00396     if(ephemerals.end() != i) {
00397         i->value = value;
00398     }
00399 }
00400 
00401 std::vector<std::string>
00402 ZookeeperConnection::
00403 getChildren(const std::string & path_, bool failIfNodeMissing,
00404             Callback::Type watcher, void * watcherData)
00405 {
00406     string path = fixPath(path_);
00407 
00408     //cerr << "getChildren for " << path << ", " << path_ << endl;
00409 
00410     String_vector strings;
00411 
00412     std::vector<std::string> result;
00413 
00414     int retries = 0;
00415     for (;;) {
00416         Callback * cb = getCallback(watcher, path, watcherData);
00417 
00418         int res = zoo_wget_children(handle, path.c_str(),
00419                                     zk_callback, cb,
00420                                     &strings);
00421 
00422         //cerr << "zoo_wget_children for " << path << " returned "
00423         //     << res << " and " << strings.count << " strings" << endl;
00424 
00425         if (res == ZNONODE && !failIfNodeMissing) {
00426 
00427             //if (watcher)
00428             //    cerr << "********* adding wexists handler to " << path << endl;
00429 
00430             // If the node didn't exist then our watch wasn't set... so we
00431             // set it up here.
00432             if (watcher) {
00433 
00434                 Callback * cb = getCallback(watcher, path, watcherData);
00435 
00436                 res = zoo_wexists(handle, path.c_str(), zk_callback, cb, 0);
00437                 //cerr << "wexists handler returned " << res << endl;
00438 
00439                 if (res == ZOK) {
00440                     cerr << "wexists handler: node appeared" << endl;
00441                     continue;  // node suddenly appeared
00442                 }
00443                 else if (res == ZNONODE)
00444                     return result;
00445                 if (checkRes(res, retries, "zoo_wexists", path.c_str()) == CR_DONE)
00446                     return result;
00447             }
00448 
00449             return result;
00450         }
00451 
00452         if (checkRes(res, retries, "zoo_get_children", path.c_str()) == CR_DONE)
00453             break;
00454     }
00455 
00456     for (unsigned i = 0;  i < strings.count;  ++i)
00457         result.push_back(strings.data[i]);
00458 
00459     deallocate_String_vector(&strings);
00460 
00461     return result;
00462         
00463 }
00464 
00465 void
00466 ZookeeperConnection::
00467 eventHandlerFn(zhandle_t * handle,
00468                int event,
00469                int state,
00470                const char * path,
00471                void * context)
00472 {
00473     ZookeeperConnection * connection = reinterpret_cast<ZookeeperConnection *>(context);
00474 
00475     using namespace std;
00476     //cerr << "got event " << printEvent(event) << " state " << printState(state) << " on path " << path << endl;
00477     connection->connectMutex.unlock();
00478 
00479     if(state == ZOO_EXPIRED_SESSION_STATE) {
00480         cerr << "until we handle proper reconnecting to ZooKeeper, we just die..." << endl;
00481         abort();
00482     }
00483 }
00484 
00485 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator