Package Products :: Package ZenUtils :: Module Executor
[hide private]
[frames | no frames]

Source Code for Module Products.ZenUtils.Executor

 1  ############################################################################## 
 2  #  
 3  # Copyright (C) Zenoss, Inc. 2010, all rights reserved. 
 4  #  
 5  # This content is made available according to terms specified in 
 6  # License.zenoss under the directory where your Zenoss product is installed. 
 7  #  
 8  ############################################################################## 
 9   
10   
11  from twisted.internet.defer import Deferred, maybeDeferred 
12  from twisted.internet import reactor 
class TwistedExecutor(object):
    """
    Executes up to N callables at a time. N is determined by the maxParrallel
    used to construct an instance, unlimited by default.

    Submitted callables wait in a FIFO queue. Each scheduled call of
    _runTask starts at most one queued task, and only while the number of
    running tasks is below the limit.
    """

    def __init__(self, maxParrallel=None):
        """
        @param maxParrallel: maximum number of tasks allowed to run at the
            same time; None (the default) means no limit.
        """
        # Imported locally so this class does not rely on a new module-level
        # import. A deque removes from the head in O(1); list.pop(0) has to
        # shift every remaining element on each dequeue.
        from collections import deque

        self._max = maxParrallel
        self._running = 0
        self._taskQueue = deque()

    def setMax(self, max):
        """
        Change the concurrency limit.

        A queue check is scheduled right away so that a raised (or removed)
        limit takes effect without waiting for a running task to finish.

        @param max: new maximum number of concurrent tasks, or None for no
            limit. The name shadows the builtin but is kept so existing
            keyword callers continue to work.
        """
        self._max = max
        reactor.callLater(0, self._runTask)

    def getMax(self):
        """
        @return: the current concurrency limit, or None when unlimited.
        """
        return self._max

    @property
    def running(self):
        """Number of tasks that have been started and have not finished."""
        return self._running

    @property
    def queued(self):
        """Number of submitted tasks that have not been started yet."""
        return len(self._taskQueue)

    def submit(self, callable, *args, **kw):
        """
        Submit a callable to be executed.

        The callable is queued and started later from the reactor; it is
        never invoked synchronously by this method.

        @param callable: the callable to run; args and kw are passed to it.
        @return: a Deferred that fires with the result (or failure) of the
            callable.
        """
        deferred = Deferred()
        # Registered before the deferred is handed out so the running count
        # is released ahead of any callbacks the caller adds.
        deferred.addBoth(self._taskFinished)
        task = ExecutorTask(deferred, callable, *args, **kw)
        self._taskQueue.append(task)
        reactor.callLater(0, self._runTask)
        return deferred

    def _runTask(self):
        """
        Start the oldest queued task if the concurrency limit allows it.

        After starting a task another check is scheduled, so the queue keeps
        draining until it is empty or the limit is reached.
        """
        if self._taskQueue and (self._max is None or self._running < self._max):
            self._running += 1
            task = self._taskQueue.popleft()
            task()
            reactor.callLater(0, self._runTask)

    def _taskFinished(self, result):
        """
        Release the slot held by a finished task and schedule a queue check.

        @param result: the task's result or failure; returned unchanged so it
            continues down the submitter's callback chain.
        """
        self._running -= 1
        reactor.callLater(0, self._runTask)
        return result
63
class ExecutorTask(object):
    """
    Used by TwistedExecutor to execute queued tasks.

    Holds a callable together with its arguments and the Deferred through
    which the outcome of the call is reported.
    """

    def __init__(self, deferred, callable, *args, **kw):
        """
        @param deferred: Deferred fired with the outcome of the call.
        @param callable: the callable to invoke; args and kw are passed to it.
        """
        self._deferred = deferred
        self._callable = callable
        self._args = args
        self._kw = kw

    def __call__(self):
        """
        Invoke the stored callable and forward its outcome.

        maybeDeferred wraps the call, so a plain return value, a raised
        exception, or a returned Deferred are all routed to the handlers.
        """
        outcome = maybeDeferred(self._callable, *self._args, **self._kw)
        # addCallback returns the same Deferred, so the errback is attached
        # after the callback on the same chain.
        outcome.addCallback(self._finished).addErrback(self._error)

    def _finished(self, result):
        """Fire the stored Deferred with the successful result."""
        self._deferred.callback(result)

    def _error(self, result):
        """Fire the stored Deferred's errback with the failure."""
        self._deferred.errback(result)