Package Jobber :: Module jobs
[hide private]
[frames | no frames]

Source Code for Module Jobber.jobs

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2009, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  import os 
 15  import time 
 16  from datetime import datetime 
 17  import transaction 
 18  from twisted.internet import defer, reactor 
 19  from twisted.internet.protocol import ProcessProtocol 
 20  from zope.interface import implements 
 21  from Globals import InitializeClass 
 22  from Products.ZenModel.ZenModelRM import ZenModelRM 
 23  from Products.ZenRelations.RelSchema import * 
 24  from Products.ZenUtils.Utils import basicAuthUrl, zenPath 
 25  from Products.ZenWidgets import messaging 
 26   
 27  from interfaces import IJob 
 28  from status import SUCCESS, FAILURE 
 29  from logfile import LogFile 
 30   
class Job(ZenModelRM):
    """
    Base class for a unit of work whose progress is reported through an
    associated L{JobStatus}.

    Subclasses provide the actual work by overriding L{run}, which must
    eventually call L{finished}.
    """
    implements(IJob)

    # Relationship named "status" pointing at the JobStatus for this job;
    # the other end of the relationship is named "job".
    _relations = (
        ("status", ToOne(
            ToOne, "Products.Jobber.status.JobStatus", "job")),
    )

    def getUid(self):
        """
        @return: The part of this job's id that follows the last
            underscore (the whole id when it contains no underscore).
        """
        return self.id.rsplit('_', 1)[-1]

    def getDescription(self):
        """
        @return: A description of the job; for the base class this is
            simply the job's id.
        """
        return self.id

    def getJobType(self):
        """
        @return: The name of the concrete job class.
        @rtype: str
        """
        klass = self.__class__
        return klass.__name__

    def getStatus(self):
        """
        @return: The L{JobStatus} associated with this job.
        @rtype: L{JobStatus}
        """
        return self.status()

    def interrupt(self, why):
        """
        Halt the job, for whatever reason.

        The base implementation does nothing; subclasses are expected to
        override it.

        @param why: The reason why the build is interrupted
        @type why: str
        """

    def start(self):
        """
        Kick off the job.

        Notifies the job's status (if there is one) that the job has
        started, then schedules L{run} through an already-fired Deferred.

        @return: A Deferred that L{finished} calls back with the job's
            results.
        """
        # Stash the completion Deferred so finished() can fire it later.
        self._v_deferred = completion = defer.Deferred()
        job_status = self.getStatus()
        if job_status is not None:
            job_status.jobStarted()
        # NOTE(review): no errback is attached here, so an exception raised
        # by run() will not fire the Deferred returned to the caller.
        defer.succeed(None).addCallback(self.run)
        return completion

    def run(self, r):
        """
        Perform the work of the job. Must be overridden.

        Implementations should call self.finished(results) when done, where
        results is one of SUCCESS, FAILURE.

        @param r: The result of the Deferred that triggered the run; L{start}
            passes None.
        @raise NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("Your subclass must implement this method.")

    def finished(self, results):
        """
        Called to signify the end of the job.

        @param results: The outcome of the job; passed on to the job's
            status and to the Deferred returned by L{start}.
        """
        # Grab the Deferred first: if start() never stored one, fail before
        # touching the status.
        completion = self._v_deferred
        job_status = self.getStatus()
        if job_status is not None:
            # Let the JobStatus know the job is over.
            job_status.jobFinished(results)
        # Wake up whoever is waiting on start()'s Deferred.
        completion.callback(results)
class ProcessRunner(ProcessProtocol):
    """
    Process protocol that copies a child process's output into the log of
    its job's status and reports the outcome when the process ends.
    """
    # Cached log object; looked up lazily by getLog().
    log = None

    def __init__(self, job, whenComplete):
        """
        Initialization method. Accepts a reference to the relevant Job
        instance and a defer.Deferred that will be called back.

        @param job: The job on whose behalf the process is running.
        @param whenComplete: Deferred called back with SUCCESS or FAILURE
            once the process has ended.
        """
        self.job = job
        self.whenComplete = whenComplete

    def getLog(self):
        """
        Return the log for the status for the job.

        The log is fetched from the job's status on first use and cached.

        @return: The log, or None if the job has no status.
        """
        if self.log is None:
            status = self.job.getStatus()
            if status is not None:
                self.log = status.getLog()
        return self.log

    def outReceived(self, text):
        """
        Send output to the log file

        @param text: A chunk of output from the child process.
        """
        log = self.getLog()
        if log is not None:
            log.write(text)

    # Send error to same place as other output
    errReceived = outReceived

    def processEnded(self, reason):
        """
        We're done. End the job.

        An exit code of 0 is reported as SUCCESS; anything else (including
        a reason that carries no exit code) is reported as FAILURE.

        @param reason: Describes how the process ended; its
            C{value.exitCode} is used when present.
        """
        # Assume failure unless the reason carries an exit code.
        code = 1
        try:
            code = reason.value.exitCode
        except AttributeError:
            pass
        if code == 0:
            result = SUCCESS
            outcome = 'success'
        else:
            result = FAILURE
            outcome = 'failure'

        # Resolve the log through getLog() rather than reading self.log
        # directly: the cache is only populated by outReceived(), so a
        # process that produced no output would otherwise never have its
        # completion message written or its log finished.
        log = self.getLog()
        if log is not None:
            now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            log.msg('Job completed at %s. Result: %s.' % (now, outcome))
            log.finish()

        self.whenComplete.callback(result)
class ShellCommandJob(Job):
    """
    A L{Job} that runs an external command as a child process spawned
    through the Twisted reactor.
    """
    # Handle on the spawned process. NOTE(review): the "_v_" prefix suggests
    # this is not persisted with the object -- confirm; interrupt() already
    # copes with it being None.
    _v_process = None
    # PID of the spawned process (0 until run() has spawned one); used by
    # interrupt() when the process handle is no longer available.
    pid = 0

    def __init__(self, jobid, cmd):
        """
        Initialization method.

        @param jobid: The id of the job, passed to the base class.
        @param cmd: The command that will be run by the job. A string is
            split on whitespace into an argument list.
        @type cmd: list or string
        """
        super(ShellCommandJob, self).__init__(jobid)
        if isinstance(cmd, basestring):
            cmd = cmd.split()
        self.cmd = cmd
        # We could conceivably want to know the environment in the future, so
        # keep it around
        self.environ = os.environ.copy()

    def getDescription(self):
        """
        @return: The command line of the job, joined with single spaces.
        @rtype: str
        """
        return " ".join(self.cmd)

    def run(self, r):
        """
        Spawn the command and arrange for L{finished} to be called with the
        outcome once the process ends.

        @param r: Unused; the result of the Deferred that triggered the run.
        """
        cmd = self.cmd
        d = defer.Deferred()
        protocol = ProcessRunner(self, d)
        self._v_process = reactor.spawnProcess(protocol, cmd[0], cmd,
                                env=self.environ, usePTY=True)
        self.pid = self._v_process.pid
        # Commit right away, presumably so the pid is stored and remains
        # available to interrupt() later.
        transaction.commit()
        d.addBoth(self.finished)

    def interrupt(self, why=None):
        """
        Ask the running command to terminate.

        @param why: The reason for the interruption. Accepted for
            compatibility with L{Job.interrupt}; not otherwise used.
        @type why: str
        """
        # If we're still in the reactor, use the PTYProcess. This will probably
        # never happen.
        if self._v_process is not None:
            # 'TERM' matches the SIGTERM sent by the pid fallback below.
            # The previous 'STOP' only suspended the child, so the process
            # never ended and the job never finished.
            self._v_process.signalProcess('TERM')
        # Our only hope is still having a pid
        elif self.pid:
            try:
                os.kill(self.pid, 15)  # 15 == SIGTERM
            except OSError:
                # We can't do anything else anyway
                pass
class JobMessenger(messaging.MessageSender):
    """
    Message sender for IJobs.

    Behaves like MessageSender, except that every send is immediately
    followed by a commit of the current transaction.
    """

    def sendToBrowser(self, *args, **kwargs):
        """
        Delegate to MessageSender.sendToBrowser with the given arguments,
        then commit the transaction.
        """
        parent = super(JobMessenger, self)
        parent.sendToBrowser(*args, **kwargs)
        transaction.commit()

    def sendToUser(self, *args, **kwargs):
        """
        Delegate to MessageSender.sendToUser with the given arguments,
        then commit the transaction.
        """
        parent = super(JobMessenger, self)
        parent.sendToUser(*args, **kwargs)
        transaction.commit()

    def sendToAll(self, *args, **kwargs):
        """
        Delegate to MessageSender.sendToAll with the given arguments,
        then commit the transaction.
        """
        parent = super(JobMessenger, self)
        parent.sendToAll(*args, **kwargs)
        transaction.commit()