1
2
3
4
5
6
7
8
9
10
11 from collections import deque
12 from twisted.internet import reactor, defer, error
13 from twisted.internet.protocol import ProcessProtocol
14 from twisted.python import failure
15 import time
16 import logging
17
18 log = logging.getLogger("zen.processqueue")
19
20
23
25 """
26 Ansynchronously run processes. Processes are queued up and run in a FIFO
27 order. Processes are run in concurrently up to the configured amount.
28 """
29
31 """
32 initialize the process queue; process in queue will not executed until
33 start is called.
34 @param parallel: number of process to run concurrently
35 @type parallel: int
36 """
37 self._parallel=parallel
38 self._processes=deque()
39 self._started=False
40 self._num_running=0
41
42 self._maxQtime = 0
43 self._maxExecTime = 0
44 self._stopped=None
45
46 - def queueProcess(self, executable, args=(), env={}, path=None, uid=None,
47 gid=None, usePTY=0, childFDs=None, processProtocol=None,
48 timeout=60, timeout_callback=None):
49 """
50 add a process to the queue. args are similar to reactor.spawnProcess
51 @param processProtocol: optional protocol to control the process
52 @type processProtocol: ProcessProtocol from twisted
53 @param timeout: how many seconds to let the process execute
54 @type timeout: int
55 @param timeout_callback: callable to call if the process times out
56 @type timeout_callback: callable w/ one arg
57 @raise QueueStopped: if the queue has been stopped
58 """
59 if self._stopped:
60 raise QueueStopped()
61 processQProtocol = None
62 if processProtocol:
63 processQProtocol = _ProcessQueueProtocolDecorator(processProtocol,
64 executable, args,
65 env, path, uid,
66 gid, usePTY,
67 childFDs,timeout,
68 timeout_callback)
69 else:
70 processQProtocol = _ProcessQueueProtocol(executable, args,
71 env, path, uid,
72 gid, usePTY,
73 childFDs,timeout,
74 timeout_callback)
75 log.debug("Adding process %s to queue" % processQProtocol)
76 log.debug("Processes in queue: %s" % len(self._processes))
77
78 self._processes.append(processQProtocol)
79
80 if self._started:
81 self._processQueue()
82
84 """
85 stops the process queue; no more processes will be accepted. deferred
86 will be called back when process queue is empty
87 """
88 if self._stopped: return self._stopped
89 self._stopped = defer.Deferred()
90 if self._num_running ==0 and len(self._processes) == 0:
91 self._stopped.callback("process queue is empty and stopped")
92 return self._stopped
93
95 """
96 start processing the queue. Processes will only be executed when the
97 reactor starts
98 """
99 def _doStart():
100
101
102 if not self._started:
103 self._started=True
104 self._processQueue()
105
106 reactor.callLater(0,_doStart)
107
109 def processFinished(value, processProtocol):
110 self._num_running -= 1
111 reactor.callLater(0, self._processQueue)
112
113 execTime = processProtocol.execStopTime - processProtocol.execStartTime
114 qTime = processProtocol.queueStopTime - processProtocol.queueStartTime
115 self._maxQtime = max(self._maxQtime, qTime)
116 self._maxExecTime = max(self._maxExecTime, execTime)
117 log.debug("execution time %s seconds; queue time %s seconds; "
118 "process %s"
119 % ( execTime, qTime, processProtocol))
120 if (self._num_running == 0
121 and self._stopped
122 and not self._stopped.called
123 and len(self._processes) == 0):
124 self._stopped.callback("process queue is empty and stopped")
125 log.debug("Number of process being executed: %s" % self._num_running)
126 if self._num_running < self._parallel:
127 processQProtocol = None
128 if self._processes:
129 processQProtocol = self._processes.popleft()
130 if processQProtocol:
131 self._num_running += 1
132 d = processQProtocol.start()
133 d.addBoth(processFinished, processQProtocol)
134
135 if self._processes and self._num_running < self._parallel:
136 reactor.callLater(0, self._processQueue)
137 return
138
140 """
141 For interal use by ProcessQueue
142 Protocol to run processes in ProcessQueue. Controls life cycle or process
143 including timing out long running processes
144 """
145
146 - def __init__(self, executable, args=(), env={}, path=None,
147 uid=None, gid=None, usePTY=0, childFDs=None, timeout=60,
148 timeout_callback=None):
149 self._executable=executable
150 self._args=args
151 self._env=env
152 self._path=path
153 self._uid=uid
154 self._gid=gid
155 self._usePTY=usePTY
156 self._childFDs=childFDs
157 self._time_out=timeout
158 self._timeoutDeferred=None
159 self._timeout_callback=timeout_callback
160 self.queueStartTime = time.time()
161 self.queueStopTime = None
162 self.execStartTime = None
163 self.execStopTime = None
164
166 if self._args:
167 return"process %s" % " ".join(self._args)
168 else:
169 return "process %s" % self._executable
170
172 log.debug("spawning %s " % self)
173 now = time.time()
174 self.queueStopTime = now
175 self.execStartTime = now
176 reactor.spawnProcess(self, self._executable, self._args,
177 self._env, self._path, self._uid, self._gid,
178 self._usePTY, self._childFDs)
179 self._timeoutDeferred = createTimeout(defer.Deferred(), self._time_out, self)
180 self._timeoutDeferred.addErrback(self._timedOut)
181 if self._timeout_callback:
182 self._timeoutDeferred.addErrback(self._timeout_callback)
183 return self._timeoutDeferred
184
186 "Kill a process if it takes too long"
187 try:
188 if not self.execStopTime:
189 self.execStopTime = time.time()
190
191 self.transport.signalProcess('KILL')
192 log.warning("timed out after %s seconds: %s" % (self._time_out,
193 self))
194 except error.ProcessExitedAlready:
195 log.debug("Process already exited: %s" % self)
196 return value
197
199 """
200 This will be called when the subprocess is finished.
201
202 @type reason: L{twisted.python.failure.Failure}
203 """
204 if not self.execStopTime:
205 self.execStopTime = time.time()
206
207 deferred = self._timeoutDeferred
208 self._timeoutDeferred = None
209 if deferred and not deferred.called:
210 msg = reason.getErrorMessage()
211 exitCode = reason.value.exitCode
212 deferred.callback((exitCode,msg))
213
214
215
217 """
218 For interal use by ProcessQueue
219 Wraps an existing ProcessProtocol so that it can be run in a ProcessQueue
220 """
221 - def __init__(self, protocol, executable, args=(), env={}, path=None,
222 uid=None, gid=None, usePTY=0, childFDs=None, timeout=60,
223 timeout_callback=None):
224 _ProcessQueueProtocol.__init__(self, executable, args, env, path, uid,
225 gid, usePTY, childFDs, timeout,
226 timeout_callback)
227 self._protocol = protocol
228
229
231 """
232 Some data was received from stdout.
233 """
234 self._protocol.outReceived(data)
235
237 """
238 Some data was received from stderr.
239 """
240 self._protocol.errReceived(data)
241
243 """
244 This will be called when stdin is closed.
245 """
246 self._protocol.inConnectionLost()
247
249 """
250 This will be called when stdout is closed.
251 """
252 self._protocol.outConnectionLost()
253
254
256 """
257 This will be called when stderr is closed.
258 """
259 self._protocol.errConnectionLost()
260
269
271 """
272 Error for a defered call taking too long to complete
273 """
274
276 Exception.__init__(self)
277 self.args = args
278
280 """
281 Cause an error on a deferred when it is taking too long to complete.
282 @param deferred: deferred to monitor for callback/errback
283 @type deferred: Deferred
284 @param seconds: Time to wait for a callback/errback on the deferred
285 @type seconds: int
286 @pram obj: context for the TimeoutError when timeout occurs
287 @type obj: anything
288 """
289
290 def _timeout(deferred, obj):
291 "took too long... call an errback"
292 deferred.errback(failure.Failure(TimeoutError(obj)))
293
294 def _cb(arg, timer):
295 "the process finished, possibly by timing out"
296 if not timer.called:
297 timer.cancel()
298 return arg
299
300 timer = reactor.callLater(seconds, _timeout, deferred, obj)
301 deferred.addBoth(_cb, timer)
302 return deferred
303