Package ZenStatus :: Module zenstatus
[hide private]
[frames | no frames]

Source Code for Module ZenStatus.zenstatus

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  import time 
 15   
 16  from twisted.internet import reactor, defer 
 17   
 18  import Globals # make zope imports work 
 19  from Products.ZenHub.PBDaemon import FakeRemote, PBDaemon 
 20  from Products.ZenUtils.Driver import drive, driveLater 
 21  from Products.ZenStatus.ZenTcpClient import ZenTcpClient 
 22  from Products.ZenEvents.ZenEventClasses import Heartbeat 
 23   
 24  # required for pb.setUnjellyableForClass  
 25  from Products.ZenHub.services import StatusConfig 
 26  if 0: 
 27      StatusConfig = None                 # pyflakes 
 28   
 29  from sets import Set 
 30   
class Status:
    """Bookkeeping for one batch of jobs.

    A batch is handed over with start().  Jobs are then launched one at a
    time with next(); each job's start() must return a deferred.  The object
    counts queued, in-flight, succeeded and failed jobs, and fires the
    deferred returned by start() (with this object as the result) once no
    job is queued or in flight.
    """

    # Class-level defaults so a fresh instance reports an idle, empty batch.
    _running = 0     # jobs launched but not yet finished
    _fail = 0        # jobs whose deferred errored
    _success = 0     # jobs whose deferred succeeded
    _start = 0       # time.time() when the batch was started
    _stop = 0        # time.time() when the batch finished
    _defer = None    # deferred pending for the current batch, if any

    def __init__(self):
        self._remaining = []

    def start(self, jobs):
        """Begin a new batch and return a deferred that fires when it ends.

        jobs -- list of job objects; it is stored as-is (not copied) and
                consumed from the end by next().

        All counters are reset.  If jobs is empty the returned deferred has
        already fired.
        """
        self._success = 0
        self._stop = 0
        self._fail = 0
        self._running = 0
        self._remaining = jobs
        self._start = time.time()
        self._defer = defer.Deferred()
        if not self._remaining:
            # Nothing to run: the batch is complete right away.
            self._stop = time.time()
            self._defer.callback(self)
        return self._defer

    def next(self):
        """Launch the next queued job and return its deferred.

        Raises IndexError when no job is queued.
        """
        job = self._remaining.pop()
        d = job.start()
        d.addCallbacks(self.success, self.failure)
        self._running += 1
        return d

    def testStop(self, result):
        """Note that one job finished; fire the batch deferred when done.

        Returns result unchanged so it keeps flowing down the job's
        callback chain.
        """
        self._running -= 1
        if self.done():
            self._stop = time.time()
            # Clear the attribute before firing so the deferred can only be
            # fired once.
            self._defer, d = None, self._defer
            # Guard against a completion arriving when no batch deferred is
            # pending; previously this raised AttributeError on None.
            if d is not None:
                d.callback(self)
        return result

    def success(self, result):
        """Callback for a job that succeeded."""
        self._success += 1
        return self.testStop(result)

    def failure(self, result):
        """Errback for a job that failed."""
        # The counter is _fail (see stats()); the previous "_failure" name
        # did not exist and raised AttributeError on the first failure.
        self._fail += 1
        return self.testStop(result)

    def done(self):
        """Return True when nothing is in flight and nothing is queued."""
        return self._running == 0 and not self._remaining

    def stats(self):
        """Return the tuple (queued, running, succeeded, failed)."""
        return (len(self._remaining),
                self._running,
                self._success,
                self._fail)

    def duration(self):
        """Return seconds spent on the batch (so far, if still running)."""
        if self.done():
            return self._stop - self._start
        return time.time() - self._start
class ZenStatus(PBDaemon):
    """Daemon that runs ZenTcpClient checks on a repeating cycle.

    Configuration (property items, thresholds and the list of services to
    test) is fetched from the 'StatusConfig' hub service.  Each scan cycle
    starts one ZenTcpClient job per configured service, skipping devices
    reported by the event service's 'getDevicePingIssues' call, and sends
    whatever event each finished job produces.
    """

    name = agent = "zenstatus"
    initialServices = ['EventService', 'StatusConfig']
    statusCycleInterval = 300
    configCycleInterval = 20
    # Attribute names that setPropertyItems() is allowed to overwrite.
    properties = ('statusCycleInterval', 'configCycleInterval')
    # Pending delayed call scheduled by a config-change notification.
    reconfigureTimeout = None

    def __init__(self):
        PBDaemon.__init__(self, keeproot=True)
        self.clients = {}
        self.counts = {}
        # Start with no services so remote_deleteDevice() and scanCycle()
        # are safe to run before the first reconfigure() has completed.
        self.ipservices = []
        self.status = Status()

    def configService(self):
        """Return the StatusConfig service, or a FakeRemote if absent."""
        return self.services.get('StatusConfig', FakeRemote())

    def startScan(self, ignored=None):
        """Kick off the scan cycle; stop afterwards unless cycling."""
        d = drive(self.scanCycle)
        if not self.options.cycle:
            d.addBoth(lambda unused: self.stop())

    def connected(self):
        """Load the configuration, then start scanning (or report failure)."""
        d = drive(self.configCycle)
        d.addCallbacks(self.startScan, self.configError)

    def configError(self, why):
        """Log a configuration failure and stop the daemon."""
        self.log.error(why.getErrorMessage())
        self.stop()

    # NOTE(review): the "def" line of this method was missing from the
    # listing this file was recovered from.  The name and signature are
    # reconstructed from the log message and the remote_* naming of the
    # sibling methods -- confirm against the upstream source.
    def remote_notifyConfigChanged(self):
        """Schedule a delayed reconfigure, replacing any pending one."""
        self.log.debug("Notification of config change from zenhub")
        if self.reconfigureTimeout and not self.reconfigureTimeout.called:
            self.reconfigureTimeout.cancel()
        self.reconfigureTimeout = reactor.callLater(
            self.statusCycleInterval/2, drive, self.reconfigure)

    def remote_setPropertyItems(self, items):
        """Apply property items pushed to this daemon."""
        self.log.debug("Async update of collection properties")
        self.setPropertyItems(items)

    def setPropertyItems(self, items):
        """Extract configuration elements used by this server.

        items -- iterable of (name, value) pairs.  Only names listed in
                 self.properties are applied, and None values are ignored.
        """
        table = dict(items)
        for name in self.properties:
            value = table.get(name, None)
            if value is None:
                continue
            if getattr(self, name) != value:
                self.log.debug('Updated %s config to %s', name, value)
                setattr(self, name, value)

    def remote_deleteDevice(self, device):
        """Drop every configured service that belongs to device."""
        self.ipservices = [s for s in self.ipservices
                           if s.cfg.device != device]

    def configCycle(self, driver):
        """Generator (run through drive) that refreshes all configuration.

        Each yield hands a deferred to the driver; driver.next() returns its
        result.  The next run is scheduled before the service list is
        reloaded.
        """
        now = time.time()
        self.log.info("fetching property items")
        yield self.configService().callRemote('propertyItems')
        self.setPropertyItems(driver.next())

        self.log.info("fetching default RRDCreateCommand")
        yield self.configService().callRemote('getDefaultRRDCreateCommand')
        createCommand = driver.next()

        self.log.info("getting threshold classes")
        yield self.configService().callRemote('getThresholdClasses')
        self.remote_updateThresholdClasses(driver.next())

        self.log.info("getting collector thresholds")
        yield self.configService().callRemote('getCollectorThresholds')
        self.rrdStats.config(self.options.monitor, self.name, driver.next(),
                             createCommand)

        # configCycleInterval is scaled by 60 here -- presumably minutes to
        # seconds; confirm against driveLater's expected unit.
        d = driveLater(self.configCycleInterval * 60, self.configCycle)
        d.addErrback(self.error)

        yield drive(self.reconfigure)
        driver.next()

        self.rrdStats.gauge('configTime',
                            self.configCycleInterval * 60,
                            time.time() - now)

    def reconfigure(self, driver):
        """Generator (run through drive) that reloads counts and services."""
        self.log.debug("Getting service status")
        yield self.configService().callRemote('serviceStatus')
        self.counts = {}
        for (device, component), count in driver.next():
            self.counts[device, component] = count

        self.log.debug("Getting services")
        yield self.configService().callRemote('services',
                                              self.options.configpath)
        self.ipservices = []
        for s in driver.next():
            count = self.counts.get((s.device, s.component), 0)
            self.ipservices.append(ZenTcpClient(s, count))
        self.log.debug("ZenStatus configured")

    def error(self, why):
        """Log the message carried by a failure."""
        self.log.error(why.getErrorMessage())

    def scanCycle(self, driver):
        """Generator (run through drive) that performs one scan.

        The following scan is scheduled first.  If the previous scan is
        still running it is given up to two intervals to finish before it is
        abandoned and a new one is started.
        """
        d = driveLater(self.statusCycleInterval, self.scanCycle)
        d.addErrback(self.error)

        if not self.status.done():
            duration = self.status.duration()
            self.log.warning("Scan cycle not complete in %.2f seconds",
                             duration)
            if duration < self.statusCycleInterval * 2:
                self.log.warning("Waiting for the cycle to complete")
                return
            self.log.warning("Ditching this cycle")

        self.log.debug("Getting down devices")
        yield self.eventService().callRemote('getDevicePingIssues')
        # The first element of each returned issue is compared against
        # cfg.device below, i.e. it is treated as the device identifier.
        ignored = Set([s[0] for s in driver.next()])

        self.log.debug("Starting scan")
        d = self.status.start([i for i in self.ipservices
                               if i.cfg.device not in ignored])
        self.log.debug("Running jobs")
        self.runSomeJobs()
        yield d
        driver.next()
        self.log.debug("Scan complete")
        self.heartbeat()

    def heartbeat(self):
        """Log scan totals; when cycling, send a heartbeat and statistics."""
        _, _, success, fail = self.status.stats()
        self.log.info("Finished %d jobs (%d good, %d bad) in %.2f seconds",
                      (success + fail), success, fail, self.status.duration())
        if not self.options.cycle:
            self.stop()
            return
        heartbeatevt = dict(eventClass=Heartbeat,
                            component='zenstatus',
                            device=self.options.monitor)
        self.sendEvent(heartbeatevt, timeout=self.statusCycleInterval*3)
        self.niceDoggie(self.statusCycleInterval)
        # Each gauge() result is concatenated and iterated; every element is
        # forwarded as an event.
        for ev in (self.rrdStats.gauge('cycleTime',
                                       self.statusCycleInterval,
                                       self.status.duration()) +
                   self.rrdStats.gauge('success',
                                       self.statusCycleInterval,
                                       success) +
                   self.rrdStats.gauge('failed',
                                       self.statusCycleInterval,
                                       fail)):
            self.sendEvent(ev)

    def runSomeJobs(self):
        """Start queued jobs until the --parallel limit is reached."""
        while 1:
            left, running, good, bad = self.status.stats()
            self.log.debug("Status: left %d running %d good %d bad %d",
                           left, running, good, bad)
            if not left or running >= self.options.parallel:
                break
            d = self.status.next()
            d.addCallbacks(self.processTest, self.processError)
            self.log.debug("Started job")

    def processTest(self, job):
        """Handle a finished job: refill the pipeline and report its event.

        A job that yields an event bumps the count kept for its
        (device, component) key; a job without one clears that count.
        """
        self.runSomeJobs()
        key = job.cfg.device, job.cfg.component
        evt = job.getEvent()
        if evt:
            self.sendEvent(evt)
            self.counts[key] = self.counts.get(key, 0) + 1
        else:
            self.counts.pop(key, None)

    def processError(self, error):
        """Log a job failure as a warning."""
        self.log.warn(error.getErrorMessage())

    def buildOptions(self):
        """Add the zenstatus command-line options to the base options."""
        PBDaemon.buildOptions(self)
        p = self.parser
        p.add_option('--configpath',
                     dest='configpath',
                     default="/Devices/Server",
                     help="path to our monitor config ie: /Devices/Server")
        p.add_option('--parallel',
                     dest='parallel',
                     type='int',
                     default=50,
                     help="number of devices to collect at one time")
        p.add_option('--cycletime',
                     dest='cycletime',
                     type="int",
                     default=60,
                     help="check events every cycletime seconds")
if __name__ == '__main__':
    # Script entry point: build the daemon and hand control to its run loop.
    pm = ZenStatus()
    pm.run()