RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* zookeeper_test.cc 00002 Jeremy Barnes, 2 July 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Test of zookeeper interface. 00006 */ 00007 00008 #define BOOST_TEST_MAIN 00009 #define BOOST_TEST_DYN_LINK 00010 00011 #include <boost/test/unit_test.hpp> 00012 00013 #include "jml/utils/smart_ptr_utils.h" 00014 #include "jml/utils/vector_utils.h" 00015 #include "jml/utils/pair_utils.h" 00016 #include "jml/utils/string_functions.h" 00017 #include "jml/utils/file_functions.h" 00018 #include "jml/arch/exception_handler.h" 00019 #include "jml/arch/timers.h" 00020 #include "soa/service/zookeeper.h" 00021 #include "soa/service/testing/zookeeper_temporary_server.h" 00022 00023 #include <thread> 00024 #include <iostream> 00025 #include <set> 00026 #include <sys/prctl.h> 00027 00028 using namespace Datacratic; 00029 00030 BOOST_AUTO_TEST_CASE( test_zookeeper_connection ) 00031 { 00032 ML::set_default_trace_exceptions(false); 00033 00034 ZooKeeper::TemporaryServer server; 00035 std::string uri = ML::format("localhost:%d", server.getPort()); 00036 00037 // avoid aborting test when killing a child process 00038 signal(SIGCHLD, SIG_DFL); 00039 00040 std::thread client([=] { 00041 ZookeeperConnection zk; 00042 std::cerr << "starting client..." << std::endl; 00043 zk.connect(uri, 1.0); 00044 }); 00045 00046 ML::sleep(5.0); 00047 00048 std::cerr << "starting zookeeper..." << std::endl; 00049 server.start(); 00050 client.join(); 00051 } 00052 00053 BOOST_AUTO_TEST_CASE( test_zookeeper_crash ) 00054 { 00055 ML::set_default_trace_exceptions(false); 00056 00057 ZooKeeper::TemporaryServer server; 00058 std::string uri = ML::format("localhost:%d", server.getPort()); 00059 00060 // avoid aborting test when killing a child process 00061 signal(SIGCHLD, SIG_DFL); 00062 00063 std::cerr << "starting zookeeper..." << std::endl; 00064 server.start(); 00065 00066 std::thread client([=] { 00067 ZookeeperConnection zk; 00068 00069 std::cerr << "starting client..." << std::endl; 00070 zk.connect(uri); 00071 00072 for(;;) { 00073 auto text = zk.readNode("/hello"); 00074 if(text == "world") { 00075 break; 00076 } 00077 00078 ML::sleep(0.5); 00079 } 00080 }); 00081 00082 ML::sleep(1.0); 00083 00084 std::cerr << "crash!" << std::endl; 00085 server.shutdown(); 00086 00087 ML::sleep(1.0); 00088 00089 std::cerr << "restarting zookeeper..." << std::endl; 00090 server.start(); 00091 00092 ZookeeperConnection zk; 00093 zk.connect(uri); 00094 zk.createNode("/hello", "world", true, false); 00095 00096 zk.readNode("/hello", [](int type, std::string const & path, void * data) { 00097 std::cerr << "event type=" << type << " path=" << path << std::endl; 00098 }, 0); 00099 00100 client.join(); 00101 00102 std::cerr << "crash & restart..." << std::endl; 00103 server.shutdown(); 00104 server.start(); 00105 00106 while(zk.readNode("/hello") != "world") { 00107 ML::sleep(0.5); 00108 } 00109 } 00110 00111 BOOST_AUTO_TEST_CASE( test_zookeeper ) 00112 { 00113 ML::set_default_trace_exceptions(false); 00114 00115 ZooKeeper::TemporaryServer server; 00116 std::string uri = ML::format("localhost:%d", server.getPort()); 00117 00118 // avoid aborting test when killing a child process 00119 signal(SIGCHLD, SIG_DFL); 00120 00121 std::cerr << "starting zookeeper..." << std::endl; 00122 server.start(); 00123 00124 ZookeeperConnection zk; 00125 zk.connect(uri); 00126 00127 auto node = zk.createNode("/hello", "world", true, false); 00128 std::cerr << "nodeName = " << node.first << std::endl; 00129 00130 int forked = 0; 00131 int killed = 0; 00132 00133 std::vector<int> tasks; 00134 tasks.push_back(100); 00135 tasks.push_back(-10); 00136 tasks.push_back(+10); 00137 tasks.push_back(-20); 00138 tasks.push_back(+20); 00139 tasks.push_back(-50); 00140 tasks.push_back(+50); 00141 00142 std::vector<int> pids; 00143 00144 for(int task : tasks) { 00145 if(task > 0) { 00146 for(int i = 0; i != task; ++i) { 00147 int pid = fork(); 00148 if(pid == -1) { 00149 throw ML::Exception(errno, "fork"); 00150 } 00151 00152 if(pid == 0) { 00153 pid = getpid(); 00154 std::cerr << "process created pid=" << pid << std::endl; 00155 00156 int res = prctl(PR_SET_PDEATHSIG, SIGHUP); 00157 if(res == -1) { 00158 throw ML::Exception(errno, "prctl failed"); 00159 } 00160 00161 ML::sleep(1); 00162 00163 std::cerr << pid << " trying to connect to " << uri << std::endl; 00164 ZookeeperConnection zk; 00165 zk.connect(uri); 00166 00167 for(;;) { 00168 auto node = zk.readNode("/hello"); 00169 std::cerr << pid << " node=" << node << std::endl; 00170 if(node == "world") { 00171 break; 00172 } 00173 00174 ML::sleep(1); 00175 } 00176 00177 zk.createNode(ML::format("/%d", pid), "hello", true, false); 00178 for(;;) { 00179 ML::sleep(1); 00180 } 00181 } 00182 else { 00183 pids.push_back(pid); 00184 ++forked; 00185 } 00186 } 00187 } 00188 else { 00189 for(int i = 0; i != -task; ++i) { 00190 int k = rand() % pids.size(); 00191 int pid = pids[k]; 00192 00193 std::cerr << "killing pid=" << pid << std::endl; 00194 00195 int res = kill(pid, SIGTERM); 00196 if(res == -1) { 00197 throw ML::Exception(errno, "cannot kill child process"); 00198 } 00199 00200 int status = 0; 00201 res = waitpid(pid, &status, 0); 00202 if (res == -1) { 00203 throw ML::Exception(errno, "failed to wait for child process to shutdown"); 00204 } 00205 00206 std::swap(pids[k], pids.back()); 00207 pids.pop_back(); 00208 00209 ++killed; 00210 } 00211 } 00212 } 00213 00214 bool ok = false; 00215 while(!ok) { 00216 auto children = zk.getChildren("/"); 00217 std::cerr << "children = " << children << std::endl; 00218 00219 if(children.size() == 2 + pids.size()) { 00220 ok = true; 00221 std::set<std::string> keys(children.begin(), children.end()); 00222 for(int pid : pids) { 00223 ok &= keys.count(std::to_string(pid)); 00224 } 00225 } 00226 00227 ML::sleep(1); 00228 } 00229 00230 for(int pid : pids) { 00231 int res = kill(pid, SIGTERM); 00232 if (res == -1) { 00233 throw ML::Exception(errno, "cannot kill child process"); 00234 } 00235 00236 ++killed; 00237 } 00238 00239 for(int pid : pids) { 00240 int status = 0; 00241 int res = waitpid(pid, &status, 0); 00242 if (res == -1) { 00243 throw ML::Exception(errno, "failed to wait for child process to shutdown"); 00244 } 00245 } 00246 00247 std::cerr << "number of process forked: " << forked << std::endl; 00248 std::cerr << "number of process killed: " << killed << std::endl; 00249 00250 BOOST_CHECK_EQUAL(forked, 180); 00251 BOOST_CHECK_EQUAL(killed, 180); 00252 } 00253