RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/banker/testing/banker_temporary_server.cc
00001 /* banker_temporary_server.h                                        -*- C++ -*-
00002    Jeremy Barnes, 19 October 2012
00003    Wolfgang Sourdeau, 20 December 2012
00004    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00005 
00006    A temporary server for testing of banker-based services. Starts one up in a
00007    temporary directory and gives the uri to connect to. */
00008 
00009 #include <arpa/inet.h>
00010 #include <netinet/in.h>
00011 #include <sys/socket.h>
00012 #include <sys/types.h>
00013 #include <sys/wait.h>
00014 #include <signal.h>
00015 
00016 #include <iostream>
00017 
00018 #include <jml/arch/exception.h>
00019 #include <jml/arch/timers.h>
00020 #include "rtbkit/core/banker/master_banker.h"
00021 
00022 #include "banker_temporary_server.h"
00023 
00024 using namespace std;
00025 
00026 namespace {
00027     bool mustTerminate(false);
00028 
00029     void handleSIGTERM(int) {
00030         cerr << "handling sigterm" << endl;
00031         mustTerminate = true;
00032     }
00033 }
00034 
00035 namespace RTBKIT {
00036 
00037 BankerTemporaryServer::
00038 BankerTemporaryServer(const Redis::Address & redisAddress,
00039                       const std::string & zookeeperUri,
00040                       const std::string & zookeeperPath)
00041     : redisAddress_(redisAddress),
00042       zookeeperUri_(zookeeperUri),
00043       zookeeperPath_(zookeeperPath),
00044       serverPid_(-1)
00045 {
00046     start();
00047 }
00048 
00049 BankerTemporaryServer::
00050 ~BankerTemporaryServer()
00051 {
00052     shutdown();
00053 }
00054 
00055 void
00056 BankerTemporaryServer::
00057 start()
00058 {
00059     // 2.  Start the server
00060     int pid = fork();
00061     if (pid == -1)
00062         throw ML::Exception(errno, "fork");
00063     if (pid == 0) {
00064         signal(SIGTERM, handleSIGTERM);
00065         signal(SIGKILL, SIG_DFL);
00066 
00067         auto redis = make_shared<Redis::AsyncConnection>(redisAddress_);
00068         redis->test();
00069 
00070         // cerr << "tested redis" << endl;
00071 
00072         auto proxies = std::make_shared<ServiceProxies>();
00073         proxies->useZookeeper(zookeeperUri_, zookeeperPath_);
00074         cerr << "zookeeperPath: " << zookeeperPath_ << endl;
00075 
00076         auto banker = make_shared<MasterBanker>(proxies, "masterBanker");
00077 
00078         // cerr << "initializing banker" << endl;
00079 
00080         banker->init(make_shared<RedisBankerPersistence>(redis));
00081         banker->monitorProviderClient.inhibit_ = true;
00082 
00083         // cerr << "binding banker" << endl;
00084 
00085         auto addr = banker->bindTcp();
00086 
00087         cerr << "MasterBanker: addrs = " << addr.first << "," << addr.second
00088              << endl;
00089 
00090         // cerr << "running banker" << endl;
00091         banker->start();
00092         proxies->config->dump(cerr);
00093     
00094         while (!mustTerminate) {
00095             ML::sleep(1);
00096         }
00097 
00098         /* force the destruction of the banker instance */
00099         banker.reset();
00100 
00101         // Exit without running destructors, which will greatly confuse things
00102         _exit(0);
00103     }
00104     else {
00105         cerr << "MasterBanker: pid = " << pid << endl;
00106         serverPid_ = pid;
00107 
00108 #if 0
00109         // 3.  Connect to the server to make sure it works
00110         int sock = socket(AF_INET, SOCK_STREAM, 0);
00111         if (sock == -1)
00112             throw ML::Exception(errno, "socket");
00113 
00114         struct sockaddr_in name;
00115         name.sin_family = AF_INET;
00116         inet_aton("127.0.0.1", &name.sin_addr);
00117         name.sin_port = htons(9876);
00118 
00119         cerr << "Attempting to connect to daemon process..." << endl;
00120 
00121         int res;
00122         // Wait for it to start up
00123         for (unsigned i = 0; i < 1000;  ++i) {
00124             res = connect(sock, (const sockaddr *)&name, sizeof(name));
00125             if (res == 0) break;
00126             if (res == -1 && errno != ECONNREFUSED)
00127                 throw ML::Exception(errno, "connect");
00128             
00129             ML::sleep(0.01);
00130         }
00131 
00132         if (res != 0)
00133             throw ML::Exception("banker didn't start up in 10 seconds");
00134         cerr << "Master Banker daemon is ready" << endl;
00135         ::close(sock);
00136 #endif
00137     }
00138 }
00139 
00140 void
00141 BankerTemporaryServer::
00142 shutdownWithSignal(int signum)
00143 {
00144     cerr << "shutdownWithSignal; serverPid_ = " << serverPid_ << endl;
00145 
00146     if (serverPid_ == -1)
00147         return;
00148 
00149     // Stop boost test framework from interpreting this as a problem...
00150     sighandler_t oldHandler = signal(SIGCHLD, SIG_DFL);
00151 
00152     int res = kill(serverPid_, signum);
00153     if (res == -1)
00154         throw ML::Exception(errno, "exterminate banker");
00155 
00156     cerr << "done kill" << endl;
00157 
00158     int status = 0;
00159     res = waitpid(serverPid_, &status, 0);
00160     if (res == -1)
00161         throw ML::Exception(errno, "wait for banker shutdown");
00162 
00163     serverPid_ = -1;
00164 
00165     signal(SIGCHLD, oldHandler);
00166 }
00167 
00168 void
00169 BankerTemporaryServer::
00170 shutdown()
00171 {
00172     shutdownWithSignal(SIGTERM);
00173 }
00174 
00175 void
00176 BankerTemporaryServer::
00177 exterminate()
00178 {
00179     shutdownWithSignal(SIGKILL);
00180 }
00181 
00182 } // namespace RTBKIT
00183 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator