Package ZenUtils :: Module Watchdog
[hide private]
[frames] | no frames]

Source Code for Module ZenUtils.Watchdog

  1  #! /usr/bin/env python  
  2  ########################################################################### 
  3  # 
  4  # This program is part of Zenoss Core, an open source monitoring platform. 
  5  # Copyright (C) 2007, Zenoss Inc. 
  6  # 
  7  # This program is free software; you can redistribute it and/or modify it 
  8  # under the terms of the GNU General Public License version 2 as published by 
  9  # the Free Software Foundation. 
 10  # 
 11  # For complete information please visit: http://www.zenoss.com/oss/ 
 12  # 
 13  ########################################################################### 
 14   
 15  __doc__='''watchdog for zenoss daemons 
 16   
 17  Run a program that is expected to run forever.  If the program stops, 
 18  restart it. 
 19   
 20  ''' 
 21   
 22  import Globals 
 23  from Products.ZenUtils.Utils import zenPath 
 24  import logging 
 25   
 26  import socket as s 
 27  import os, sys, time, signal, select 
 28   
class TimeoutError(Exception):
    "The watched process did not connect or report within the allowed time"
class UnexpectedFailure(Exception):
    "The watched process was killed by a signal or exited with a non-zero code"

# pause (seconds) between successive checks for a child's imminent death
BUSY_WAIT_SLEEP = 0.5

# total time (seconds) to spend waiting around for a child to die after
# we kill it
DEATH_WATCH_TIME = 10

# logger shared by the watching and the reporting side
log = logging.getLogger('watchdog')

def _sleep(secs):
    """Sleep for secs seconds, but don't raise an exception if interrupted.

    Errors raised by time.sleep() (for example when a signal cuts the
    sleep short) are ignored, so the call may return early.  Always
    returns None.
    """
    try:
        time.sleep(secs)
    except Exception:
        # Deliberately best-effort.  Not a bare "except:", so that
        # SystemExit and KeyboardInterrupt are not silently eaten on
        # interpreters where they do not derive from Exception.
        pass
46
class ExitStatus:
    "Wrapper around the raw status word of a finished child process"

    def __init__(self, status):
        # raw status value, as returned by os.waitpid()
        self.status = status

    def __str__(self):
        "Human readable description: exit code, or the killing signal"
        if not self.signaled():
            return 'Exited with code %d' % self.exitCode()
        return 'Killed with signal %d' % self.signal()

    def __repr__(self):
        description = str(self)
        return '<ExitStatus %d (%s)>' % (self.status, description)

    def signaled(self):
        "True when the child was terminated by a signal"
        return os.WIFSIGNALED(self.status)

    def exitCode(self):
        "Exit code of the child; ValueError if it was killed by a signal"
        if not self.signaled():
            return os.WEXITSTATUS(self.status)
        raise ValueError(str(self))

    def signal(self):
        "Signal that killed the child; ValueError if it exited by itself"
        if self.signaled():
            return os.WTERMSIG(self.status)
        raise ValueError(str(self))
72 73
class Watcher:
    """Run the given command, and expect periodic input on a shared
    UNIX-domain socket.  If the command does not connect to the socket
    in startTimeout seconds, or it stays silent for more than twice
    cycleTimeout seconds, then the process is restarted.  The Watchdog
    will increase the time between restarts until maxTime is reached.

    The child is started as "sys.executable <cmd...> --watchdogPath
    <socketPath>".  Every complete, newline-terminated line it sends is
    read as a decimal integer and becomes the new cycleTimeout
    (never less than 1)."""

    def __init__(self,
                 socketPath,
                 cmd,
                 startTimeout = None,
                 cycleTimeout = 1,
                 maxTime = 30):
        """socketPath   -- filesystem path of the UNIX-domain socket
        cmd          -- list of arguments handed to the python interpreter
        startTimeout -- seconds to wait for the child's first connection
                        (None means 120)
        cycleTimeout -- initial expected interval between reports
        maxTime      -- upper bound for the delay between restarts
        """
        if startTimeout is None:
            startTimeout = 120
        self.socketPath = socketPath
        self.cmd = cmd
        self.startTimeout = startTimeout
        self.cycleTimeout = cycleTimeout
        self.maxTime = maxTime
        self.stop = False       # set by _stop() to end all loops
        self.childPid = -1      # pid of the running child, -1 when none

    def _kill(self):
        """Send a signal to a process and wait for it to stop.  Use
        progressively more serious signals to get it to stop.

        Returns an ExitStatus when the child was reaped, None when there
        is no child, the child is already gone, or it survived every
        signal for DEATH_WATCH_TIME seconds.
        """
        if self.childPid <= 0:
            return None
        import errno
        signals = (signal.SIGINT, signal.SIGTERM, signal.SIGKILL)
        # float() so the whole DEATH_WATCH_TIME budget is used even where
        # "/" truncates integers
        perSignal = float(DEATH_WATCH_TIME) / len(signals)
        for sig in signals:
            log.debug("Killing %d with %d", self.childPid, sig)
            try:
                os.kill(self.childPid, sig)
            except OSError:
                # sys.exc_info() instead of binding the exception in the
                # except clause: that spelling parses on every Python
                if sys.exc_info()[1].errno != errno.ESRCH:
                    raise
                # no such process: nothing left to signal or reap
                self.childPid = -1
                return None
            stopTime = time.time() + perSignal
            while time.time() < stopTime:
                try:
                    pid, status = os.waitpid(self.childPid, os.WNOHANG)
                except OSError:
                    if sys.exc_info()[1].errno == errno.ECHILD:
                        # not our child any more; stop polling and do not
                        # signal the pid again
                        self.childPid = -1
                        return None
                    # anything else (e.g. an interrupted call): poll again
                else:
                    if pid:
                        self.childPid = -1
                        return ExitStatus(status)
                _sleep(BUSY_WAIT_SLEEP)
        return None

    def _readWait(self, sock, timeout):
        """Wait for a file descriptor to become readable.

        Returns sock once it is readable, or None when timeout seconds
        have passed or the watchdog was asked to stop."""
        endTime = time.time() + timeout
        # Loop because signals can cause select to stop early
        while not self.stop:
            remaining = endTime - time.time()
            if remaining <= 0:
                break
            log.debug("waiting %f seconds" % remaining)
            try:
                rd, wr, ex = select.select([sock], [], [], remaining)
            except Exception:
                # typically an interrupted call; retry with the time left
                continue
            if rd:
                return sock
        return None

    def _execChild(self, cmd):
        """Replace the freshly forked child with cmd.  Never returns.

        When the exec fails the child leaves through os._exit() so that it
        cannot unwind into (and keep running) the watchdog's own code."""
        try:
            try:
                log.debug('Running %r' % (cmd,))
                os.execlp(cmd[0], *cmd)
            except Exception:
                log.exception("Exec failed!")
        finally:
            os._exit(1)

    def _splitReports(self, buf):
        """Split buffered input into (complete lines, unfinished remainder).

        Only text that was terminated by a newline counts as a line; the
        trailing fragment is handed back so more data can be appended."""
        parts = buf.split('\n')
        return parts[:-1], parts[-1]

    def _applyReport(self, line):
        "Interpret one complete line from the child as the new cycleTimeout"
        if not line:
            return
        log.debug("Child sent %s" % line)
        try:
            self.cycleTimeout = max(int(line), 1)
        except ValueError:
            log.exception("Unable to convert cycleTime")
        else:
            log.debug("Watchdog cycleTimeout is %d", self.cycleTimeout)

    def _childExited(self):
        """Check, without blocking, whether the child has terminated.

        Returns True after a clean exit (code 0) and False while the child
        is still running.  Raises UnexpectedFailure, carrying the
        ExitStatus, when it was killed or exited with a non-zero code."""
        pid, status = os.waitpid(self.childPid, os.WNOHANG)
        if pid != self.childPid:
            return False
        status = ExitStatus(status)
        self.childPid = -1
        if status.signaled():
            raise UnexpectedFailure(status)
        if status.exitCode() != 0:
            log.error("Child exited with status %d" % status.exitCode())
            raise UnexpectedFailure(status)
        return True

    def _watchReports(self, conn):
        """Consume reports from the connected child until it exits cleanly
        or the watchdog is stopped.  Raises TimeoutError when the child
        stays silent for more than twice cycleTimeout seconds."""
        buf = ''
        while not self.stop:
            # get input from the child
            if not self._readWait(conn, self.cycleTimeout * 2):
                if self.stop:
                    break
                raise TimeoutError("getting status from process")
            try:
                data = conn.recv(1024)
            except s.error:
                continue
            if not data:    # EOF
                if self._childExited():
                    return
                # NOTE(review): a child that closes the socket but never
                # exits is polled here indefinitely, without a timeout.
                _sleep(0.1)
                continue
            # interpret the data as updated cycleTimes, oldest first so
            # the most recent complete report wins
            buf += data
            lines, buf = self._splitReports(buf)
            for line in lines:
                self._applyReport(line)

    def _supervise(self, sock, cmd):
        "Accept the child's connection on sock and follow its reports"
        log.debug('Waiting for command to connect %r' % (cmd,))
        if not self._readWait(sock, self.startTimeout):
            if self.stop:
                # asked to stop while waiting: nothing to accept
                return
            raise TimeoutError("getting initial connection from process")
        conn, addr = sock.accept()
        try:
            conn.setblocking(False)
            self._watchReports(conn)
        finally:
            conn.close()

    def _runOnce(self):
        """Start the command once and supervise it.

        Returns None when the child exited with code 0 or the watchdog was
        stopped.  Raises TimeoutError when the child does not connect or
        stops reporting, and UnexpectedFailure when it dies.  In every
        case the socket is closed and removed and a child that is still
        around is killed before returning."""
        try:
            if os.path.exists(self.socketPath):
                os.unlink(self.socketPath)
        except OSError:
            log.exception("Problem removing old socket %s" % self.socketPath)
        cmd = [sys.executable] + self.cmd + ['--watchdogPath', self.socketPath]
        sock = s.socket(s.AF_UNIX, s.SOCK_STREAM)
        try:
            sock.bind(self.socketPath)
            try:
                # Listen before forking so the child can never try to
                # connect while nobody is accepting yet
                sock.setblocking(False)
                sock.listen(1)
                # os.fork() raises OSError on failure, it never returns < 0
                self.childPid = os.fork()
                if self.childPid == 0:
                    self._execChild(cmd)
                self._supervise(sock, cmd)
            finally:
                # a failing unlink must not prevent killing the child
                try:
                    try:
                        os.unlink(self.socketPath)
                    except OSError:
                        log.exception("Problem removing socket %s" %
                                      self.socketPath)
                finally:
                    self._kill()
        finally:
            sock.close()

    def _stop(self, *unused):
        "Signal handler: ask every loop to finish"
        self.stop = True

    def run(self):
        """Keep the command running until it exits cleanly or SIGINT
        arrives.  After each failure wait before restarting; the wait
        starts at 1 second and grows by half each time, capped at maxTime."""
        sleepTime = 1
        signal.signal(signal.SIGINT, self._stop)
        while not self.stop:
            try:
                self._runOnce()
                return
            # sys.exc_info() instead of binding the exception in the
            # except clause: that spelling parses on every Python
            except TimeoutError:
                log.error("Timeout: %s" % (sys.exc_info()[1],))
            except UnexpectedFailure:
                status = sys.exc_info()[1].args[0]
                log.error("Child died: %s" % status)
            except Exception:
                log.exception(sys.exc_info()[1])
            if not self.stop:
                log.debug("Waiting %.2f seconds before restarting", sleepTime)
                _sleep(sleepTime)
                # script name without directory and extension
                prog = self.cmd[0].split('/')[-1].split('.')[0]
                log.error("Restarting %s" % prog)
                sleepTime = min(1.5 * sleepTime, self.maxTime)
232
class Reporter:
    """Child-side end of the watchdog socket: connects to the path the
    watchdog listens on and reports the current cycle time over it."""

    def __init__(self, path):
        "Connect to the UNIX-domain socket at path and make it non-blocking"
        sock = s.socket(s.AF_UNIX, s.SOCK_STREAM)
        try:
            sock.connect(path)
            sock.setblocking(False)
        except Exception:
            # do not leave the descriptor open when the connection fails
            sock.close()
            raise
        self.sock = sock

    def niceDoggie(self, cycleTime):
        """Tell the watchdog we are alive and how often to expect reports.

        cycleTime is converted with int() and raised to at least 1, then
        sent as a decimal number followed by a newline.  The process exits
        with code 1 when the socket turns out to be readable (the watchdog
        is not expected to send anything, so this is probably EOF) or when
        the report cannot be sent."""
        cycleTime = max(1, int(cycleTime))
        try:
            self.sock.recv(1)
        except s.error:
            # nothing to read on the non-blocking socket: the normal case
            pass
        else:
            # Kept out of the try block above so the SystemExit raised
            # here is not swallowed by its except clause
            log.error("Received input on report socket: probably EOF")
            sys.exit(1)
        try:
            self.sock.send('%d\n' % cycleTime)
        except Exception:
            log.exception("Unable to report to the watchdog.")
            sys.exit(1)

    def close(self):
        "Close the connection to the watchdog"
        self.sock.close()
255
def main():
    '''Small manual exercise for the Watchdog.

    Usage:
        python Watchdog.py -- python Watchdog.py -e 1

    Without --watchdogPath this process is the watchdog: it runs the
    command given after "--" and restarts it when it fails.  With
    --watchdogPath it plays the child: it connects to the socket, sends
    three reports one second apart, waits two seconds, closes the socket
    and exits with the code given by -e (so -e 1 makes the watchdog
    restart it over and over).

    Options: -p socket name, -e child exit code, -m maximum restart
    delay, -d logging level, -c cycle time.

    NOTE(review): Watcher puts sys.executable in front of the command
    itself, so the second "python" in the usage line looks redundant --
    confirm before relying on it.
    '''
    import getopt
    global log
    options, command = getopt.getopt(sys.argv[1:], 'p:m:d:e:c:',
                                     'watchdogPath=')
    # defaults, overridden by the options below
    sockPath = 'watchdog.%d' % os.getpid()
    restartCap = 30
    reportEvery = 1
    logLevel = 20
    isChild = None
    childExit = 0
    for flag, value in options:
        if flag == '-p':
            sockPath = value
        elif flag == '-e':
            childExit = int(value)
        elif flag == '-m':
            restartCap = float(value)
        elif flag == '-d':
            logLevel = int(value)
        elif flag == '-c':
            reportEvery = int(value)
        elif flag == '--watchdogPath':
            sockPath = value
            isChild = True
    if not isChild:
        # the watchdog keeps its socket below the zenoss "var" directory
        sockPath = zenPath('var', sockPath)

    logging.basicConfig(level=logLevel)
    log = logging.getLogger('watchdog')

    if not isChild:
        watcher = Watcher(sockPath, command,
                          cycleTimeout=reportEvery, maxTime=restartCap)
        watcher.run()
        return

    reporter = Reporter(sockPath)
    sys.stdout.write('Connected\n')
    for attempt in range(3):
        time.sleep(1)
        reporter.niceDoggie(1)
        sys.stdout.write('*')
        sys.stdout.flush()
    time.sleep(2)
    reporter.close()
    sys.stdout.write('Closed\n')
    sys.exit(childExit)

# names exported by "from Watchdog import *"
__all__ = ['Watcher', 'Reporter']

# run the self-test when executed as a script
if __name__ == '__main__':
    main()