Package Jobber :: Module manager
[hide private]
[frames | no frames]

Source Code for Module Jobber.manager

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2009, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  from Globals import InitializeClass 
 15  import transaction 
 16  from zope.interface import implements 
 17   
 18  from Products.ZenRelations.RelSchema import * 
 19  from Products.ZenModel.ZenModelRM import ZenModelRM 
 20   
 21  from interfaces import * 
 22  from status import JobStatus, FAILURE 
 23  import time 
 24  from copy import copy 
 25  from Products.ZenUtils import guid 
 26   
def manage_addJobManager(context, id="JobManager"):
    """
    Create a L{JobManager} and install it on C{context}.

    @param context: Container that receives the new manager through its
        C{_setObject} method.
    @param id: Id given to the manager and used as its key in C{context}.
    @type id: str
    @return: The manager as read back from C{context} via C{getattr}.
    """
    context._setObject(id, JobManager(id))
    # Read the object back from the container instead of returning the
    # freshly built instance (presumably so callers get it in the context
    # of its container -- confirm against callers).
    manager = getattr(context, id)
    return manager
31
class JobManager(ZenModelRM):
    """
    Manage the L{JobStatus} objects stored in the C{jobs} relationship.

    Provides creation (L{addJob}), lookup (L{getJob}), filtering by
    started/finished state, and cleanup of status objects.
    """

    implements(IJobManager)

    meta_type = portal_type = 'JobManager'

    # "jobs" contains the JobStatus objects; each one refers back to this
    # manager through its "jobmanager" relationship.
    _relations = (
        ("jobs",
         ToManyCont(
             ToOne, "Products.Jobber.status.JobStatus", "jobmanager"
         )
        ),
    )

    def _getId(self, klass=None):
        """
        Get a unique id for jobs.

        If C{klass} is not None, its __name__ attribute will be used as the
        id prefix; otherwise the prefix is C{'job'}.

        @param klass: Optional class whose name prefixes the id.
        @return: A unique id of the form C{<prefix>_<guid>}.
        @rtype: str
        """
        if klass is not None:
            name = klass.__name__
        else:
            name = 'job'
        return "%s_%s" % (name, guid.generate())

    def addJob(self, klass, *args, **kwargs):
        """
        Create a new L{Job} and L{JobStatus} from the class specified.

        C{klass} must implement L{IJob} and should subclass L{Job}. The
        current transaction is committed once the status has been stored.

        @param klass: The job class to instantiate; it is called as
            C{klass(jobid, *args, **kwargs)}.
        @return: The L{JobStatus} object representing the job created.
        @rtype: L{JobStatus}
        @raise AssertionError: If C{klass} does not implement L{IJob}.
        """
        # Raised explicitly rather than with an ``assert`` statement so the
        # check still runs when Python is started with -O; the exception
        # type and message are the same as before.
        if not IJob.implementedBy(klass):
            raise AssertionError("JobManager.addJob can accept"
                                 " only IJob classes")
        jobid = self._getId(klass)
        instance = klass(jobid, *args, **kwargs)
        # Create the JobStatus representing this Job
        status = JobStatus(instance)
        self.jobs._setObject(status.id, status)
        transaction.commit()
        return self.jobs._getOb(status.id)

    def getJob(self, jobid):
        """
        Return a L{JobStatus} object that matches the id specified.

        @param jobid: id of the L{JobStatus}. The "JobStatus_" prefix is not
            necessary.
        @type jobid: str
        @return: A matching L{JobStatus} object, or None if none is found
        @rtype: L{JobStatus}, None
        """
        # Status objects have ids like
        # "ShellCommandJobStatus_7680ef-9234-2875f0abc",
        # but we only care about the part at the end.
        uid = jobid.split('_')[-1]
        if not uid:
            # An empty unique part would otherwise match every stored id.
            return None
        for jid in self.jobs.objectIds():
            # Compare the whole trailing segment instead of using
            # endswith(), so a fragment of an id cannot select an
            # unrelated job.
            if jid.split('_')[-1] == uid:
                return self.jobs._getOb(jid)
        return None

    def getUnfinishedJobs(self):
        """
        Return JobStatus objects that have not yet completed, including
        those that have not yet started.

        @return: A list of jobs.
        @rtype: list
        """
        return [job for job in self.jobs() if not job.isFinished()]

    def getRunningJobs(self):
        """
        Return JobStatus objects that have started but not finished.

        @return: A list of jobs.
        @rtype: list
        """
        return [job for job in self.jobs()
                if not job.isFinished() and job.isStarted()]

    def getPendingJobs(self):
        """
        Return JobStatus objects that have not yet started.

        @return: A list of jobs.
        @rtype: list
        """
        return [job for job in self.jobs() if not job.isStarted()]

    def getFinishedJobs(self):
        """
        Return JobStatus objects that have finished.

        @return: A list of jobs.
        @rtype: list
        """
        return [job for job in self.jobs() if job.isFinished()]

    def deleteUntil(self, untiltime):
        """
        Delete all finished jobs older than untiltime.

        A finished job is deleted when the second element of its
        C{getTimes()} result is less than or equal to C{untiltime}
        (presumably the finish time -- confirm against L{JobStatus}).

        @param untiltime: Cut-off compared against each job's time value.
        """
        # getFinishedJobs() returns a separate list, so deleting jobs while
        # looping does not disturb the iteration.
        for job in self.getFinishedJobs():
            if job.getTimes()[1] <= untiltime:
                job.delete()

    def clearJobs(self):
        """
        Clear out all finished jobs.

        Equivalent to calling L{deleteUntil} with the current time.
        """
        self.deleteUntil(time.time())

    def killRunning(self):
        """
        Interrupt every running job.

        Calls C{interrupt()} on each job returned by L{getRunningJobs}.
        The result recorded for the job is decided inside C{interrupt()}
        (presumably FAILURE, per the original note -- confirm against
        L{JobStatus}).
        """
        for job in self.getRunningJobs():
            job.interrupt()


# Run InitializeClass (imported from Globals) on the finished class.
InitializeClass(JobManager)