Package Products :: Package ZenUtils :: Module ProcessQueue
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenUtils.ProcessQueue

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2009, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  from collections import deque 
 12  from twisted.internet import reactor, defer, error 
 13  from twisted.internet.protocol import ProcessProtocol 
 14  from twisted.python import failure 
 15  import time 
 16  import logging 
 17   
 18  log = logging.getLogger("zen.processqueue") 
 19   
 20   
21 -class QueueStopped(Exception):
22 pass
23
24 -class ProcessQueue(object):
25 """ 26 Ansynchronously run processes. Processes are queued up and run in a FIFO 27 order. Processes are run in concurrently up to the configured amount. 28 """ 29
30 - def __init__(self, parallel=10):
31 """ 32 initialize the process queue; process in queue will not executed until 33 start is called. 34 @param parallel: number of process to run concurrently 35 @type parallel: int 36 """ 37 self._parallel=parallel 38 self._processes=deque() 39 self._started=False 40 self._num_running=0 41 42 self._maxQtime = 0 43 self._maxExecTime = 0 44 self._stopped=None
45
46 - def queueProcess(self, executable, args=(), env={}, path=None, uid=None, 47 gid=None, usePTY=0, childFDs=None, processProtocol=None, 48 timeout=60, timeout_callback=None):
49 """ 50 add a process to the queue. args are similar to reactor.spawnProcess 51 @param processProtocol: optional protocol to control the process 52 @type processProtocol: ProcessProtocol from twisted 53 @param timeout: how many seconds to let the process execute 54 @type timeout: int 55 @param timeout_callback: callable to call if the process times out 56 @type timeout_callback: callable w/ one arg 57 @raise QueueStopped: if the queue has been stopped 58 """ 59 if self._stopped: 60 raise QueueStopped() 61 processQProtocol = None 62 if processProtocol: 63 processQProtocol = _ProcessQueueProtocolDecorator(processProtocol, 64 executable, args, 65 env, path, uid, 66 gid, usePTY, 67 childFDs,timeout, 68 timeout_callback) 69 else: 70 processQProtocol = _ProcessQueueProtocol(executable, args, 71 env, path, uid, 72 gid, usePTY, 73 childFDs,timeout, 74 timeout_callback) 75 log.debug("Adding process %s to queue" % processQProtocol) 76 log.debug("Processes in queue: %s" % len(self._processes)) 77 78 self._processes.append(processQProtocol) 79 80 if self._started: 81 self._processQueue()
82
83 - def stop(self):
84 """ 85 stops the process queue; no more processes will be accepted. deferred 86 will be called back when process queue is empty 87 """ 88 if self._stopped: return self._stopped 89 self._stopped = defer.Deferred() 90 if self._num_running ==0 and len(self._processes) == 0: 91 self._stopped.callback("process queue is empty and stopped") 92 return self._stopped
93
94 - def start(self):
95 """ 96 start processing the queue. Processes will only be executed when the 97 reactor starts 98 """ 99 def _doStart(): 100 # don't want to actually start unless reactor is running to prevent 101 #zombie processes 102 if not self._started: 103 self._started=True 104 self._processQueue()
105 106 reactor.callLater(0,_doStart)
107
108 - def _processQueue(self):
109 def processFinished(value, processProtocol): 110 self._num_running -= 1 111 reactor.callLater(0, self._processQueue) 112 113 execTime = processProtocol.execStopTime - processProtocol.execStartTime 114 qTime = processProtocol.queueStopTime - processProtocol.queueStartTime 115 self._maxQtime = max(self._maxQtime, qTime) 116 self._maxExecTime = max(self._maxExecTime, execTime) 117 log.debug("execution time %s seconds; queue time %s seconds; " 118 "process %s" 119 % ( execTime, qTime, processProtocol)) 120 if (self._num_running == 0 121 and self._stopped 122 and not self._stopped.called 123 and len(self._processes) == 0): 124 self._stopped.callback("process queue is empty and stopped")
125 log.debug("Number of process being executed: %s" % self._num_running) 126 if self._num_running < self._parallel: 127 processQProtocol = None 128 if self._processes: 129 processQProtocol = self._processes.popleft() 130 if processQProtocol: 131 self._num_running += 1 132 d = processQProtocol.start() 133 d.addBoth(processFinished, processQProtocol) 134 135 if self._processes and self._num_running < self._parallel: 136 reactor.callLater(0, self._processQueue) 137 return 138
139 -class _ProcessQueueProtocol(ProcessProtocol):
140 """ 141 For interal use by ProcessQueue 142 Protocol to run processes in ProcessQueue. Controls life cycle or process 143 including timing out long running processes 144 """ 145
146 - def __init__(self, executable, args=(), env={}, path=None, 147 uid=None, gid=None, usePTY=0, childFDs=None, timeout=60, 148 timeout_callback=None):
149 self._executable=executable 150 self._args=args 151 self._env=env 152 self._path=path 153 self._uid=uid 154 self._gid=gid 155 self._usePTY=usePTY 156 self._childFDs=childFDs 157 self._time_out=timeout 158 self._timeoutDeferred=None 159 self._timeout_callback=timeout_callback 160 self.queueStartTime = time.time() 161 self.queueStopTime = None 162 self.execStartTime = None 163 self.execStopTime = None
164
165 - def __str__(self):
166 if self._args: 167 return"process %s" % " ".join(self._args) 168 else: 169 return "process %s" % self._executable
170
171 - def start(self):
172 log.debug("spawning %s " % self) 173 now = time.time() 174 self.queueStopTime = now 175 self.execStartTime = now 176 reactor.spawnProcess(self, self._executable, self._args, 177 self._env, self._path, self._uid, self._gid, 178 self._usePTY, self._childFDs) 179 self._timeoutDeferred = createTimeout(defer.Deferred(), self._time_out, self) 180 self._timeoutDeferred.addErrback(self._timedOut) 181 if self._timeout_callback: 182 self._timeoutDeferred.addErrback(self._timeout_callback) 183 return self._timeoutDeferred
184
185 - def _timedOut(self, value):
186 "Kill a process if it takes too long" 187 try: 188 if not self.execStopTime: 189 self.execStopTime = time.time() 190 191 self.transport.signalProcess('KILL') 192 log.warning("timed out after %s seconds: %s" % (self._time_out, 193 self)) 194 except error.ProcessExitedAlready: 195 log.debug("Process already exited: %s" % self) 196 return value
197
198 - def processEnded(self, reason):
199 """ 200 This will be called when the subprocess is finished. 201 202 @type reason: L{twisted.python.failure.Failure} 203 """ 204 if not self.execStopTime: 205 self.execStopTime = time.time() 206 207 deferred = self._timeoutDeferred 208 self._timeoutDeferred = None 209 if deferred and not deferred.called: 210 msg = reason.getErrorMessage() 211 exitCode = reason.value.exitCode 212 deferred.callback((exitCode,msg))
213 214 215
216 -class _ProcessQueueProtocolDecorator(_ProcessQueueProtocol):
217 """ 218 For interal use by ProcessQueue 219 Wraps an existing ProcessProtocol so that it can be run in a ProcessQueue 220 """
221 - def __init__(self, protocol, executable, args=(), env={}, path=None, 222 uid=None, gid=None, usePTY=0, childFDs=None, timeout=60, 223 timeout_callback=None):
224 _ProcessQueueProtocol.__init__(self, executable, args, env, path, uid, 225 gid, usePTY, childFDs, timeout, 226 timeout_callback) 227 self._protocol = protocol
228 229
230 - def outReceived(self, data):
231 """ 232 Some data was received from stdout. 233 """ 234 self._protocol.outReceived(data)
235
236 - def errReceived(self, data):
237 """ 238 Some data was received from stderr. 239 """ 240 self._protocol.errReceived(data)
241
242 - def inConnectionLost(self):
243 """ 244 This will be called when stdin is closed. 245 """ 246 self._protocol.inConnectionLost()
247
248 - def outConnectionLost(self):
249 """ 250 This will be called when stdout is closed. 251 """ 252 self._protocol.outConnectionLost()
253 254
255 - def errConnectionLost(self):
256 """ 257 This will be called when stderr is closed. 258 """ 259 self._protocol.errConnectionLost()
260
261 - def processEnded(self, reason):
262 """ 263 This will be called when the subprocess is finished. 264 265 @type reason: L{twisted.python.failure.Failure} 266 """ 267 _ProcessQueueProtocol.processEnded(self, reason) 268 self._protocol.processEnded(reason)
269
270 -class TimeoutError(Exception):
271 """ 272 Error for a defered call taking too long to complete 273 """ 274
275 - def __init__(self, *args):
276 Exception.__init__(self) 277 self.args = args
278
279 -def createTimeout(deferred, seconds, obj):
280 """ 281 Cause an error on a deferred when it is taking too long to complete. 282 @param deferred: deferred to monitor for callback/errback 283 @type deferred: Deferred 284 @param seconds: Time to wait for a callback/errback on the deferred 285 @type seconds: int 286 @pram obj: context for the TimeoutError when timeout occurs 287 @type obj: anything 288 """ 289 290 def _timeout(deferred, obj): 291 "took too long... call an errback" 292 deferred.errback(failure.Failure(TimeoutError(obj)))
293 294 def _cb(arg, timer): 295 "the process finished, possibly by timing out" 296 if not timer.called: 297 timer.cancel() 298 return arg 299 300 timer = reactor.callLater(seconds, _timeout, deferred, obj) 301 deferred.addBoth(_cb, timer) 302 return deferred 303