Package Jobber :: Module zenjobs
[hide private]
[frames] | no frames]

Source Code for Module Jobber.zenjobs

 1  ########################################################################### 
 2  # 
 3  # This program is part of Zenoss Core, an open source monitoring platform. 
 4  # Copyright (C) 2009, Zenoss Inc. 
 5  # 
 6  # This program is free software; you can redistribute it and/or modify it 
 7  # under the terms of the GNU General Public License version 2 as published by 
 8  # the Free Software Foundation. 
 9  # 
10  # For complete information please visit: http://www.zenoss.com/oss/ 
11  # 
12  ########################################################################### 
13   
14  import sys 
15  import time 
16  import logging 
17  from Globals import * 
18  from twisted.internet import reactor, defer 
19  from Products.ZenUtils.CyclingDaemon import CyclingDaemon 
20  import transaction 
21  from status import FAILURE 
22   
23  logger = logging.getLogger('zen.Jobs') 
24  logger.setLevel(20) 
25   
26 -class ZenJobs(CyclingDaemon):
27 """ 28 Daemon to run jobs. 29 """ 30 name = 'zenjobs' 31
32 - def __init__(self, *args, **kwargs):
33 CyclingDaemon.__init__(self, *args, **kwargs) 34 self.jm = self.dmd.JobManager 35 self.runningjobs = []
36
37 - def run_job(self, job):
38 logger.info("Starting %s %s" % ( 39 job.getJobType(), 40 job.getDescription())) 41 self.runningjobs.append(job.start()) 42 # Zope will want to know the job has started 43 transaction.commit() 44 job.getStatus().waitUntilFinished().addCallback(self.job_done)
45
46 - def job_done(self, jobstatus):
47 logger.info('%s %s completed in %s seconds.' % ( 48 jobstatus.getJob().getJobType(), 49 jobstatus.getJob().getDescription(), 50 jobstatus.getDuration())) 51 # Zope will want to know the job has finished 52 transaction.commit()
53
55 return defer.DeferredList(self.runningjobs)
56
57 - def main_loop(self):
58 for job in self.get_new_jobs(): 59 self.run_job(job) 60 self.finish_loop()
61
62 - def finish_loop(self):
63 if self.options.cycle: 64 self.sendHeartbeat() 65 reactor.callLater(self.options.cycletime, self.runCycle) 66 else: 67 # Can't stop the reactor until jobs are done 68 whenDone = self.waitUntilRunningJobsFinish() 69 whenDone.addBoth(self.finish)
70
71 - def runCycle(self):
72 try: 73 start = time.time() 74 self.syncdb() 75 self.main_loop() 76 except: 77 self.log.exception("unexpected exception")
78
79 - def get_new_jobs(self):
80 return [s.getJob() for s in self.jm.getPendingJobs()]
81
82 - def finish(self, r=None):
83 for d in self.runningjobs: 84 try: 85 d.callback(FAILURE) 86 except defer.AlreadyCalledError: 87 pass 88 CyclingDaemon.finish(self, r)
89 90 if __name__ == "__main__": 91 zj = ZenJobs() 92 zj.run() 93