Package Products :: Package ZenUtils :: Module Watchdog
[hide private]
[frames | no frames]

Source Code for Module Products.ZenUtils.Watchdog

  1  #! /usr/bin/env python  
  2  ############################################################################## 
  3  #  
  4  # Copyright (C) Zenoss, Inc. 2007, all rights reserved. 
  5  #  
  6  # This content is made available according to terms specified in 
  7  # License.zenoss under the directory where your Zenoss product is installed. 
  8  #  
  9  ############################################################################## 
 10   
 11   
 12  __doc__='''watchdog for zenoss daemons 
 13   
 14  Run a program that is expected to run forever.  If the program stops, 
 15  restart it. 
 16   
 17  ''' 
 18   
 19  import Globals 
 20  from Products.ZenUtils.Utils import zenPath 
 21  import logging 
 22   
 23  import socket as s 
 24  import os, sys, time, signal, select 
 25   
class TimeoutError(Exception):
    """Raised when the watched process fails to connect or report in time."""
class UnexpectedFailure(Exception):
    """Raised when the watched process dies abnormally.

    The first argument is the ExitStatus describing how the child ended.
    """
# Logger shared by everything in this module
log = logging.getLogger('watchdog')

# Seconds to spend, in total, waiting for a child to die once we start
# sending it signals
DEATH_WATCH_TIME = 10

# Seconds to sleep between successive checks for the child's death
BUSY_WAIT_SLEEP = 0.5
37 -def _sleep(secs):
38 "Sleep, but don't raise an exception if interrupted" 39 try: 40 time.sleep(secs) 41 except: 42 pass
43
class ExitStatus:
    """Wrap the raw status word of a finished child process.

    The wrapped value is the integer status as returned by os.waitpid().
    """

    def __init__(self, status):
        # raw status word, kept untouched for __repr__ and the os.W* helpers
        self.status = status

    def __str__(self):
        """Describe how the child ended in human-readable form."""
        if not self.signaled():
            return 'Exited with code %d' % self.exitCode()
        return 'Killed with signal %d' % self.signal()

    def __repr__(self):
        return '<ExitStatus %d (%s)>' % (self.status, self)

    def signaled(self):
        """Return True if the child was terminated by a signal."""
        return os.WIFSIGNALED(self.status)

    def exitCode(self):
        """Return the exit code; raise ValueError if a signal killed the child."""
        if not self.signaled():
            return os.WEXITSTATUS(self.status)
        raise ValueError(str(self))

    def signal(self):
        """Return the terminating signal; raise ValueError if there was none."""
        if self.signaled():
            return os.WTERMSIG(self.status)
        raise ValueError(str(self))
69 70
class Watcher:
    """Run a command and restart it whenever it stops reporting.

    The command is launched with the current Python interpreter and an
    extra ``--watchdogPath <socketPath>`` argument.  The child must
    connect to that UNIX-domain socket within startTimeout seconds and
    then keep writing its cycle time (an integer followed by a newline).
    If nothing arrives for twice the most recently reported cycle time,
    or the child dies abnormally, the child is killed and started again.
    A child that exits with status 0 ends the watch.  The pause between
    restarts grows by a factor of 1.5 each time, up to maxTime seconds.
    """

    def __init__(self,
                 socketPath,
                 cmd,
                 startTimeout=None,
                 cycleTimeout=1,
                 maxTime=30):
        """Record the configuration; nothing is started until run().

        socketPath   -- filesystem path of the UNIX-domain socket to create
        cmd          -- list of arguments (script first) handed to Python
        startTimeout -- seconds to wait for the first connection
                        (None means 120)
        cycleTimeout -- initial number of seconds expected between reports
        maxTime      -- upper bound, in seconds, on the restart delay
        """
        if startTimeout is None:
            startTimeout = 120
        self.socketPath = socketPath
        self.cmd = cmd
        self.startTimeout = startTimeout
        self.cycleTimeout = cycleTimeout
        self.maxTime = maxTime
        self.stop = False
        self.childPid = -1

    def _kill(self):
        """Stop the child, escalating through SIGINT, SIGTERM and SIGKILL.

        Returns the child's ExitStatus once it has been reaped, or None
        when there is no child, it can no longer be signalled, or it
        could not be reaped within DEATH_WATCH_TIME seconds.
        """
        if self.childPid <= 0:
            return None
        signals = (signal.SIGINT, signal.SIGTERM, signal.SIGKILL)
        # True division: under Python 2, 10 / 3 would silently drop a
        # second from the DEATH_WATCH_TIME budget.
        patience = float(DEATH_WATCH_TIME) / len(signals)
        for sig in signals:
            log.debug("Killing %d with %d", self.childPid, sig)
            try:
                os.kill(self.childPid, sig)
            except OSError:
                # The pid cannot be signalled (typically it is already
                # gone).  _kill() runs from a finally clause, so raising
                # here would hide the error that is being handled.
                log.debug("Unable to signal %d", self.childPid,
                          exc_info=True)
                self.childPid = -1
                return None
            stopTime = time.time() + patience
            while time.time() < stopTime:
                try:
                    pid, status = os.waitpid(self.childPid, os.WNOHANG)
                    if pid:
                        # reaped: forget the pid so it is never signalled again
                        self.childPid = -1
                        return ExitStatus(status)
                except OSError:
                    pass
                _sleep(BUSY_WAIT_SLEEP)
        return None

    def _readWait(self, sock, timeout):
        """Wait for a socket to become readable.

        Returns sock once it is readable, or None if timeout seconds
        elapse or a stop is requested first.
        """
        endTime = time.time() + timeout
        # Loop because signals can cause select to stop early
        while not self.stop:
            remaining = endTime - time.time()
            if remaining <= 0:
                # never hand select() a negative timeout
                break
            log.debug("waiting %f seconds", remaining)
            try:
                rd, wr, ex = select.select([sock], [], [], remaining)
            except select.error:
                # Interrupted by a signal: re-check the stop flag and the
                # deadline.  Other exceptions are programming errors and
                # propagate instead of spinning until the timeout.
                continue
            if rd:
                return sock
        return None

    @staticmethod
    def _latestReport(buf):
        """Split buffered input into (newest complete line, remainder).

        Only newline-terminated text counts as a report; the trailing,
        still incomplete fragment is returned as the remainder so the
        caller can keep buffering it.  The line is None when no complete
        non-empty line is available.
        """
        lines = buf.split('\n')
        remainder = lines.pop()
        for line in reversed(lines):
            if line:
                return line, remainder
        return None, remainder

    def _execChild(self, cmd):
        """Replace the forked child with cmd; never returns."""
        try:
            try:
                log.debug('Running %r' % (cmd,))
                os.execlp(cmd[0], *cmd)
            except Exception:
                log.exception("Exec failed!")
        finally:
            # os._exit() rather than sys.exit(): SystemExit would unwind
            # this copy of the parent's stack and run its cleanup code.
            os._exit(1)

    def _readReports(self, conn):
        """Consume cycle-time reports from the child until it stops.

        Returns when the child exits with status 0 or a stop is
        requested.  Raises TimeoutError when the child goes quiet and
        UnexpectedFailure when it exits abnormally.
        """
        buf = ''
        while not self.stop:
            # get input from the child
            if not self._readWait(conn, self.cycleTimeout * 2):
                if self.stop:
                    break
                raise TimeoutError("getting status from process")
            try:
                data = conn.recv(1024)
            except s.error:
                continue
            if not data:    # EOF
                pid, status = os.waitpid(self.childPid, os.WNOHANG)
                if pid != self.childPid:
                    # NOTE(review): a child that closes the socket but
                    # keeps running is polled here indefinitely.
                    _sleep(0.1)
                    continue
                status = ExitStatus(status)
                self.childPid = -1
                if status.signaled():
                    raise UnexpectedFailure(status)
                if status.exitCode() != 0:
                    log.error("Child exited with status %d" %
                              status.exitCode())
                    raise UnexpectedFailure(status)
                return
            # interpret the newest complete line as an updated cycleTime
            line, buf = self._latestReport(buf + data)
            if line:
                log.debug("Child sent %s" % line)
                try:
                    self.cycleTimeout = max(int(line), 1)
                    log.debug("Watchdog cycleTimeout is %d",
                              self.cycleTimeout)
                except ValueError:
                    log.exception("Unable to convert cycleTime")

    def _watchChild(self, sock, cmd):
        """Accept the child's connection, then follow its reports."""
        if not self._readWait(sock, self.startTimeout):
            if self.stop:
                # Shutting down: accept() on the non-blocking socket
                # would only raise and log a spurious traceback in run().
                return
            raise TimeoutError("getting initial connection from process")
        log.debug('Waiting for command to connect %r' % (cmd,))
        conn, addr = sock.accept()
        conn.setblocking(False)
        try:
            self._readReports(conn)
        finally:
            conn.close()

    def _runOnce(self):
        """Start the child once and watch it until it exits or misbehaves.

        Returns normally when the child exits with status 0 or a stop
        was requested.  Raises TimeoutError if the child fails to connect
        or to report in time, and UnexpectedFailure if it dies
        abnormally.  Once the child has been forked, the socket file is
        removed and any surviving child is killed on the way out.
        """
        try:
            if os.path.exists(self.socketPath):
                os.unlink(self.socketPath)
        except OSError:
            log.exception("Problem removing old socket %s" % self.socketPath)
        cmd = [sys.executable] + self.cmd + ['--watchdogPath',
                                             self.socketPath]
        sock = s.socket(s.AF_UNIX, s.SOCK_STREAM)
        try:
            sock.bind(self.socketPath)
            sock.setblocking(False)
            # Listen before forking so the child can never try to connect
            # to a socket that is not accepting connections yet.
            sock.listen(1)
            # os.fork() raises OSError on failure; it never returns < 0.
            self.childPid = os.fork()
            if self.childPid == 0:
                self._execChild(cmd)    # child: does not return
            try:
                self._watchChild(sock, cmd)
            finally:
                # Kill the child even when the socket file cannot be removed
                try:
                    os.unlink(self.socketPath)
                finally:
                    self._kill()
        finally:
            sock.close()

    def _stop(self, *unused):
        """Signal handler: ask the watch loops to finish."""
        self.stop = True

    def run(self):
        """Keep the command running until it exits cleanly or SIGINT arrives.

        Installs a SIGINT handler that requests a stop, then restarts
        the command after every failure, waiting a little longer each
        time (never more than maxTime seconds).
        """
        sleepTime = 1
        signal.signal(signal.SIGINT, self._stop)
        while not self.stop:
            try:
                self._runOnce()
                return
            except TimeoutError as ex:
                log.error("Timeout: %s", ex)
            except UnexpectedFailure as ex:
                status = ex.args[0]
                log.error("Child died: %s" % status)
            except Exception as ex:
                log.exception(ex)
            if not self.stop:
                log.debug("Waiting %.2f seconds before restarting", sleepTime)
                _sleep(sleepTime)
                prog = self.cmd[0].split('/')[-1].split('.')[0]
                log.error("Restarting %s" % prog)
                sleepTime = min(1.5 * sleepTime, self.maxTime)
229
class Reporter:
    """Client side of the watchdog socket, used by the watched process.

    The process connects to the socket it was given and then calls
    niceDoggie() periodically to report its cycle time.
    """

    def __init__(self, path):
        """Connect to the UNIX-domain socket at path, in non-blocking mode."""
        sock = s.socket(s.AF_UNIX, s.SOCK_STREAM)
        try:
            sock.connect(path)
            sock.setblocking(False)
        except Exception:
            # do not leave a half-open socket behind when connecting fails
            sock.close()
            raise
        self.sock = sock

    def niceDoggie(self, cycleTime):
        """Report cycleTime (at least 1, as a whole number of seconds).

        Exits the process with status 1 if the socket delivers input or
        end-of-file, or if the report cannot be sent.
        """
        cycleTime = max(1, int(cycleTime))
        try:
            self.sock.recv(1)
        except s.error:
            # Normal case: nothing to read on the non-blocking socket.
            # Only socket errors are ignored, so the SystemExit raised
            # below is no longer swallowed as it was by a bare except.
            pass
        else:
            log.error("Received input on report socket: probably EOF")
            sys.exit(1)
        try:
            self.sock.send('%d\n' % cycleTime)
        except Exception:
            log.exception("Unable to report to the watchdog.")
            sys.exit(1)

    def close(self):
        """Close the connection to the watchdog."""
        self.sock.close()
252
def main():
    '''Small self-test for the Watchdog.

    Usage:
        python Watchdog.py -- python Watchdog.py -e 1

    This repeatedly runs a child that exits with exit code 1.  The child
    sends periodic reports over the watchdog socket.

    Without --watchdogPath the process acts as the watcher and runs the
    remaining arguments as the child command; with --watchdogPath it acts
    as the child, reports three times and exits with the -e exit code.
    '''
    import getopt
    global log
    options, command = getopt.getopt(sys.argv[1:], 'p:m:d:e:c:',
                                     'watchdogPath=')
    socketPath = 'watchdog.%d' % os.getpid()
    maxTime = 30
    cycleTime = 1
    level = 20
    child = None
    exitCode = 0
    for flag, value in options:
        if flag == '-p':
            socketPath = value
        elif flag == '-e':
            exitCode = int(value)
        elif flag == '-m':
            maxTime = float(value)
        elif flag == '-d':
            level = int(value)
        elif flag == '-c':
            cycleTime = int(value)
        elif flag == '--watchdogPath':
            # only the watcher passes this option, so we are the child
            socketPath = value
            child = True
    if not child:
        socketPath = zenPath('var', socketPath)

    logging.basicConfig(level=level)
    log = logging.getLogger('watchdog')

    if not child:
        watcher = Watcher(socketPath, command,
                          cycleTimeout=cycleTime, maxTime=maxTime)
        watcher.run()
        return

    reporter = Reporter(socketPath)
    print('Connected')
    for _ in range(3):
        time.sleep(1)
        reporter.niceDoggie(1)
        sys.stdout.write('*')
        sys.stdout.flush()
    time.sleep(2)
    reporter.close()
    print('Closed')
    sys.exit(exitCode)
# Names exported by "from Products.ZenUtils.Watchdog import *"
__all__ = ['Watcher', 'Reporter']

# Run the self-test when executed as a script
if __name__ == '__main__':
    main()