Package Products :: Package ZenStatus :: Module zenstatus
[hide private]
[frames | no frames]

Source Code for Module Products.ZenStatus.zenstatus

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, 2009 Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """zenstatus 
 15   
 16  Check the TCP/IP connectivity of IP services. 
 17  UDP is specifically not supported. 
 18  """ 
 19   
 20  import time 
 21  from sets import Set 
 22   
 23  from twisted.internet import reactor, defer 
 24   
 25  import Globals # make zope imports work 
 26  from Products.ZenHub.PBDaemon import FakeRemote, PBDaemon 
 27  from Products.ZenUtils.Driver import drive, driveLater 
 28  from Products.ZenStatus.ZenTcpClient import ZenTcpClient 
 29  from Products.ZenEvents.ZenEventClasses import Heartbeat 
 30   
 31  # required for pb.setUnjellyableForClass 
 32  from Products.ZenHub.services import StatusConfig 
 33  if 0: 
 34      StatusConfig = None                 # pyflakes 
 35   
class Status:
    """
    Class to track the status of all connection attempts to
    remote devices.

    A single instance is reused for every scan cycle: start() resets the
    counters and hands back a deferred, next() launches the queued jobs
    one at a time, and the deferred fires (with this object as its result)
    once no job is running and none remain queued.
    """
    # Class-level defaults; the methods below rebind them per instance.
    _running = 0        # jobs started by next() that have not reported back
    _fail = 0           # jobs whose deferred ended in the errback chain
    _success = 0        # jobs whose deferred ended in the callback chain
    _start = 0          # time.time() when the current cycle was started
    _stop = 0           # time.time() when the current cycle completed
    _defer = None       # deferred handed out by start(); None once fired

    def __init__(self):
        self._remaining = []

    def start(self, jobs):
        """
        Start a scan cycle with the jobs to run.

        The list is stored as-is (not copied) and is consumed in place
        by next().

        @parameter jobs: jobs to run
        @type jobs: list of job entries
        @return: deferred that fires with this Status object once the
                 cycle is complete
        @rtype: Twisted deferred
        """
        self._success = 0
        self._stop = 0
        self._fail = 0
        self._running = 0
        self._remaining = jobs
        self._start = time.time()
        self._defer = defer.Deferred()
        if not self._remaining:
            # Nothing to do: the cycle is complete immediately.
            self._stop = time.time()
            self._defer.callback(self)
        return self._defer

    def next(self):
        """
        Start and return the next job that can be scheduled to run.

        Jobs are taken from the end of the queue.  Callers must make sure
        that at least one job remains (see stats()); popping from an empty
        queue raises IndexError.

        @return: deferred returned by the job's start() method, with the
                 success/failure bookkeeping already attached
        @rtype: Twisted deferred
        """
        job = self._remaining.pop()
        d = job.start()
        d.addCallbacks(self.success, self.failure)
        self._running += 1
        return d

    def testStop(self, result):
        """
        Cleanup completed jobs and update stats.

        @parameter result: passed through untouched
        @type result: any
        @return: the result that was passed in, so that it continues down
                 the job's callback chain
        @rtype: same as result
        """
        self._running -= 1
        if self.done():
            self._stop = time.time()
            # Clear the reference before firing so the deferred can not
            # be fired a second time through this object.
            self._defer, d = None, self._defer
            d.callback(self)
        return result

    def success(self, result):
        """
        Record a successful job.

        @parameter result: passed through untouched
        @type result: any
        @return: the result that was passed in
        @rtype: same as result
        """
        self._success += 1
        return self.testStop(result)

    def failure(self, result):
        """
        Record a failed job.

        @parameter result: passed through untouched (presumably a Twisted
                           Failure, since this is attached as an errback)
        @type result: any
        @return: the result that was passed in
        @rtype: same as result
        """
        # The failure counter is named _fail (see the class attributes,
        # start() and stats()); incrementing any other name would raise
        # AttributeError and leave _running stuck above zero.
        self._fail += 1
        return self.testStop(result)

    def done(self):
        """
        Are we done yet?

        @return: True when no job is running and none remain queued
        @rtype: boolean
        """
        return self._running == 0 and not self._remaining

    def stats(self):
        """
        Report on the number of remaining, running,
        successful and failed jobs.

        @return: counts of job status, in the order
                 (remaining, running, successful, failed)
        @rtype: tuple of ints
        """
        return (len(self._remaining),
                self._running,
                self._success,
                self._fail)

    def duration(self):
        """
        Total time that the daemon has been running jobs
        this scan cycle.

        @return: elapsed seconds between start and stop when the cycle
                 is done, otherwise seconds elapsed since start
        @rtype: float
        """
        if self.done():
            return self._stop - self._start
        return time.time() - self._start
class ZenStatus(PBDaemon):
    """
    Daemon class to attach to zenhub and pass along
    device configuration information.

    The daemon periodically fetches its list of IP services from the
    StatusConfig hub service, tests each one with a ZenTcpClient and
    forwards any resulting events.
    """
    name = agent = "zenstatus"
    initialServices = ['EventService', 'StatusConfig']
    # Passed to driveLater() unchanged.
    statusCycleInterval = 300
    # Multiplied by 60 before being passed to driveLater().
    configCycleInterval = 20
    # Attributes that setPropertyItems() is allowed to overwrite.
    properties = ('statusCycleInterval', 'configCycleInterval')
    # Pending reactor.callLater() handle for a delayed reconfigure.
    reconfigureTimeout = None

    def __init__(self):
        PBDaemon.__init__(self, keeproot=True)
        self.clients = {}
        self.counts = {}
        # Start with an empty list so that remote_deleteDevice() and
        # scanCycle() are safe before the first reconfigure() completes.
        self.ipservices = []
        self.status = Status()

    def configService(self):
        """
        Return a connection to a status service.

        @return: the 'StatusConfig' service, or a FakeRemote stand-in
                 when that service is not registered
        @rtype: remote service object
        """
        return self.services.get('StatusConfig', FakeRemote())

    def startScan(self, ignored=None):
        """
        Start gathering status information, taking care of the case where
        we are invoked as a daemon or for a single run.

        @parameter ignored: unused (callback result)
        @type ignored: any
        """
        d = drive(self.scanCycle)
        if not self.options.cycle:
            # Single run: shut down once the scan finishes, whether it
            # succeeded or failed.
            d.addBoth(lambda unused: self.stop())

    def connected(self):
        """
        Gather our configuration and start collecting status information.
        Called after connected to the zenhub service.
        """
        d = drive(self.configCycle)
        d.addCallbacks(self.startScan, self.configError)

    def configError(self, why):
        """
        Log errors that have occurred gathering our configuration
        and stop the daemon.

        @param why: error message
        @type why: Twisted error instance
        """
        self.log.error(why.getErrorMessage())
        self.stop()

    # NOTE(review): the 'def' line of this method was missing from the
    # listing this file was recovered from; the name below follows the
    # zenhub remote_* notification convention -- confirm against upstream.
    def remote_notifyConfigChanged(self):
        """
        Procedure called from zenhub to get us to re-gather all
        of our configuration.

        The reconfigure is delayed by half a status cycle; a notification
        that arrives while a delayed call is still pending cancels it and
        schedules a new one.
        """
        self.log.debug("Notification of config change from zenhub")
        if self.reconfigureTimeout and not self.reconfigureTimeout.called:
            self.reconfigureTimeout.cancel()
        self.reconfigureTimeout = reactor.callLater(
            self.statusCycleInterval/2, drive, self.reconfigure)

    def remote_setPropertyItems(self, items):
        """
        Procedure called from zenhub to pass in new properties.

        @parameter items: items to update
        @type items: list
        """
        self.log.debug("Update of collection properties from zenhub")
        self.setPropertyItems(items)

    def setPropertyItems(self, items):
        """
        Extract configuration elements used by this server.

        Only the names listed in self.properties are considered; entries
        that are absent or None leave the current value untouched.

        @parameter items: items to update
        @type items: list of (name, value) pairs
        """
        table = dict(items)
        for name in self.properties:
            value = table.get(name, None)
            if value is not None:
                if getattr(self, name) != value:
                    self.log.debug('Updated %s config to %s' % (name, value))
                setattr(self, name, value)

    def remote_deleteDevice(self, device):
        """
        Remove any devices that zenhub tells us no longer exist.

        @parameter device: name of device to delete
        @type device: string
        """
        self.ipservices = [s for s in self.ipservices
                           if s.cfg.device != device]

    def configCycle(self, driver):
        """
        Get our configuration from zenhub

        Generator run through drive(): each yield hands a deferred to the
        driver and the following driver.next() collects its result.  The
        next configuration cycle is scheduled before reconfigure() is run.

        @parameter driver: object
        @type driver: Twisted deferred object
        """
        now = time.time()
        self.log.info("Fetching property items")
        yield self.configService().callRemote('propertyItems')
        self.setPropertyItems(driver.next())

        self.log.info("Fetching default RRDCreateCommand")
        yield self.configService().callRemote('getDefaultRRDCreateCommand')
        createCommand = driver.next()

        self.log.info("Getting threshold classes")
        yield self.configService().callRemote('getThresholdClasses')
        self.remote_updateThresholdClasses(driver.next())

        self.log.info("Getting collector thresholds")
        yield self.configService().callRemote('getCollectorThresholds')
        self.rrdStats.config(self.options.monitor, self.name, driver.next(),
                             createCommand)

        d = driveLater(self.configCycleInterval * 60, self.configCycle)
        d.addErrback(self.error)

        yield drive(self.reconfigure)
        driver.next()

        self.rrdStats.gauge('configTime',
                            self.configCycleInterval * 60,
                            time.time() - now)

    def reconfigure(self, driver):
        """
        Contact zenhub and gather our configuration again.

        Rebuilds self.counts from the 'serviceStatus' call and
        self.ipservices from the 'services' call.  When the --device
        option is set, services of other devices are skipped.

        @parameter driver: object
        @type driver: Twisted deferred object
        """
        self.log.debug("Getting service status")
        yield self.configService().callRemote('serviceStatus')
        self.counts = {}
        for (device, component), count in driver.next():
            self.counts[device, component] = count

        self.log.debug("Getting services for %s", self.options.configpath)
        yield self.configService().callRemote('services',
                                              self.options.configpath)
        self.ipservices = []
        for s in driver.next():
            if self.options.device and s.device != self.options.device:
                continue
            count = self.counts.get((s.device, s.component), 0)
            self.ipservices.append(ZenTcpClient(s, count))
        self.log.debug("ZenStatus configured with %d checks",
                       len(self.ipservices))

    def error(self, why):
        """
        Log errors that have occurred

        @param why: error message
        @type why: Twisted error instance
        """
        self.log.error(why.getErrorMessage())

    def scanCycle(self, driver):
        """
        Go through all devices and start determining the status of each
        TCP service.

        The next cycle is scheduled first.  If the previous cycle is still
        running it is given up to two status intervals to finish before
        its jobs are abandoned and a new cycle is started.  Devices
        reported by 'getDevicePingIssues' are skipped.

        @parameter driver: object
        @type driver: Twisted deferred object
        """
        d = driveLater(self.statusCycleInterval, self.scanCycle)
        d.addErrback(self.error)

        if not self.status.done():
            duration = self.status.duration()
            self.log.warning("Scan cycle not complete in %.2f seconds",
                             duration)
            if duration < self.statusCycleInterval * 2:
                self.log.warning("Waiting for the cycle to complete")
                return
            self.log.warning("Restarting jobs for another cycle")

        self.log.debug("Getting down devices")
        yield self.eventService().callRemote('getDevicePingIssues')
        # Builtin set instead of the deprecated sets.Set; only membership
        # tests are performed on it.
        ignored = set([s[0] for s in driver.next()])

        self.log.debug("Starting scan")
        d = self.status.start([i for i in self.ipservices
                               if i.cfg.device not in ignored])
        self.log.debug("Running jobs")
        self.runSomeJobs()
        yield d
        driver.next()
        self.log.debug("Scan complete")
        self.heartbeat()

    def heartbeat(self):
        """
        Report the outcome of the scan cycle that just finished.

        Logs the job totals.  For a single run the daemon is then stopped;
        otherwise a Heartbeat event and the cycle statistics events are
        sent.
        """
        _, _, success, fail = self.status.stats()
        self.log.info("Finished %d jobs (%d good, %d bad) in %.2f seconds",
                      (success + fail), success, fail, self.status.duration())
        if not self.options.cycle:
            self.stop()
            return
        heartbeatevt = dict(eventClass=Heartbeat,
                            component=self.name,
                            device=self.options.monitor)
        self.sendEvent(heartbeatevt, timeout=self.statusCycleInterval*3)
        self.niceDoggie(self.statusCycleInterval)
        # Each gauge() result is iterated as a sequence of events to send.
        for ev in (self.rrdStats.gauge('cycleTime',
                                       self.statusCycleInterval,
                                       self.status.duration()) +
                   self.rrdStats.gauge('success',
                                       self.statusCycleInterval,
                                       success) +
                   self.rrdStats.gauge('failed',
                                       self.statusCycleInterval,
                                       fail)):
            self.sendEvent(ev)

    def runSomeJobs(self):
        """
        Run IP service tests with the maximum parallelization
        allowed.

        Starts queued jobs until either none remain or the number of
        running jobs reaches the --parallel option.  Safe to call at any
        time; it does nothing when no job can be started.
        """
        while 1:
            left, running, good, bad = self.status.stats()
            if not left or running >= self.options.parallel:
                break
            self.log.debug("Status: left %d running %d good %d bad %d",
                           left, running, good, bad)
            d = self.status.next()
            d.addCallbacks(self.processTest, self.processError)
            self.log.debug("Started job")

    def processTest(self, job):
        """
        Test a connection to a device.

        @parameter job: device and TCP service to test
        @type job: ZenTcpClient object
        """
        # A slot has just been freed: start queued jobs first.  Without
        # this, jobs beyond the --parallel limit would never be started
        # and the scan cycle could never complete.
        self.runSomeJobs()
        key = job.cfg.device, job.cfg.component
        evt = job.getEvent()
        if evt:
            self.sendEvent(evt)
            self.counts.setdefault(key, 0)
            self.counts[key] += 1
        else:
            if key in self.counts:
                # TODO: Explain why we care about resetting the count
                del self.counts[key]

    def processError(self, error):
        """
        Log errors that have occurred from testing TCP services

        @param error: error message
        @type error: Twisted error instance
        """
        # A failed job frees a slot just like a successful one.
        self.runSomeJobs()
        self.log.warn(error.getErrorMessage())

    def buildOptions(self):
        """
        Build our list of command-line options
        """
        PBDaemon.buildOptions(self)
        self.parser.add_option('--configpath',
                               dest='configpath',
                               default="/Devices/Server",
                               help="Path to our monitor config ie: /Devices/Server")
        self.parser.add_option('--parallel',
                               dest='parallel',
                               type='int',
                               default=50,
                               help="Number of devices to collect at one time")
        self.parser.add_option('--cycletime',
                               dest='cycletime',
                               type="int",
                               default=60,
                               help="Check events every cycletime seconds")
        self.parser.add_option('-d', '--device', dest='device',
                               help="Device's DMD name ie www.example.com")

if __name__ == '__main__':
    # Script entry point: create the daemon and hand control to it.
    pm = ZenStatus()
    pm.run()