RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* zookeeper.cc 00002 Jeremy Barnes, 17 August 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 */ 00006 00007 #include "soa/service/zookeeper.h" 00008 #include "jml/arch/timers.h" 00009 00010 using namespace std; 00011 00012 namespace Datacratic { 00013 00014 namespace { 00015 00016 struct Init { 00017 Init() 00018 { 00019 zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); 00020 } 00021 } init; 00022 00023 void zk_callback(zhandle_t * ah, int type, int state, const char * path, void * user) { 00024 auto cb = reinterpret_cast<ZookeeperConnection::Callback *>(user); 00025 if(cb) { 00026 cb->unlink(); 00027 cb->call(type); 00028 } 00029 } 00030 00031 } // file scope 00032 00033 /*****************************************************************************/ 00034 /* ZOOKEEPER CONNECTION */ 00035 /*****************************************************************************/ 00036 00037 ZookeeperConnection:: 00038 ZookeeperConnection() 00039 : recvTimeout(1000 /* one second */), 00040 handle(0) 00041 { 00042 } 00043 00044 std::string 00045 ZookeeperConnection:: 00046 printEvent(int eventType) 00047 { 00048 if (eventType == ZOO_CREATED_EVENT) 00049 return "CREATED"; 00050 else if (eventType == ZOO_DELETED_EVENT) 00051 return "DELETED"; 00052 else if (eventType == ZOO_CHANGED_EVENT) 00053 return "CHANGED"; 00054 else if (eventType == ZOO_CHILD_EVENT) 00055 return "CHILD"; 00056 else if (eventType == ZOO_SESSION_EVENT) 00057 return "SESSION"; 00058 else if (eventType == ZOO_NOTWATCHING_EVENT) 00059 return "NOTWATCHING"; 00060 else 00061 return ML::format("UNKNOWN(%d)", eventType); 00062 } 00063 00064 std::string 00065 ZookeeperConnection:: 00066 printState(int state) 00067 { 00068 if (state == ZOO_EXPIRED_SESSION_STATE) 00069 return "EXPIRED"; 00070 else if (state == ZOO_AUTH_FAILED_STATE) 00071 return "AUTH_FAILED"; 00072 else if (state == ZOO_CONNECTING_STATE) 00073 return "CONNECTING"; 00074 else if (state == ZOO_CONNECTED_STATE) 00075 return "CONNECTED"; 00076 else if (state == ZOO_ASSOCIATING_STATE) 00077 return "ASSOCIATING"; 00078 else 00079 return ML::format("UNKNOWN(%d)", state); 00080 } 00081 00082 void 00083 ZookeeperConnection:: 00084 connect(const std::string & host, 00085 double timeoutInSeconds) 00086 { 00087 if (handle) 00088 throw ML::Exception("can't connect; handle already exists"); 00089 00090 if (!connectMutex.try_lock()) 00091 throw ML::Exception("attempting to connect from two threads"); 00092 00093 this->host = host; 00094 00095 ML::Call_Guard guard([&] () { connectMutex.unlock(); }); 00096 00097 int wait = timeoutInSeconds * 1000; 00098 int timeout = wait; 00099 int times = 3; 00100 00101 for(int i = 0; i != times; ++i) { 00102 handle = zookeeper_init(host.c_str(), eventHandlerFn, recvTimeout, 0, this, 0); 00103 00104 if (!handle) 00105 throw ML::Exception(errno, "failed to initialize ZooKeeper at " + host); 00106 00107 if (connectMutex.try_lock_for(std::chrono::milliseconds(timeout))) 00108 return; 00109 00110 zookeeper_close(handle); 00111 00112 int ms = wait + (std::rand() % wait); 00113 wait *= 2; 00114 00115 ML::sleep(ms / 1000.0); 00116 } 00117 00118 throw ML::Exception("connection to Zookeeper timed out"); 00119 } 00120 00121 void 00122 ZookeeperConnection:: 00123 reconnect() 00124 { 00125 if(handle) { 00126 zookeeper_close(handle); 00127 handle = 0; 00128 } 00129 00130 connect(host); 00131 00132 for(auto & item : ephemerals) { 00133 createNode(item.path, item.value, true, false, true, true); 00134 } 00135 00136 while(callbacks.next != &callbacks) { 00137 auto i = callbacks.next; 00138 i->unlink(); 00139 i->call(ZOO_DELETED_EVENT); 00140 } 00141 } 00142 00143 void 00144 ZookeeperConnection:: 00145 close() 00146 { 00147 using namespace std; 00148 if (!handle) 00149 return; 00150 int res = zookeeper_close(handle); 00151 handle = 0; 00152 if (res != ZOK) 00153 cerr << "warning: closing returned error: " << zerror(res) << endl; 00154 00155 ephemerals.clear(); 00156 } 00157 00158 ZookeeperConnection::CheckResult 00159 ZookeeperConnection:: 00160 checkRes(int returnCode, int & retries, 00161 const char * operation, const char * path) 00162 { 00163 int maxRetries = 3; 00164 00165 if (returnCode == ZOK) 00166 return CR_DONE; 00167 00168 if (retries < maxRetries 00169 && is_unrecoverable(handle) != ZOK) { 00170 reconnect(); 00171 ++retries; 00172 return CR_RETRY; 00173 } 00174 00175 if (retries < maxRetries 00176 && (returnCode == ZCLOSING 00177 || returnCode == ZSESSIONMOVED 00178 || returnCode == ZSESSIONEXPIRED 00179 || returnCode == ZINVALIDSTATE 00180 || returnCode == ZOPERATIONTIMEOUT 00181 || returnCode == ZCONNECTIONLOSS)) { 00182 reconnect(); 00183 ++retries; 00184 return CR_RETRY; 00185 } 00186 00187 cerr << "zookeeper error on " << operation << ", path " 00188 << path << ": " << zerror(returnCode) << endl; 00189 throw ML::Exception("Zookeeper error on %s, path %s: %s", 00190 operation, path, zerror(returnCode)); 00191 } 00192 00193 void 00194 ZookeeperConnection:: 00195 createPath(const std::string & path) 00196 { 00197 if (path == "/") 00198 return; 00199 string::size_type pos = path.rfind('/'); 00200 if (pos == string::npos) 00201 return; 00202 00203 string prefix(path, 0, pos); 00204 00205 int retries = 0; 00206 for (;;) { 00207 int res = zoo_create(handle, 00208 prefix.c_str(), 00209 0, 0, 00210 &ZOO_OPEN_ACL_UNSAFE, 00211 0, 0, 0); 00212 00213 if (res == ZNODEEXISTS) 00214 return; 00215 00216 if (res == ZNONODE) { 00217 createPath(prefix); 00218 continue; 00219 } 00220 00221 if (checkRes(res, retries, "zoo_create", path.c_str()) == CR_DONE) 00222 break; 00223 } 00224 } 00225 00226 void 00227 ZookeeperConnection:: 00228 removePath(const std::string & path_) 00229 { 00230 string path = fixPath(path_); 00231 00232 std::function<void (const std::string &)> doNode 00233 = [&] (const std::string & currentPath) 00234 { 00235 vector<string> children 00236 = getChildren(currentPath, 00237 false /* throwIfNodeMissing */); 00238 00239 for (auto child: children) 00240 if (currentPath[currentPath.size() - 1] == '/') 00241 doNode(currentPath + child); 00242 else 00243 doNode(currentPath + "/" + child); 00244 00245 deleteNode(currentPath, false /* throwIfNodeMissing */); 00246 }; 00247 00248 doNode(path); 00249 } 00250 00251 std::pair<std::string, bool> 00252 ZookeeperConnection:: 00253 createNode(const std::string & path, 00254 const std::string & value, 00255 bool ephemeral, 00256 bool sequence, 00257 bool mustSucceed, 00258 bool createPath) 00259 { 00260 //cerr << "createNode for " << path << endl; 00261 00262 int flags = 0; 00263 if (ephemeral) 00264 flags |= ZOO_EPHEMERAL; 00265 if (sequence) 00266 flags |= ZOO_SEQUENCE; 00267 00268 int pathBufLen = path.size() + 256; 00269 char pathBuf[pathBufLen]; 00270 00271 int retries = 0; 00272 for (;;) { 00273 int res = zoo_create(handle, 00274 path.c_str(), 00275 value.c_str(), 00276 value.size(), 00277 &ZOO_OPEN_ACL_UNSAFE, 00278 flags, 00279 pathBuf, 00280 pathBufLen); 00281 00282 if (!mustSucceed && res == ZNODEEXISTS) 00283 return make_pair(path, false); 00284 00285 if (res == ZNONODE && createPath) { 00286 //cerr << "createPath for " << path << endl; 00287 this->createPath(path); 00288 continue; 00289 } 00290 00291 if (checkRes(res, retries, "zoo_create", path.c_str()) == CR_DONE) 00292 break; 00293 } 00294 00295 if(ephemeral) { 00296 ephemerals.insert(Node(path, value)); 00297 } 00298 00299 return make_pair<string, bool>(pathBuf, true); 00300 } 00301 00302 bool 00303 ZookeeperConnection:: 00304 deleteNode(const std::string & path, bool throwIfNodeMissing) 00305 { 00306 int retries = 0; 00307 for (;;) { 00308 int res = zoo_delete(handle, path.c_str(), -1); 00309 00310 if (!throwIfNodeMissing && res == ZNONODE) 00311 return false; 00312 00313 if (checkRes(res, retries, "zoo_delete", path.c_str()) == CR_DONE) 00314 break; 00315 } 00316 00317 ephemerals.erase(path); 00318 00319 return true; 00320 } 00321 00322 std::string 00323 ZookeeperConnection:: 00324 fixPath(const std::string & path) 00325 { 00326 if (path == "/") 00327 return path; 00328 int last = path.size(); 00329 while (last > 0 && path[last - 1] == '/') 00330 --last; 00331 return string(path, 0, last); 00332 } 00333 00334 bool 00335 ZookeeperConnection:: 00336 nodeExists(const std::string & path_, Callback::Type watcher, void * watcherData) 00337 { 00338 string path = fixPath(path_); 00339 00340 int retries = 0; 00341 for (;;) { 00342 00343 Callback * cb = getCallback(watcher, path, watcherData); 00344 00345 int res = zoo_wexists(handle, path.c_str(), zk_callback, 00346 cb, 0 /* stat */); 00347 if (res == ZNONODE) 00348 return false; 00349 if (checkRes(res, retries, "zoo_wexists", path.c_str()) == CR_DONE) 00350 break; 00351 } 00352 00353 return true; 00354 } 00355 00356 std::string 00357 ZookeeperConnection:: 00358 readNode(const std::string & path_, Callback::Type watcher, void * watcherData) 00359 { 00360 string path = fixPath(path_); 00361 00362 char buf[16384]; 00363 int bufLen = 16384; 00364 00365 int retries = 0; 00366 for (;;bufLen = 16384) { 00367 00368 Callback * cb = getCallback(watcher, path, watcherData); 00369 00370 int res = zoo_wget(handle, path.c_str(), zk_callback, cb, 00371 buf, &bufLen, 0 /* stat */); 00372 if (res == ZNONODE) 00373 return ""; 00374 if (checkRes(res, retries, "zoo_wget", path.c_str()) == CR_DONE) 00375 break; 00376 } 00377 00378 return string(buf, buf + bufLen); 00379 } 00380 00381 void 00382 ZookeeperConnection:: 00383 writeNode(const std::string & path, const std::string & value) 00384 { 00385 //cerr << "writeNode to " << path << endl; 00386 00387 int retries = 0; 00388 for (;;) { 00389 int res = zoo_set(handle, path.c_str(), value.c_str(), 00390 value.size(), -1); 00391 if (checkRes(res, retries, "zoo_set", path.c_str()) == CR_DONE) 00392 break; 00393 } 00394 00395 auto i = ephemerals.find(path); 00396 if(ephemerals.end() != i) { 00397 i->value = value; 00398 } 00399 } 00400 00401 std::vector<std::string> 00402 ZookeeperConnection:: 00403 getChildren(const std::string & path_, bool failIfNodeMissing, 00404 Callback::Type watcher, void * watcherData) 00405 { 00406 string path = fixPath(path_); 00407 00408 //cerr << "getChildren for " << path << ", " << path_ << endl; 00409 00410 String_vector strings; 00411 00412 std::vector<std::string> result; 00413 00414 int retries = 0; 00415 for (;;) { 00416 Callback * cb = getCallback(watcher, path, watcherData); 00417 00418 int res = zoo_wget_children(handle, path.c_str(), 00419 zk_callback, cb, 00420 &strings); 00421 00422 //cerr << "zoo_wget_children for " << path << " returned " 00423 // << res << " and " << strings.count << " strings" << endl; 00424 00425 if (res == ZNONODE && !failIfNodeMissing) { 00426 00427 //if (watcher) 00428 // cerr << "********* adding wexists handler to " << path << endl; 00429 00430 // If the node didn't exist then our watch wasn't set... so we 00431 // set it up here. 00432 if (watcher) { 00433 00434 Callback * cb = getCallback(watcher, path, watcherData); 00435 00436 res = zoo_wexists(handle, path.c_str(), zk_callback, cb, 0); 00437 //cerr << "wexists handler returned " << res << endl; 00438 00439 if (res == ZOK) { 00440 cerr << "wexists handler: node appeared" << endl; 00441 continue; // node suddenly appeared 00442 } 00443 else if (res == ZNONODE) 00444 return result; 00445 if (checkRes(res, retries, "zoo_wexists", path.c_str()) == CR_DONE) 00446 return result; 00447 } 00448 00449 return result; 00450 } 00451 00452 if (checkRes(res, retries, "zoo_get_children", path.c_str()) == CR_DONE) 00453 break; 00454 } 00455 00456 for (unsigned i = 0; i < strings.count; ++i) 00457 result.push_back(strings.data[i]); 00458 00459 deallocate_String_vector(&strings); 00460 00461 return result; 00462 00463 } 00464 00465 void 00466 ZookeeperConnection:: 00467 eventHandlerFn(zhandle_t * handle, 00468 int event, 00469 int state, 00470 const char * path, 00471 void * context) 00472 { 00473 ZookeeperConnection * connection = reinterpret_cast<ZookeeperConnection *>(context); 00474 00475 using namespace std; 00476 //cerr << "got event " << printEvent(event) << " state " << printState(state) << " on path " << path << endl; 00477 connection->connectMutex.unlock(); 00478 00479 if(state == ZOO_EXPIRED_SESSION_STATE) { 00480 cerr << "until we handle proper reconnecting to ZooKeeper, we just die..." << endl; 00481 abort(); 00482 } 00483 } 00484 00485 } // namespace Datacratic