RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/launcher/launcher.h
00001 /* launcher.h                                                    -*- C++ -*-
00002    Eric Robert, 28 February 2013
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004    
00005    Common launcher task structures
00006 */
00007 
00008 #pragma once
00009 
#include <cerrno>
#include <cstring>
#include <ctime>
#include <exception>
#include <fstream>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

#include <fcntl.h>
#include <signal.h>
#include <sys/prctl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
00021 
00022 #include "jml/arch/exception.h"
00023 #include "jml/arch/timers.h"
00024 #include "jml/utils/ring_buffer.h"
00025 #include "soa/jsoncpp/json.h"
00026 #include "soa/service/service_base.h"
00027 #include "soa/service/message_loop.h"
00028 #include "soa/service/typed_message_channel.h"
00029 
00030 namespace Datacratic {
00031 
00032 struct Launcher
00033 {
00034     struct Task
00035     {
00036         Task() : pid(-1), log(false), delay(5.0) {
00037         }
00038 
00039         std::string const & getName() const {
00040             return name;
00041         }
00042 
00043         void restart() {
00044             stop();
00045             start();
00046         }
00047 
00048         void start() {
00049             spawn();
00050             ML::sleep(delay);
00051 
00052             for(auto & item : children) {
00053                 item.start();
00054             }
00055         }
00056 
00057         void stop() {
00058             if(pid != -1 && kill(pid, 0) != -1) {
00059                 int res = kill(pid, SIGTERM);
00060                 if(res == -1) {
00061                     throw ML::Exception(errno, "cannot kill process");
00062                 }
00063 
00064                 int status = 0;
00065                 res = waitpid(pid, &status, 0);
00066                 if(res == -1) {
00067                     throw ML::Exception(errno, "failed to wait for process to shutdown");
00068                 }
00069 
00070                 std::cout << "killed " << name << std::endl;
00071             }
00072 
00073             pid = -1;
00074 
00075             for(auto & item : children) {
00076                 item.stop();
00077             }
00078         }
00079 
00080         Task * findTask(int pid) const {
00081             if(this->pid == pid) {
00082                 return (Task *) this;
00083             }
00084 
00085             for(auto & item : children) {
00086                 Task * task = item.findTask(pid);
00087                 if(task) {
00088                     return task;
00089                 }
00090             }
00091 
00092             return 0;
00093         }
00094 
00095         void script(int & i, std::ostream & file) {
00096             file << "tmux new-window -d -t rtb:" << ++i << " -n '" << name << "' 'tail -F ./logs/" << name << ".log'" << std::endl;
00097             for(auto & item : children) {
00098                 item.script(i, file);
00099             }
00100         }
00101 
00102         void script(int & i, std::ostream & file, std::string const & node) {
00103             file << "tmux new-window -d -t rtb:" << ++i << " -n '" << name << "' 'ssh " << node << " \"tail -F " << root << "/logs/" << name << ".log\"'" << std::endl;
00104             for(auto & item : children) {
00105                 item.script(i, file, node);
00106             }
00107         }
00108 
00109         friend std::ostream & operator<<(std::ostream & stream, Task & task) {
00110             task.print(stream);
00111             return stream;
00112         }
00113 
00114         static Task createFromJson(Json::Value const & json) {
00115             Task result;
00116             for(auto i = json.begin(), end = json.end(); i != end; ++i) {
00117                 if(i.memberName() == "children") {
00118                     auto & json = *i;
00119                     if(!json.empty() && !json.isArray()) {
00120                         throw ML::Exception("children is not an array");
00121                     }
00122 
00123                     for(auto j = json.begin(), end = json.end(); j != end; ++j) {
00124                         auto & json = *j;
00125                         result.children.push_back(createFromJson(json));
00126                     }
00127                 }
00128                 else if(i.memberName() == "name") {
00129                     result.name = i->asString();
00130                 }
00131                 else if(i.memberName() == "path") {
00132                     result.path = i->asString();
00133                 }
00134                 else if(i.memberName() == "root") {
00135                     result.root = i->asString();
00136                 }
00137                 else if(i.memberName() == "log") {
00138                     result.log = i->asBool();
00139                 }
00140                 else if(i.memberName() == "delay") {
00141                     result.delay = i->asDouble();
00142                 }
00143                 else if(i.memberName() == "arg") {
00144                     auto & json = *i;
00145                     if(!json.empty() && !json.isArray()) {
00146                         throw ML::Exception("'arg' is not an array");
00147                     }
00148 
00149                     for(auto j = json.begin(), end = json.end(); j != end; ++j) {
00150                         auto & json = *j;
00151                         result.arg.push_back(json.asString());
00152                     }
00153                 }
00154                 else {
00155                     throw ML::Exception("unknown task field '" + i.memberName() + "'");
00156                 }
00157             }
00158 
00159             return result;
00160         }
00161 
00162     private:
00163         void print(std::ostream & stream, std::string indent = "") {
00164             stream << indent << name << " (" << pid << ")" << std::endl;
00165             stream << indent << "$> " << path;
00166 
00167             for(auto & item : arg) {
00168                 stream << " " << item;
00169             }
00170 
00171             stream << std::endl;
00172             indent += "  ";
00173 
00174             for(auto & item : children) {
00175                 item.print(stream, indent);
00176             }
00177         }
00178 
00179         std::vector<char const *> makeArgs() {
00180             std::vector<char const *> result;
00181 
00182             result.push_back(path.c_str());
00183             for(auto & item : arg) {
00184                 result.push_back(item.c_str());
00185             }
00186 
00187             result.push_back(0);
00188             return result;
00189         }
00190 
00191         std::vector<char const *> makeEnvs() {
00192             std::vector<char const *> result;
00193             result.push_back(0);
00194             return result;
00195         }
00196 
00197         void spawn() {
00198             std::cout << "launch " << name << std::endl;
00199             pid = fork();
00200 
00201             if(pid == -1) {
00202                 throw ML::Exception(errno, "fork failed");
00203             }
00204 
00205             if(pid == 0) {
00206                 signal(SIGTERM, SIG_DFL);
00207                 signal(SIGKILL, SIG_DFL);
00208 
00209                 int res = prctl(PR_SET_PDEATHSIG, SIGHUP);
00210                 if(res == -1) {
00211                     throw ML::Exception(errno, "prctl failed");
00212                 }
00213 
00214                 if(log) {
00215                     redirect();
00216                 }
00217 
00218                 res = chdir(root.c_str());
00219                 if(res == -1) {
00220                     throw ML::Exception(errno, "chdir failed");
00221                 }
00222 
00223                 std::vector<char const *> args = makeArgs();
00224                 std::vector<char const *> envs = makeEnvs();
00225 
00226                 res = execvpe(path.c_str(), (char **) &args[0], (char **) &envs[0]);
00227                 if (res == -1) {
00228                     throw ML::Exception(errno, "process failed to start");
00229                 }
00230 
00231                 throw ML::Exception(errno, "execvp failed");
00232             }
00233         }
00234 
00235         void redirect() {
00236             std::string filename = ML::format("./logs/%s-%d.log", name, getpid());
00237 
00238             int fd = open(filename.c_str(), O_WRONLY|O_CREAT, 0666);
00239             if(fd == -1) {
00240                 throw ML::Exception(errno, "open log '" + name + "' failed");
00241             }
00242 
00243             if(-1 == dup2(fd, 1)) {
00244                 throw ML::Exception(errno, "failed to redirect STDOUT to file");
00245             }
00246 
00247             if(-1 == dup2(1, 2)) {
00248                 throw ML::Exception(errno, "failed to redirect STDERR to STDOUT");
00249             }
00250 
00251             std::string ln = ML::format("ln -s -f ./%s-%d.log ./logs/%s.log", name, getpid(), name);
00252             if(-1 == system(ln.c_str())) {
00253                 throw ML::Exception(errno, "failed to create symbolic link");
00254             }
00255 
00256             close(fd);
00257         }
00258 
00259         int pid;
00260         std::vector<Task> children;
00261         std::string name;
00262         std::string path;
00263         std::string root;
00264         std::vector<std::string> arg;
00265         bool log;
00266         double delay;
00267     };
00268 
00269     struct Node
00270     {
00271         std::string const & getName() const {
00272             return name;
00273         }
00274 
00275         Task * findTask(int pid) const {
00276             for(auto & item : tasks) {
00277                 Task * task = item.findTask(pid);
00278                 if(task) {
00279                     return task;
00280                 }
00281             }
00282 
00283             return 0;
00284         }
00285 
00286         void restart() {
00287             for(auto & item : tasks) {
00288                 item.restart();
00289             }
00290         }
00291 
00292         void script(int & i, std::ostream & file) {
00293             for(auto & item : tasks) {
00294                 item.script(i, file);
00295             }
00296         }
00297 
00298         void script(int & i, std::ostream & file, std::string const & node) {
00299             for(auto & item : tasks) {
00300                 item.script(i, file, node);
00301             }
00302         }
00303 
00304         friend std::ostream & operator<<(std::ostream & stream, Node & node) {
00305             stream << node.name << std::endl;
00306             for(int i = 0; i != node.tasks.size(); ++i) {
00307                 stream << "task #" << i << std::endl << node.tasks[i] << std::endl;
00308             }
00309 
00310             return stream;
00311         }
00312 
00313         static Node createFromJson(Json::Value const & json) {
00314             Node result;
00315             for(auto i = json.begin(), end = json.end(); i != end; ++i) {
00316                 if(i.memberName() == "tasks") {
00317                     auto & json = *i;
00318                     if(!json.empty() && !json.isArray()) {
00319                         throw ML::Exception("'tasks' is not an array");
00320                     }
00321 
00322                     for(auto j = json.begin(), end = json.end(); j != end; ++j) {
00323                         auto & json = *j;
00324                         result.tasks.push_back(Task::createFromJson(json));
00325                     }
00326                 }
00327                 else if(i.memberName() == "name") {
00328                     result.name = i->asString();
00329                 }
00330                 else if(i.memberName() == "root") {
00331                     result.root = i->asString();
00332                 }
00333                 else {
00334                     throw ML::Exception("unknown node field '" + i.memberName() + "'");
00335                 }
00336             }
00337 
00338             return result;
00339         }
00340 
00341     private:
00342         std::string name;
00343         std::string root;
00344         std::vector<Task> tasks;
00345     };
00346 
00347     struct Sequence
00348     {
00349         Node * getNode(std::string const & name) {
00350             for(auto & item : nodes) {
00351                 if(item.getName() == name) {
00352                     return &item;
00353                 }
00354             }
00355 
00356             return 0;
00357         }
00358 
00359         void script(std::string const & filename, std::string const & sh, std::string const & node, bool master) {
00360             std::ofstream file(sh);
00361             if(!file) {
00362                 throw ML::Exception("cannot create " + sh + " script");
00363             }
00364 
00365             file << "#!/bin/bash" << std::endl;
00366             file << std::endl;
00367             file << "tmux kill-session -t rtb" << std::endl;
00368             file << "tmux new-session -d -s rtb './build/x86_64/bin/launcher --node " << node << " --script " << sh << (master ? " --master" : "") << " --launch" << " " << filename << "'" << std::endl;
00369             file << "tmux rename-window 'launcher'" << std::endl;
00370 
00371             int i = 0;
00372             for(int j = 0; j != nodes.size(); ++j) {
00373                 auto & item = nodes[j];
00374                 auto & name = item.getName();
00375                 if(name == node) {
00376                     item.script(i, file);
00377                 }
00378                 else if(master) {
00379                     item.script(i, file, name);
00380                 }
00381             }
00382 
00383             file << "tmux attach -t rtb" << std::endl;
00384             file.close();
00385 
00386             chmod(sh.c_str(), 0755);
00387         }
00388 
00389         friend std::ostream & operator<<(std::ostream & stream, Sequence & sequence) {
00390             for(auto & item : sequence.nodes) {
00391                 stream << "node:" << std::endl << item << std::endl;
00392             }
00393 
00394             return stream;
00395         }
00396 
00397         static Sequence createFromJson(Json::Value const & json) {
00398             Sequence result;
00399             for(auto i = json.begin(), end = json.end(); i != end; ++i) {
00400                 if(i.memberName() == "nodes") {
00401                     auto & json = *i;
00402                     if(!json.empty() && !json.isArray()) {
00403                         throw ML::Exception("'nodes' is not an array");
00404                     }
00405 
00406                     for(auto j = json.begin(), end = json.end(); j != end; ++j) {
00407                         auto & json = *j;
00408                         result.nodes.push_back(Node::createFromJson(json));
00409                     }
00410                 }
00411                 else {
00412                     throw ML::Exception("unknown launch sequence field '" + i.memberName() + "'");
00413                 }
00414             }
00415 
00416             return result;
00417         }
00418 
00419     private:
00420         std::vector<Node> nodes;
00421     };
00422 
00423     struct Service : public MessageLoop
00424     {
00425         void run(Json::Value const & root, std::string const & name, std::string const & filename, std::string const & sh, bool launch, bool master) {
00426             sequence = Datacratic::Launcher::Sequence::createFromJson(root);
00427 
00428             if(!sh.empty()) {
00429                 sequence.script(filename, sh, name, master);
00430             }
00431 
00432             if(launch) {
00433                 node = sequence.getNode(name);
00434                 if(!node) {
00435                     throw ML::Exception("cannot find node " + name);
00436                 }
00437 
00438                 int res = system("mkdir -p ./logs");
00439                 if(res == -1) {
00440                     throw ML::Exception("cannot create ./logs directory");
00441                 }
00442 
00443                 start();
00444 
00445                 struct sigaction sa;
00446                 memset(&sa, 0, sizeof(sa));
00447                 sa.sa_handler = &Service::sigchld;
00448                 sigaction(SIGCHLD, &sa, 0);
00449 
00450                 node->restart();
00451 
00452                 for(;;) {
00453                     ML::sleep(1.0);
00454                 }
00455             }
00456         }
00457 
00458         static Service & get() {
00459             static Service instance;
00460             return instance;
00461         }
00462 
00463     private:
00464         Service() : events(65536) {
00465             events.onEvent = std::bind<void>(&Service::onDeath, this, std::placeholders::_1);
00466             addSource("Launcher::Service::events", events);
00467         }
00468 
00469         void onDeath(int pid) {
00470             Task * item = node->findTask(pid);
00471 
00472             std::time_t now = std::time(0);
00473             std::cerr << "crash! " << (item ? item->getName() : "?") << " detected at " << std::asctime(std::localtime(&now)) << std::endl;
00474             if(item) {
00475                 item->restart();
00476             }
00477         }
00478 
00479         static void sigchld(int pid) {
00480             for(;;) {
00481                 int status = 0;
00482                 int pid = waitpid(-1, &status, WNOHANG);
00483                 if(pid == 0 || pid == -1) {
00484                     break;
00485                 }
00486 
00487                 Service::get().events.push(pid);
00488             }
00489         }
00490 
00491         // child death events
00492         TypedMessageSink<int> events;
00493 
00494         // node associated with this service
00495         Node * node;
00496 
00497         // launching sequence
00498         Sequence sequence;
00499     };
00500 };
00501 
00502 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator