RTBKit 0.9 — Open-source framework to create real-time ad bidding systems.
00001 /* launcher.h -*- C++ -*- 00002 Eric Robert, 28 February 2013 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Common launcher task structures 00006 */ 00007 00008 #pragma once 00009 00010 #include <iostream> 00011 #include <fstream> 00012 #include <string> 00013 #include <vector> 00014 #include <sys/prctl.h> 00015 #include <sys/types.h> 00016 #include <sys/wait.h> 00017 #include <sys/stat.h> 00018 #include <fcntl.h> 00019 #include <signal.h> 00020 #include <unistd.h> 00021 00022 #include "jml/arch/exception.h" 00023 #include "jml/arch/timers.h" 00024 #include "jml/utils/ring_buffer.h" 00025 #include "soa/jsoncpp/json.h" 00026 #include "soa/service/service_base.h" 00027 #include "soa/service/message_loop.h" 00028 #include "soa/service/typed_message_channel.h" 00029 00030 namespace Datacratic { 00031 00032 struct Launcher 00033 { 00034 struct Task 00035 { 00036 Task() : pid(-1), log(false), delay(5.0) { 00037 } 00038 00039 std::string const & getName() const { 00040 return name; 00041 } 00042 00043 void restart() { 00044 stop(); 00045 start(); 00046 } 00047 00048 void start() { 00049 spawn(); 00050 ML::sleep(delay); 00051 00052 for(auto & item : children) { 00053 item.start(); 00054 } 00055 } 00056 00057 void stop() { 00058 if(pid != -1 && kill(pid, 0) != -1) { 00059 int res = kill(pid, SIGTERM); 00060 if(res == -1) { 00061 throw ML::Exception(errno, "cannot kill process"); 00062 } 00063 00064 int status = 0; 00065 res = waitpid(pid, &status, 0); 00066 if(res == -1) { 00067 throw ML::Exception(errno, "failed to wait for process to shutdown"); 00068 } 00069 00070 std::cout << "killed " << name << std::endl; 00071 } 00072 00073 pid = -1; 00074 00075 for(auto & item : children) { 00076 item.stop(); 00077 } 00078 } 00079 00080 Task * findTask(int pid) const { 00081 if(this->pid == pid) { 00082 return (Task *) this; 00083 } 00084 00085 for(auto & item : children) { 00086 Task * task = item.findTask(pid); 00087 if(task) { 00088 return task; 
00089 } 00090 } 00091 00092 return 0; 00093 } 00094 00095 void script(int & i, std::ostream & file) { 00096 file << "tmux new-window -d -t rtb:" << ++i << " -n '" << name << "' 'tail -F ./logs/" << name << ".log'" << std::endl; 00097 for(auto & item : children) { 00098 item.script(i, file); 00099 } 00100 } 00101 00102 void script(int & i, std::ostream & file, std::string const & node) { 00103 file << "tmux new-window -d -t rtb:" << ++i << " -n '" << name << "' 'ssh " << node << " \"tail -F " << root << "/logs/" << name << ".log\"'" << std::endl; 00104 for(auto & item : children) { 00105 item.script(i, file, node); 00106 } 00107 } 00108 00109 friend std::ostream & operator<<(std::ostream & stream, Task & task) { 00110 task.print(stream); 00111 return stream; 00112 } 00113 00114 static Task createFromJson(Json::Value const & json) { 00115 Task result; 00116 for(auto i = json.begin(), end = json.end(); i != end; ++i) { 00117 if(i.memberName() == "children") { 00118 auto & json = *i; 00119 if(!json.empty() && !json.isArray()) { 00120 throw ML::Exception("children is not an array"); 00121 } 00122 00123 for(auto j = json.begin(), end = json.end(); j != end; ++j) { 00124 auto & json = *j; 00125 result.children.push_back(createFromJson(json)); 00126 } 00127 } 00128 else if(i.memberName() == "name") { 00129 result.name = i->asString(); 00130 } 00131 else if(i.memberName() == "path") { 00132 result.path = i->asString(); 00133 } 00134 else if(i.memberName() == "root") { 00135 result.root = i->asString(); 00136 } 00137 else if(i.memberName() == "log") { 00138 result.log = i->asBool(); 00139 } 00140 else if(i.memberName() == "delay") { 00141 result.delay = i->asDouble(); 00142 } 00143 else if(i.memberName() == "arg") { 00144 auto & json = *i; 00145 if(!json.empty() && !json.isArray()) { 00146 throw ML::Exception("'arg' is not an array"); 00147 } 00148 00149 for(auto j = json.begin(), end = json.end(); j != end; ++j) { 00150 auto & json = *j; 00151 
result.arg.push_back(json.asString()); 00152 } 00153 } 00154 else { 00155 throw ML::Exception("unknown task field '" + i.memberName() + "'"); 00156 } 00157 } 00158 00159 return result; 00160 } 00161 00162 private: 00163 void print(std::ostream & stream, std::string indent = "") { 00164 stream << indent << name << " (" << pid << ")" << std::endl; 00165 stream << indent << "$> " << path; 00166 00167 for(auto & item : arg) { 00168 stream << " " << item; 00169 } 00170 00171 stream << std::endl; 00172 indent += " "; 00173 00174 for(auto & item : children) { 00175 item.print(stream, indent); 00176 } 00177 } 00178 00179 std::vector<char const *> makeArgs() { 00180 std::vector<char const *> result; 00181 00182 result.push_back(path.c_str()); 00183 for(auto & item : arg) { 00184 result.push_back(item.c_str()); 00185 } 00186 00187 result.push_back(0); 00188 return result; 00189 } 00190 00191 std::vector<char const *> makeEnvs() { 00192 std::vector<char const *> result; 00193 result.push_back(0); 00194 return result; 00195 } 00196 00197 void spawn() { 00198 std::cout << "launch " << name << std::endl; 00199 pid = fork(); 00200 00201 if(pid == -1) { 00202 throw ML::Exception(errno, "fork failed"); 00203 } 00204 00205 if(pid == 0) { 00206 signal(SIGTERM, SIG_DFL); 00207 signal(SIGKILL, SIG_DFL); 00208 00209 int res = prctl(PR_SET_PDEATHSIG, SIGHUP); 00210 if(res == -1) { 00211 throw ML::Exception(errno, "prctl failed"); 00212 } 00213 00214 if(log) { 00215 redirect(); 00216 } 00217 00218 res = chdir(root.c_str()); 00219 if(res == -1) { 00220 throw ML::Exception(errno, "chdir failed"); 00221 } 00222 00223 std::vector<char const *> args = makeArgs(); 00224 std::vector<char const *> envs = makeEnvs(); 00225 00226 res = execvpe(path.c_str(), (char **) &args[0], (char **) &envs[0]); 00227 if (res == -1) { 00228 throw ML::Exception(errno, "process failed to start"); 00229 } 00230 00231 throw ML::Exception(errno, "execvp failed"); 00232 } 00233 } 00234 00235 void redirect() { 00236 
std::string filename = ML::format("./logs/%s-%d.log", name, getpid()); 00237 00238 int fd = open(filename.c_str(), O_WRONLY|O_CREAT, 0666); 00239 if(fd == -1) { 00240 throw ML::Exception(errno, "open log '" + name + "' failed"); 00241 } 00242 00243 if(-1 == dup2(fd, 1)) { 00244 throw ML::Exception(errno, "failed to redirect STDOUT to file"); 00245 } 00246 00247 if(-1 == dup2(1, 2)) { 00248 throw ML::Exception(errno, "failed to redirect STDERR to STDOUT"); 00249 } 00250 00251 std::string ln = ML::format("ln -s -f ./%s-%d.log ./logs/%s.log", name, getpid(), name); 00252 if(-1 == system(ln.c_str())) { 00253 throw ML::Exception(errno, "failed to create symbolic link"); 00254 } 00255 00256 close(fd); 00257 } 00258 00259 int pid; 00260 std::vector<Task> children; 00261 std::string name; 00262 std::string path; 00263 std::string root; 00264 std::vector<std::string> arg; 00265 bool log; 00266 double delay; 00267 }; 00268 00269 struct Node 00270 { 00271 std::string const & getName() const { 00272 return name; 00273 } 00274 00275 Task * findTask(int pid) const { 00276 for(auto & item : tasks) { 00277 Task * task = item.findTask(pid); 00278 if(task) { 00279 return task; 00280 } 00281 } 00282 00283 return 0; 00284 } 00285 00286 void restart() { 00287 for(auto & item : tasks) { 00288 item.restart(); 00289 } 00290 } 00291 00292 void script(int & i, std::ostream & file) { 00293 for(auto & item : tasks) { 00294 item.script(i, file); 00295 } 00296 } 00297 00298 void script(int & i, std::ostream & file, std::string const & node) { 00299 for(auto & item : tasks) { 00300 item.script(i, file, node); 00301 } 00302 } 00303 00304 friend std::ostream & operator<<(std::ostream & stream, Node & node) { 00305 stream << node.name << std::endl; 00306 for(int i = 0; i != node.tasks.size(); ++i) { 00307 stream << "task #" << i << std::endl << node.tasks[i] << std::endl; 00308 } 00309 00310 return stream; 00311 } 00312 00313 static Node createFromJson(Json::Value const & json) { 00314 Node result; 
00315 for(auto i = json.begin(), end = json.end(); i != end; ++i) { 00316 if(i.memberName() == "tasks") { 00317 auto & json = *i; 00318 if(!json.empty() && !json.isArray()) { 00319 throw ML::Exception("'tasks' is not an array"); 00320 } 00321 00322 for(auto j = json.begin(), end = json.end(); j != end; ++j) { 00323 auto & json = *j; 00324 result.tasks.push_back(Task::createFromJson(json)); 00325 } 00326 } 00327 else if(i.memberName() == "name") { 00328 result.name = i->asString(); 00329 } 00330 else if(i.memberName() == "root") { 00331 result.root = i->asString(); 00332 } 00333 else { 00334 throw ML::Exception("unknown node field '" + i.memberName() + "'"); 00335 } 00336 } 00337 00338 return result; 00339 } 00340 00341 private: 00342 std::string name; 00343 std::string root; 00344 std::vector<Task> tasks; 00345 }; 00346 00347 struct Sequence 00348 { 00349 Node * getNode(std::string const & name) { 00350 for(auto & item : nodes) { 00351 if(item.getName() == name) { 00352 return &item; 00353 } 00354 } 00355 00356 return 0; 00357 } 00358 00359 void script(std::string const & filename, std::string const & sh, std::string const & node, bool master) { 00360 std::ofstream file(sh); 00361 if(!file) { 00362 throw ML::Exception("cannot create " + sh + " script"); 00363 } 00364 00365 file << "#!/bin/bash" << std::endl; 00366 file << std::endl; 00367 file << "tmux kill-session -t rtb" << std::endl; 00368 file << "tmux new-session -d -s rtb './build/x86_64/bin/launcher --node " << node << " --script " << sh << (master ? 
" --master" : "") << " --launch" << " " << filename << "'" << std::endl; 00369 file << "tmux rename-window 'launcher'" << std::endl; 00370 00371 int i = 0; 00372 for(int j = 0; j != nodes.size(); ++j) { 00373 auto & item = nodes[j]; 00374 auto & name = item.getName(); 00375 if(name == node) { 00376 item.script(i, file); 00377 } 00378 else if(master) { 00379 item.script(i, file, name); 00380 } 00381 } 00382 00383 file << "tmux attach -t rtb" << std::endl; 00384 file.close(); 00385 00386 chmod(sh.c_str(), 0755); 00387 } 00388 00389 friend std::ostream & operator<<(std::ostream & stream, Sequence & sequence) { 00390 for(auto & item : sequence.nodes) { 00391 stream << "node:" << std::endl << item << std::endl; 00392 } 00393 00394 return stream; 00395 } 00396 00397 static Sequence createFromJson(Json::Value const & json) { 00398 Sequence result; 00399 for(auto i = json.begin(), end = json.end(); i != end; ++i) { 00400 if(i.memberName() == "nodes") { 00401 auto & json = *i; 00402 if(!json.empty() && !json.isArray()) { 00403 throw ML::Exception("'nodes' is not an array"); 00404 } 00405 00406 for(auto j = json.begin(), end = json.end(); j != end; ++j) { 00407 auto & json = *j; 00408 result.nodes.push_back(Node::createFromJson(json)); 00409 } 00410 } 00411 else { 00412 throw ML::Exception("unknown launch sequence field '" + i.memberName() + "'"); 00413 } 00414 } 00415 00416 return result; 00417 } 00418 00419 private: 00420 std::vector<Node> nodes; 00421 }; 00422 00423 struct Service : public MessageLoop 00424 { 00425 void run(Json::Value const & root, std::string const & name, std::string const & filename, std::string const & sh, bool launch, bool master) { 00426 sequence = Datacratic::Launcher::Sequence::createFromJson(root); 00427 00428 if(!sh.empty()) { 00429 sequence.script(filename, sh, name, master); 00430 } 00431 00432 if(launch) { 00433 node = sequence.getNode(name); 00434 if(!node) { 00435 throw ML::Exception("cannot find node " + name); 00436 } 00437 00438 int res 
= system("mkdir -p ./logs"); 00439 if(res == -1) { 00440 throw ML::Exception("cannot create ./logs directory"); 00441 } 00442 00443 start(); 00444 00445 struct sigaction sa; 00446 memset(&sa, 0, sizeof(sa)); 00447 sa.sa_handler = &Service::sigchld; 00448 sigaction(SIGCHLD, &sa, 0); 00449 00450 node->restart(); 00451 00452 for(;;) { 00453 ML::sleep(1.0); 00454 } 00455 } 00456 } 00457 00458 static Service & get() { 00459 static Service instance; 00460 return instance; 00461 } 00462 00463 private: 00464 Service() : events(65536) { 00465 events.onEvent = std::bind<void>(&Service::onDeath, this, std::placeholders::_1); 00466 addSource("Launcher::Service::events", events); 00467 } 00468 00469 void onDeath(int pid) { 00470 Task * item = node->findTask(pid); 00471 00472 std::time_t now = std::time(0); 00473 std::cerr << "crash! " << (item ? item->getName() : "?") << " detected at " << std::asctime(std::localtime(&now)) << std::endl; 00474 if(item) { 00475 item->restart(); 00476 } 00477 } 00478 00479 static void sigchld(int pid) { 00480 for(;;) { 00481 int status = 0; 00482 int pid = waitpid(-1, &status, WNOHANG); 00483 if(pid == 0 || pid == -1) { 00484 break; 00485 } 00486 00487 Service::get().events.push(pid); 00488 } 00489 } 00490 00491 // child death events 00492 TypedMessageSink<int> events; 00493 00494 // node associated with this service 00495 Node * node; 00496 00497 // launching sequence 00498 Sequence sequence; 00499 }; 00500 }; 00501 00502 } // namespace RTBKIT