Package Products :: Package ZenRRD :: Module zenperfsnmp
[hide private]
[frames | no frames]

Source Code for Module Products.ZenRRD.zenperfsnmp

   1  #! /usr/bin/env python  
   2  ########################################################################### 
   3  # 
   4  # This program is part of Zenoss Core, an open source monitoring platform. 
   5  # Copyright (C) 2007, Zenoss Inc. 
   6  # 
   7  # This program is free software; you can redistribute it and/or modify it 
   8  # under the terms of the GNU General Public License version 2 as published by 
   9  # the Free Software Foundation. 
  10  # 
  11  # For complete information please visit: http://www.zenoss.com/oss/ 
  12  # 
  13  ########################################################################### 
  14   
  15  __doc__="""zenperfsnmp 
  16   
  17  Gets SNMP performance data and stores it in RRD files. 
  18   
  19  """ 
  20   
  21  import os 
  22  import time 
  23  import logging 
  24  log = logging.getLogger("zen.zenperfsnmp") 
  25   
  26  import copy 
  27  from sets import Set 
  28  import cPickle 
  29   
  30  from twisted.internet import reactor, defer, error 
  31  from twisted.python import failure 
  32   
  33  import Globals 
  34  from Products.ZenUtils.Utils import unused 
  35  from Products.ZenUtils.Chain import Chain 
  36  from Products.ZenUtils.Driver import drive, driveLater 
  37  from Products.ZenModel.PerformanceConf import performancePath 
  38  from Products.ZenEvents import Event 
  39  from Products.ZenEvents.ZenEventClasses \ 
  40       import Perf_Snmp, Status_Snmp, Status_Perf 
  41  from Products.ZenEvents.ZenEventClasses import Critical, Clear 
  42   
  43  from Products.ZenRRD.RRDUtil import RRDUtil 
  44  from SnmpDaemon import SnmpDaemon 
  45   
  46  from FileCleanup import FileCleanup 
  47   
  48  # PerformanceConfig import needed to get pb to work 
  49  from Products.ZenHub.services.PerformanceConfig import PerformanceConfig 
  50  unused(PerformanceConfig) 
  51   
  52  MAX_OIDS_PER_REQUEST = 40 
  53  MAX_SNMP_REQUESTS = 20 
  54  DEVICE_LOAD_CHUNK_SIZE = 20 
  55  CYCLES_TO_WAIT_FOR_RESPONSE = 2 
  56   
57 -def makeDirs(dir):
58 """ 59 Wrapper around makedirs that sanity checks before running 60 """ 61 if os.path.exists(dir): 62 return 63 64 try: 65 os.makedirs(dir, 0750) 66 except Exception, ex: 67 log.critical( "Unable to create directories for %s because %s" % ( dir, ex ) )
68 69
def read(fname):
    """
    Return the complete contents of a file, read in binary mode.

    @param fname: path of the file to read
    @type fname: string
    @return: the file contents, or '' if the path does not exist
    """
    if not os.path.exists(fname):
        return ''

    # open() rather than the file() builtin, matching write() below
    fp = open(fname, 'rb')
    try:
        return fp.read()
    finally:
        fp.close()
81 82
83 -def write(fname, data):
84 """ 85 Wrapper around the standard function to open a file and write data 86 """ 87 makeDirs(os.path.dirname(fname)) 88 89 try: 90 fp = open(fname, 'wb') 91 try: 92 fp.write(data) 93 finally: 94 fp.close() 95 96 except Exception, ex: 97 log.critical( "Unable to write data to %s because %s" % ( fname, ex ) )
98 99 106
def chunk(lst, n):
    """
    Split a sequence into consecutive slices of at most n items.

    @param lst: sliceable sequence to split
    @param n: maximum length of each slice
    @type n: int
    @return: list of slices; the last one may be shorter than n
    """
    pieces = []
    for start in range(0, len(lst), n):
        pieces.append(lst[start:start + n])
    return pieces
try:
    sorted = sorted             # added in python 2.4
except NameError:
    def sorted(lst, *args, **kw):
        """
        Keep things sane in a pre-python 2.4 environment.

        Like the builtin, this returns a new sorted list and leaves the
        argument untouched; any iterable is accepted.

        @param lst: iterable of items to sort
        @return: new list holding the sorted items
        """
        # copy first so the caller's sequence is never reordered in place
        result = list(lst)
        result.sort(*args, **kw)
        return result
122
def firsts(lst):
    """
    Collect element 0 of each item in a sequence of sequences.

    @param lst: iterable whose items are indexable
    @return: list holding the first element of every item
    """
    heads = []
    for entry in lst:
        heads.append(entry[0])
    return heads
128
def checkException(alog, function, *args, **kw):
    """
    Execute the function with arguments and keywords.
    If there is an exception, log it using the given
    logging function 'alog' and re-raise it.

    @param alog: logger-like object providing an exception() method
    @param function: callable to invoke
    @return: whatever function(*args, **kw) returns
    """
    try:
        return function(*args, **kw)
    except Exception:
        import sys
        # sys.exc_info() yields the active exception on every Python version
        alog.exception(sys.exc_info()[1])
        # bare raise keeps the original traceback ('raise ex' restarts it)
        raise
140 141 142 143 from twisted.spread import pb
class SnmpConfig(pb.Copyable, pb.RemoteCopy):
    """
    Carrier object for the SNMP collection settings of one device, as
    handed to zenperfsnmp.
    """

    # Class-level defaults; updateDeviceConfig() reads each of these
    # attributes from the configs it is given.
    lastChangeTime = 0.     # compared against the proxy's lastChange
    device = ''             # device name, used as key in the proxies dict
    connInfo = None         # passed to updateAgentProxy() as snmpConnInfo
    thresholds = []         # handed to thresholds.updateForDevice()
    oids = []               # (name, oid, path, dsType, createCmd, minmax)


# Register the class with Perspective Broker as its own remote copy
pb.setUnjellyableForClass(SnmpConfig, SnmpConfig)
158 -class Status:
159 """ 160 Keep track of the status of many parallel requests 161 """ 162
163 - def __init__(self, daemon):
164 """ 165 Initializer 166 """ 167 self.daemon = daemon 168 self.reset()
169
170 - def reset(self):
171 """ 172 Reset instance variables to intial values 173 """ 174 175 # Number of requests issued this cycle that succeeded 176 self._numSucceeded = 0 177 178 # Number of requests issued prior to this cycle that responded 179 # successfully this cycle 180 self._numPrevSucceeded = 0 181 182 # Number of requests issued this cycle that failed 183 self._numFailed = 0 184 185 # Number of requests issues prior to this cycle that responded 186 # unsuccessfully this cycle 187 self._numPrevFailed = 0 188 189 # timestamp when this cycle was started 190 self._startTime = 0 191 192 # timestamp when this cycle completed 193 self._stopTime = 0 194 195 # This deferred gets triggered when this cycle completes. 196 self._deferred = defer.Deferred() 197 198 # Set of device names that should be queried this cycle 199 self._devicesToQueryThisCycle = Set() 200 201 # Names of devices with outstanding queries when this cycle was 202 # started. Values are the timestamp of when the query was made. 203 self._prevQueriesAndAges = {} 204 205 # Set of device names still to be queried this cycle. Initially 206 # this is the same as _devicesToQueryThisCycle but devices are 207 # popped from it as they are queried 208 self._queue = Set() 209 210 # Set of devices names that have reported back this cycle. The query 211 # may have been from this cycle or a previous cycle 212 self._reported = Set()
213 214
215 - def start(self, devicesToQuery, prevQueriesAndAges):
216 """ 217 Record our start time, and return a deferred for our devices 218 219 @type devicesToQuery: iterable 220 @param devicesToQuery: names of devices to poll 221 @type prevQueriesAndAges: dict 222 @param prevQueriesAndAges: devices with outstanding quests 223 @return: deferred 224 """ 225 self.reset() 226 self._startTime = time.time() 227 228 # If there is an outstanding request for a device we don't want to 229 # issue another 230 self._devicesToQueryThisCycle = \ 231 Set(devicesToQuery) - Set(prevQueriesAndAges.keys()) 232 self._prevQueriesAndAges = prevQueriesAndAges 233 self._queue = copy.copy(self._devicesToQueryThisCycle) 234 self._checkFinished() # count could be zero 235 return self._deferred
236 237
238 - def record(self, name, success):
239 """ 240 Record success or failure 241 242 @type name: string 243 @param name: name of device reporting results 244 @type success: boolean 245 @param success: True if query succeeded, False otherwise. 246 """ 247 if name in self._reported: 248 log.error("Device %s is reporting more than once", name) 249 return 250 self._reported.add(name) 251 if name in self._devicesToQueryThisCycle: 252 if success: 253 self._numSucceeded += 1 254 else: 255 self._numFailed += 1 256 self._checkFinished() 257 elif name in self._prevQueriesAndAges: 258 if success: 259 self._numPrevSucceeded += 1 260 else: 261 self._numPrevFailed += 1 262 else: 263 log.debug('Unrecognized device reporting: %s' % name)
264
265 - def _checkFinished(self):
266 """ 267 Determine the stopping point and log our current stats 268 """ 269 if self.finished(): 270 self._stopTime = time.time() 271 if not self._deferred.called: 272 self._deferred.callback(self) 273 self.daemon.heartbeat() 274 info = self.stats() 275 log.info( 276 'success:%d ' % info['numSucceeded'] + 277 'fail:%d ' % info['numFailed'] + 278 'pending:%d ' % info['numInProcess'] + 279 'todo:%d ' % info['queueSize'])
280 281
282 - def finished(self):
283 """ 284 Determine if we have finished, disregarding devices that were queried 285 in a previous cycle and still haven't reported back. 286 """ 287 return len(self.inQueue()) == 0 and len(self.inProcess()) == 0
288
289 - def stats(self):
290 """ 291 Return a dictionary with stats for this cycle: 292 numSucceeded - queries made this cycle and reported back success 293 numPrevSucceeded - queries from a prev cycle reported back success 294 numFailed - queries made this cycle reported back failure 295 numPrevFailed - queries made prev cycle reported back failure 296 startTime - timestamp when this cycle started 297 stopTime - timestamp when this cycle stopped 298 age - time cycle took to run or current age (if still running) 299 queueSize - num of devices not queried yet 300 numInProcess - num queried this cycle not reported back yet 301 numPrevInProcess - num queried prev cycle still not reported back 302 numReported - number reported back from this or previous cycles 303 """ 304 return dict( 305 numSucceeded = self._numSucceeded, 306 numPrevSucceeded = self._numPrevSucceeded, 307 numFailed = self._numFailed, 308 numPrevFailed = self._numPrevFailed, 309 startTime = self._startTime, 310 stopTime = self._stopTime, 311 age = self._stopTime and (self._stopTime - self._startTime) \ 312 or (time.time() - self._startTime), 313 queueSize = len(self._queue), 314 numInProcess = len(self.inProcess()), 315 numPrevInProcess = len(self.prevInProcess()), 316 numReported = len(self._reported) 317 )
318
319 - def inProcess(self):
320 """ 321 Return the name of the devices that have been queried this cycle 322 but from whom no response has been received. 323 """ 324 return self._devicesToQueryThisCycle - self._reported - self._queue
325
326 - def prevInProcess(self):
327 """ 328 Return the names of the devices that were queried prior to this cycle 329 and have not yet reported. 330 """ 331 return Set(self._prevQueriesAndAges.keys()) - self._reported
332
333 - def inQueue(self):
334 """ 335 Return the names of the devices that have yet to be queried. 336 """ 337 return self._queue
338
339 - def popDevice(self):
340 """ 341 Pop a device to be queried from the queue 342 """ 343 return self._queue.pop()
344
345 - def getQueryAges(self):
346 """ 347 Return a dictionary with the device and age of each query from this 348 or previous cycles that has not yet responeded. 349 """ 350 waiting = dict([(d,a) for (d, a) in self._prevQueriesAndAges.items() 351 if d not in self._reported]) 352 waiting.update(dict([(d, self._startTime) 353 for d in self.inProcess()])) 354 return waiting
355 356
class SnmpStatus:
    """
    Per-device tracker that turns SNMP query outcomes into up/down events
    """

    # Fields shared by every SNMP status event sent by this class
    snmpStatusEvent = {'eventClass': Status_Snmp,
                       'component': 'snmp',
                       'eventGroup': 'SnmpTest'}

    def __init__(self, snmpState):
        """
        Initializer

        @param snmpState: starting value of the failure counter
        """
        self.count = snmpState

    def updateStatus(self, deviceName, success, eventCb):
        """
        Send an up or down event depending on the outcome of a query

        @param deviceName: name of the device that was queried
        @param success: True if the query succeeded
        @param eventCb: callable used to send the event; it receives the
            base event dict plus device, summary and severity keywords
        """
        if not success:
            summary='SNMP agent down'
            eventCb(self.snmpStatusEvent,
                    device=deviceName, summary=summary,
                    severity=Event.Error)
            log.warn("%s %s" % (deviceName, summary))
            self.count += 1
            return

        # Only announce recovery if failures had been counted
        if self.count > 0:
            summary='SNMP agent up'
            eventCb(self.snmpStatusEvent,
                    device=deviceName, summary=summary,
                    severity=Event.Clear)
            log.info("%s %s" % (deviceName, summary))
        self.count = 0
class OidData:
    """
    Plain record describing how the value of one OID is stored
    """

    def update(self, name, path, dataStorageType, rrdCreateCommand, minmax):
        """
        Overwrite every field of the record with the given values

        @param name: data point name
        @param path: RRD path the value is saved under
        @param dataStorageType: storage type handed to the RRD save call
        @param rrdCreateCommand: RRD create options
        @param minmax: (min, max) pair
        """
        (self.name,
         self.path,
         self.dataStorageType,
         self.rrdCreateCommand,
         self.minmax) = (name, path, dataStorageType, rrdCreateCommand, minmax)
406
class zenperfsnmp(SnmpDaemon):
    """
    Periodically query all devices for SNMP values to archive in RRD files
    """

    # these names need to match the property values in PerformanceMonitorConf

    # Age handed to the FileCleanup that deletes old .rrd files (__init__)
    maxRrdFileAge = 30 * (24*60*60)     # seconds
    perfsnmpConfigInterval = 20*60
    # Delay between scanCycle() runs; also the basis of the "stop waiting
    # for a response" threshold in readDevices()
    perfsnmpCycleInterval = 5*60
    # Extend the properties and initial services declared by SnmpDaemon
    properties = SnmpDaemon.properties + ('perfsnmpCycleInterval',)
    initialServices = SnmpDaemon.initialServices + ['SnmpPerfConfig']
419 - def __init__(self, noopts=0):
420 """ 421 Create any base performance directories (if necessary), 422 load cached configuration data and clean up any old RRD files 423 (if specified by --checkAgingFiles) 424 """ 425 SnmpDaemon.__init__(self, 'zenperfsnmp', noopts) 426 self.status = None 427 self.proxies = {} 428 self.unresponsiveDevices = Set() 429 self.snmpOidsRequested = 0 430 431 self.log.info( "Initializing daemon..." ) 432 433 perfRoot = performancePath('') 434 makeDirs(perfRoot) 435 436 if self.options.cacheconfigs: 437 self.loadConfigs() 438 439 self.oldFiles = Set() 440 441 # report on files older than a day 442 if self.options.checkagingfiles: 443 self.oldCheck = FileCleanup(perfRoot, '.*\\.rrd$', 444 24 * 60 * 60, 445 frequency=60) 446 self.oldCheck.process = self.reportOldFile 447 self.oldCheck.start() 448 449 # remove files older than maxRrdFileAge 450 self.fileCleanup = FileCleanup(perfRoot, '.*\\.rrd$', 451 self.maxRrdFileAge, 452 frequency=90*60) 453 self.fileCleanup.process = self.cleanup 454 self.fileCleanup.start()
455 456
457 - def pickleName(self, id):
458 """ 459 Return the path to the pickle file for a device 460 """ 461 return performancePath('Devices/%s/%s-config.pickle' % (id, self.options.monitor))
462 463 464
465 - def loadConfigs(self):
466 """ 467 Read cached configuration values from pickle files at startup. 468 469 NB: We cache in pickles to get a full collect cycle, because 470 loading the initial config can take several minutes. 471 """ 472 self.log.info( "Gathering cached configuration information" ) 473 474 base = performancePath('Devices') 475 makeDirs(base) 476 root, ds, fs = os.walk(base).next() 477 for d in ds: 478 pickle_name= self.pickleName(d) 479 config = read( pickle_name ) 480 if config: 481 try: 482 self.log.debug( "Reading cached config info from pickle file %s" % pickle_name ) 483 data= cPickle.loads(config) 484 self.updateDeviceConfig( data ) 485 486 except Exception, ex: 487 self.log.warn( "Received %s while loading cached configs in %s -- ignoring" % (ex, pickle_name ) ) 488 try: 489 os.unlink( pickle_name ) 490 except Exception, ex: 491 self.log.warn( "Unable to delete corrupted pickle file %s because %s" % ( pickle_name, ex ) )
492 493 494
495 - def cleanup(self, fullPath):
496 """ 497 Delete an old RRD file 498 """ 499 self.log.warning("Deleting old RRD file: %s", fullPath) 500 os.unlink(fullPath) 501 self.oldFiles.discard(fullPath)
502 503
504 - def reportOldFile(self, fullPath):
505 """ 506 Add an RRD file to the list of files to be removed 507 """ 508 self.oldFiles.add(fullPath)
509
510 - def remote_updateDeviceList(self, devices):
511 """ 512 Gather the list of devices from zenhub, update all devices config 513 in the list of devices, and remove any devices that we know about, 514 but zenhub doesn't know about. 515 516 NB: This is callable from within zenhub. 517 """ 518 SnmpDaemon.remote_updateDeviceList(self, devices) 519 # NB: Anything not explicitly sent by zenhub should be deleted 520 survivors = [] 521 doomed = Set(self.proxies.keys()) 522 for device, lastChange in devices: 523 doomed.discard(device) 524 proxy = self.proxies.get(device) 525 if not proxy or proxy.lastChange < lastChange: 526 survivors.append(device) 527 528 log.info("Deleting %s", doomed) 529 for d in doomed: 530 del self.proxies[d] 531 532 if survivors: 533 log.info("Fetching configs: %s", survivors) 534 d = self.model().callRemote('getDevices', survivors) 535 d.addCallback(self.updateDeviceList, survivors) 536 d.addErrback(self.error)
537 538 539
    def startUpdateConfig(self, driver):
        """
        Periodically ask the Zope server for basic configuration data.

        This is a generator run through drive() (see connected()); each
        'yield' hands over the deferred of a remote call and the following
        driver.next() fetches that call's result. It reschedules itself
        with driveLater() every configCycleInterval minutes.

        @param driver: object supplied by drive(); next() returns the
            result of the most recently yielded deferred
        """

        # start time, used for the 'configTime' gauge at the end
        now = time.time()

        log.info("Fetching property items...")
        yield self.model().callRemote('propertyItems')
        self.setPropertyItems(driver.next())

        # schedule the next configuration run before doing the rest
        driveLater(self.configCycleInterval * 60, self.startUpdateConfig)

        log.info("Getting threshold classes...")
        yield self.model().callRemote('getThresholdClasses')
        self.remote_updateThresholdClasses(driver.next())

        log.info("Checking for outdated configs...")
        # (device name, lastChange) for every proxy we currently hold
        current = [(k, v.lastChange) for k, v in self.proxies.items()]
        yield self.model().callRemote('getDeviceUpdates', current)

        devices = driver.next()
        # a device given on the command line replaces the returned list
        if self.options.device:
            devices = [self.options.device]

        # the device list is truncated to 800 characters for the log
        log.info("Fetching configs for %s", repr(devices)[0:800]+'...')
        yield self.model().callRemote('getDevices', devices)
        updatedDevices = driver.next()

        log.info("Fetching default RRDCreateCommand...")
        yield self.model().callRemote('getDefaultRRDCreateCommand')
        createCommand = driver.next()

        self.rrd = RRDUtil(createCommand, self.perfsnmpCycleInterval)

        log.info( "Getting collector thresholds..." )
        yield self.model().callRemote('getCollectorThresholds')
        self.rrdStats.config(self.options.monitor, self.name, driver.next(),
                             createCommand)

        log.info("Fetching SNMP status...")
        yield self.model().callRemote('getSnmpStatus', self.options.device)
        self.updateSnmpStatus(driver.next())

        # Kick off the device load
        log.info("Initiating incremental device load")
        if self.options.cycle:
            # cycle mode: load in the background and only log the outcome
            d = self.updateDeviceList(updatedDevices, devices)
            def report(result):
                """
                Attached with addBoth: log any non-empty result of the
                device load
                """
                if result:
                    log.error("Error loading devices: %s", result)
            d.addBoth(report)
        else:
            #if not in cycle mode wait for the devices to load before collecting
            yield self.updateDeviceList(updatedDevices, devices)
            driver.next()

        # report how long this configuration run took
        self.sendEvents(self.rrdStats.gauge('configTime',
                                            self.configCycleInterval * 60,
                                            time.time() - now))
603 604
    def updateDeviceList(self, responses, requested):
        """
        Update the config for devices

        @param responses: device names whose configs are fetched from
            zenhub, DEVICE_LOAD_CHUNK_SIZE at a time
        @param requested: device names that were asked for; any of them
            missing from responses is removed from the proxies
        @return: the deferred returned by drive() for the fetch generator
        """

        def fetchDevices(driver):
            """
            Generator run by drive(): fetch the configs chunk by chunk,
            then drop devices that are no longer wanted
            """
            deviceNames = Set()
            length = len(responses)
            log.debug("Fetching configs for %d devices", length)
            for devices in chunk(responses, DEVICE_LOAD_CHUNK_SIZE):
                log.debug("Fetching config for %s", devices)
                yield self.model().callRemote('getDeviceConfigs', devices)
                try:
                    for response in driver.next():
                        self.updateDeviceConfig(response)
                except Exception, ex:
                    # a failing chunk is logged and the load carries on
                    log.warning("Error loading config for devices %s" % devices)
                # NOTE(review): nesting reconstructed from a flattened
                # listing; the names are taken to be recorded whether or
                # not the chunk loaded -- confirm against the original
                for d in devices:
                    deviceNames.add(d)
            log.debug("Finished fetching configs for %d devices", length)

            # stop collecting those no longer in the list
            doomed = Set(requested) - deviceNames
            if self.options.device:
                # single-device mode: every other proxy is dropped
                self.log.debug('Gathering performance data for %s ' %
                        self.options.device)
                doomed = Set(self.proxies.keys())
                doomed.discard(self.options.device)
            for name in doomed:
                self.log.info('Removing device %s' % name)
                if name in self.proxies:
                    del self.proxies[name]

                # Just in case, delete any pickle files that might exist
                config = self.pickleName(name)
                # NOTE(review): unlink() is not defined in the visible part
                # of this file; presumably a module-level helper -- confirm
                unlink(config)
                # we could delete the RRD files, too

            # warn about proxies that share a management address
            ips = Set()
            for name, proxy in self.proxies.items():
                if proxy.snmpConnInfo.manageIp in ips:
                    log.warning("Warning: device %s has a duplicate address %s",
                                name, proxy.snmpConnInfo.manageIp)
                ips.add(proxy.snmpConnInfo.manageIp)
            self.log.info('Configured %d of %d devices',
                          len(deviceNames), len(self.proxies))
            yield defer.succeed(None)
        return drive(fetchDevices)
659 - def updateAgentProxy(self, deviceName, snmpConnInfo):
660 """ 661 Create or update proxy 662 663 @parameter deviceName: device name known by zenhub 664 @type deviceName: string 665 @parameter snmpConnInfo: object information passed from zenhub 666 @type snmpConnInfo: class SnmpConnInfo from Products/ZenHub/services/PerformanceConfig.py 667 @return: connection information from the proxy 668 @rtype: SnmpConnInfo class 669 """ 670 p = self.proxies.get(deviceName, None) 671 if not p: 672 p = snmpConnInfo.createSession(protocol=self.snmpPort.protocol, 673 allowCache=True) 674 p.oidMap = {} 675 p.snmpStatus = SnmpStatus(0) 676 p.singleOidMode = False 677 p.lastChange = 0 678 679 if p.snmpConnInfo != snmpConnInfo: 680 t = snmpConnInfo.createSession(protocol=self.snmpPort.protocol, 681 allowCache=True) 682 t.oidMap = p.oidMap 683 t.snmpStatus = p.snmpStatus 684 t.singleOidMode = p.singleOidMode 685 t.lastChange = p.lastChange 686 p = t 687 688 return p
689 690 691
692 - def updateSnmpStatus(self, status):
693 """ 694 Update the SNMP failure counts from Status database 695 """ 696 countMap = dict(status) 697 for name, proxy in self.proxies.items(): 698 proxy.snmpStatus.count = countMap.get(name, 0)
699 700
701 - def remote_deleteDevice(self, doomed):
702 """ 703 Allows zenhub to delete a device from our configuration 704 """ 705 self.log.debug("Async delete device %s" % doomed) 706 if doomed in self.proxies: 707 del self.proxies[doomed]
708 709
710 - def remote_updateDeviceConfig(self, snmpTargets):
711 """ 712 Allows zenhub to update our device configuration 713 """ 714 self.log.debug("Device updates from zenhub received") 715 self.updateDeviceConfig(snmpTargets)
716 717 718
719 - def updateDeviceConfig(self, configs):
720 """ 721 Examine the given device configuration, and if newer update the device 722 as well as its pickle file. 723 If no SNMP proxy created for the device, create one. 724 """ 725 726 self.log.debug("Received config for %s", configs.device) 727 p = self.updateAgentProxy(configs.device, configs.connInfo) 728 729 if self.options.cacheconfigs: 730 p.lastChange = configs.lastChangeTime 731 data= cPickle.dumps(configs) 732 pickle_name= self.pickleName(configs.device) 733 self.log.debug( "Updating cached configs in pickle file %s" % pickle_name ) 734 write(pickle_name, data) 735 736 # Sanity check all OIDs and prep for eventual RRD file creation 737 oidMap, p.oidMap = p.oidMap, {} 738 for name, oid, path, dsType, createCmd, minmax in configs.oids: 739 createCmd = createCmd.strip() # RRD create options 740 oid = str(oid).strip('.') 741 # beware empty OIDs 742 if oid: 743 oid = '.' + oid 744 oid_status = oidMap.setdefault(oid, OidData()) 745 oid_status.update(name, path, dsType, createCmd, minmax) 746 p.oidMap[oid] = oid_status 747 748 self.proxies[configs.device] = p 749 self.thresholds.updateForDevice(configs.device, configs.thresholds)
750 751 752
753 - def scanCycle(self, *unused):
754 """ 755 """ 756 reactor.callLater(self.perfsnmpCycleInterval, self.scanCycle) 757 self.log.debug("Getting device ping issues") 758 evtSvc = self.services.get('EventService', None) 759 if evtSvc: 760 d = evtSvc.callRemote('getDevicePingIssues') 761 d.addBoth(self.setUnresponsiveDevices) 762 else: 763 self.setUnresponsiveDevices('No event service')
764 765
766 - def setUnresponsiveDevices(self, arg):
767 """ 768 Remember all the unresponsive devices 769 """ 770 if isinstance(arg, list): 771 deviceList = arg 772 self.log.debug('unresponsive devices: %r' % deviceList) 773 self.unresponsiveDevices = Set(firsts(deviceList)) 774 else: 775 self.log.error('Could not get unresponsive devices: %s', arg) 776 self.readDevices()
777 778
779 - def readDevices(self, unused=None):
780 """ 781 Periodically fetch the performance values from all known devices 782 """ 783 # If self.status then this is not the first cycle 784 if self.status: 785 # pending is a dictionary of devices that haven't responded 786 # and the values are the timestamps of each query 787 pending = self.status.getQueryAges() 788 # doneWaiting is the devices from pending that have exceeded 789 # the time we're willing to wait for them 790 doneWaiting = [] 791 for device, age in pending.items(): 792 beenWaiting = time.time() - age 793 if beenWaiting >= self.perfsnmpCycleInterval \ 794 * CYCLES_TO_WAIT_FOR_RESPONSE: 795 self.log.error('No response from %s after %s cycles.' 796 % (device, CYCLES_TO_WAIT_FOR_RESPONSE)) 797 doneWaiting.append(device) 798 else: 799 self.log.warning('Continuing to wait for response from' 800 ' %s after %s seconds' % (device, beenWaiting)) 801 for device in doneWaiting: 802 del pending[device] 803 804 # Report on devices that we didn't have the time to get to 805 queued = self.status.inQueue() 806 if queued: 807 self.log.error('%s devices still queued at end of cycle and did' 808 ' not get queried.' % len(queued)) 809 self.log.debug('Devices not queried: %s' % ', '.join(queued)) 810 811 # If the previous cycle did not complete then report stats 812 # (If it did complete then stats were reports by a 813 # callback on the deferred.) 
814 if not self.status._stopTime: 815 self.reportRate() 816 else: 817 pending = {} 818 819 devicesToQuery = Set(self.proxies.keys()) 820 # Don't query devices that can't be pinged 821 devicesToQuery -= self.unresponsiveDevices 822 # Don't query devices we're still waiting for responses from 823 devicesToQuery -= Set(pending.keys()) 824 self.status = Status(self) 825 d = self.status.start(devicesToQuery, pending) 826 d.addCallback(self.reportRate) 827 for unused in range(MAX_SNMP_REQUESTS): 828 if not len(self.status.inQueue()): 829 break 830 d = self.startReadDevice(self.status.popDevice()) 831 832 def printError(reason): 833 """ 834 Twisted errBack to record a traceback and log messages 835 """ 836 from StringIO import StringIO 837 out = StringIO() 838 reason.printTraceback(out) 839 self.log.error(reason)
840 841 d.addErrback(printError) 842 843
844 - def reportRate(self, *unused):
845 """ 846 Finished reading all the devices, report stats and maybe stop 847 """ 848 info = self.status.stats() 849 oidsRequested, self.snmpOidsRequested = self.snmpOidsRequested, 0 850 851 self.log.info('******** Cycle completed ********') 852 self.log.info("Sent %d OID requests", oidsRequested) 853 self.log.info('Queried %d devices' % (info['numSucceeded'] \ 854 + info['numFailed'] + info['numInProcess'])) 855 self.log.info(' %s in queue still unqueried' % info['queueSize']) 856 self.log.info(' Successes: %d Failures: %d Not reporting: %d' % 857 (info['numSucceeded'], info['numFailed'], info['numInProcess'])) 858 self.log.info('Waited on %d queries from previous cycles.' % 859 (info['numPrevSucceeded'] + info['numPrevFailed'] \ 860 + info['numPrevInProcess'])) 861 self.log.info(' Successes: %d Failures: %d Not reporting: %d' % 862 (info['numPrevSucceeded'], info['numPrevFailed'], 863 info['numPrevInProcess'])) 864 self.log.info('Cycle lasted %.2f seconds' % info['age']) 865 self.log.info('*********************************') 866 867 cycle = self.perfsnmpCycleInterval 868 self.sendEvents( 869 self.rrdStats.gauge('success', cycle, 870 info['numSucceeded'] + info['numPrevSucceeded']) + 871 self.rrdStats.gauge('failed', cycle, 872 info['numFailed'] + info['numPrevFailed']) + 873 self.rrdStats.gauge('cycleTime', cycle, info['age']) + 874 self.rrdStats.counter('dataPoints', cycle, self.rrd.dataPoints) + 875 self.rrdStats.gauge('cyclePoints', cycle, self.rrd.endCycle()) 876 ) 877 # complain about RRD files that have not been updated 878 self.checkOldFiles()
879
880 - def checkOldFiles(self):
881 """ 882 Send an event showing whether we have old files or not 883 """ 884 if not self.options.checkagingfiles: 885 return 886 self.oldFiles = Set( 887 [f for f in self.oldFiles 888 if os.path.exists(f) and self.oldCheck.test(f)] 889 ) 890 if self.oldFiles: 891 root = performancePath('') 892 filenames = [f.lstrip(root) for f in self.oldFiles] 893 message = 'RRD files not updated: ' + ' '.join(filenames) 894 self.sendEvent(dict( 895 dedupid="%s|%s" % (self.options.monitor, 'RRD files too old'), 896 severity=Critical, 897 device=self.options.monitor, 898 eventClass=Status_Perf, 899 summary=message)) 900 else: 901 self.sendEvent(dict( 902 severity=Clear, 903 device=self.options.monitor, 904 eventClass=Status_Perf, 905 summary='All RRD files have been recently updated'))
906 907
908 - def startReadDevice(self, deviceName):
909 """ 910 Initiate a request (or several) to read the performance data 911 from a device 912 """ 913 proxy = self.proxies.get(deviceName, None) 914 if proxy is None: 915 return 916 917 # ensure that the request will fit in a packet 918 # TODO: sanity check this number 919 n = int(proxy.snmpConnInfo.zMaxOIDPerRequest) 920 if proxy.singleOidMode: 921 n = 1 922 923 def getLater(oids): 924 """ 925 Return the result of proxy.get( oids, timeoute, tries ) 926 """ 927 return checkException(self.log, 928 proxy.get, 929 oids, 930 proxy.snmpConnInfo.zSnmpTimeout, 931 proxy.snmpConnInfo.zSnmpTries)
932 933 # Chain a series of deferred actions serially 934 proxy.open() 935 chain = Chain(getLater, iter(chunk(sorted(proxy.oidMap.keys()), n))) 936 d = chain.run() 937 938 def closer(arg, proxy): 939 """ 940 Close the proxy 941 """ 942 try: 943 proxy.close() 944 except Exception, ex: 945 self.log.exception(ex) 946 raise ex 947 948 return arg 949 950 d.addCallback(closer, proxy) 951 d.addCallback(self.storeValues, deviceName) 952 953 # Track the total number of OIDs requested this cycle 954 self.snmpOidsRequested += len(proxy.oidMap) 955 956 return d 957 958
959 - def badOid(self, deviceName, oid):
960 """ 961 Report any bad OIDs (eg to a file log and Zenoss event) and then remove 962 the OID so we dont generate any further errors. 963 """ 964 proxy = self.proxies.get(deviceName, None) 965 if proxy is None: 966 return 967 968 name = proxy.oidMap[oid].name 969 summary = 'Error reading value for "%s" on %s (oid %s is bad)' % ( 970 name, deviceName, oid) 971 self.sendEvent(proxy.snmpStatus.snmpStatusEvent, 972 eventClass=Perf_Snmp, 973 device=deviceName, 974 summary=summary, 975 component=name, 976 severity=Event.Debug) 977 self.log.warn(summary) 978 979 del proxy.oidMap[oid]
980 981
    def storeValues(self, updates, deviceName):
        """
        Decode responses from devices and store the elements in RRD files

        @param updates: sequence of (success, update) pairs, one per
            request made to the device; on success 'update' is used as a
            mapping of OID to value, otherwise it is logged as the error
        @param deviceName: name of the device that was read
        @type deviceName: string
        """

        proxy = self.proxies.get(deviceName, None)
        if proxy is None:
            # no proxy for the device any more: just mark it as reported
            self.status.record(deviceName, True)
            return

        # Look for problems
        for success, update in updates:
            # empty update is probably a bad OID in the request somewhere
            if success and not update and not proxy.singleOidMode:
                proxy.singleOidMode = True
                self.log.warn('Error collecting data on %s -- retrying in single-OID mode',
                              deviceName)
                # read the device again; nothing is recorded for this attempt
                self.startReadDevice(deviceName)
                return

            if not success:
                if isinstance(update, failure.Failure) and \
                    isinstance(update.value, error.TimeoutError):
                    self.log.debug("Device %s timed out" % deviceName)
                else:
                    self.log.warning('Failed to collect on %s (%s: %s)',
                                     deviceName,
                                     update.__class__,
                                     update)

        # number of requests that succeeded
        successCount = sum(firsts(updates))
        # every OID that appeared in a successful response
        oids = []
        for success, update in updates:
            if success:
                for oid, value in update.items():
                    # should always get something back
                    if value == '' or value is None:
                        self.badOid(deviceName, oid)
                    else:
                        self.storeRRD(deviceName, oid, value)
                    oids.append(oid)

        if successCount == len(updates) and proxy.singleOidMode:
            # remove any oids that didn't report
            for doomed in Set(proxy.oidMap.keys()) - Set(oids):
                self.badOid(deviceName, doomed)

        # this device is done: start on the next queued device, if any
        if self.status.inQueue():
            self.startReadDevice(self.status.popDevice())

        if successCount and len(updates) > 0:
            successPercent = successCount * 100 / len(updates)
            if successPercent not in (0, 100):
                self.log.debug("Successful request ratio for %s is %2d%%",
                               deviceName,
                               successPercent)
        # a device with no updates at all counts as a success
        success = True
        if updates:
            success = successCount > 0
        self.status.record(deviceName, success)
        proxy.snmpStatus.updateStatus(deviceName, success, self.sendEvent)
1043 1044
1045 - def storeRRD(self, device, oid, value):
1046 """ 1047 Store a value into an RRD file 1048 1049 @param device: remote device name 1050 @type device: string 1051 @param oid: SNMP OID used as our performance metric 1052 @type oid: string 1053 @param value: data to be stored 1054 @type value: number 1055 """ 1056 oidData = self.proxies[device].oidMap.get(oid, None) 1057 if not oidData: return 1058 1059 raw_value = value 1060 min, max = oidData.minmax 1061 try: 1062 value = self.rrd.save(oidData.path, 1063 value, 1064 oidData.dataStorageType, 1065 oidData.rrdCreateCommand, 1066 min=min, max=max) 1067 except Exception, ex: 1068 summary= "Unable to save data for OID %s in RRD %s" % \ 1069 ( oid, oidData.path ) 1070 self.log.critical( summary ) 1071 1072 message= """Data was value= %s, type=%s, min=%s, max=%s 1073 RRD create command: %s""" % \ 1074 ( value, oidData.dataStorageType, min, max, \ 1075 oidData.rrdCreateCommand ) 1076 self.log.critical( message ) 1077 self.log.exception( ex ) 1078 1079 import traceback 1080 trace_info= traceback.format_exc() 1081 1082 evid= self.sendEvent(dict( 1083 dedupid="%s|%s" % (self.options.monitor, 'RRD write failure'), 1084 severity=Critical, 1085 device=self.options.monitor, 1086 eventClass=Status_Perf, 1087 component="RRD", 1088 oid=oid, 1089 path=oidData.path, 1090 message=message, 1091 traceback=trace_info, 1092 summary=summary)) 1093 1094 # Skip thresholds 1095 return 1096 1097 if self.options.showdeviceresults: 1098 self.log.info("%s %s results: raw=%s RRD-converted=%s" 1099 " type=%s, min=%s, max=%s" % ( 1100 device, oid, raw_value, value, oidData.dataStorageType, min, max)) 1101 1102 for ev in self.thresholds.check(oidData.path, time.time(), value): 1103 eventKey = oidData.path.rsplit('/')[-1] 1104 if ev.has_key('eventKey'): 1105 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey']) 1106 else: 1107 ev['eventKey'] = eventKey 1108 self.sendThresholdEvent(**ev)
1109 1110 1111
1112 - def connected(self):
1113 """ 1114 Run forever, fetching and storing 1115 """ 1116 self.log.debug( "Connected to zenhub" ) 1117 d = drive(self.startUpdateConfig) 1118 d.addCallbacks(self.scanCycle, self.errorStop)
1119 1120
1121 - def buildOptions(self):
1122 """ 1123 Build a list of command-line options 1124 """ 1125 SnmpDaemon.buildOptions(self) 1126 self.parser.add_option('--checkAgingFiles', 1127 dest='checkagingfiles', 1128 action="store_true", 1129 default=False, 1130 help="Send events when RRD files are not being updated regularly") 1131 1132 self.parser.add_option('--cacheconfigs', 1133 dest='cacheconfigs', 1134 action="store_true", 1135 default=False, 1136 help="To improve startup times, cache configuration received from zenhub") 1137 1138 self.parser.add_option('--showdeviceresults', 1139 dest='showdeviceresults', 1140 action="store_true", 1141 default=False, 1142 help="Show the raw RRD values. For debugging purposes only.")
if __name__ == '__main__':
    # The following bizarre include is required for PB to be happy
    # (the daemon class is re-imported under its package path rather
    # than used as defined in __main__)
    from Products.ZenRRD.zenperfsnmp import zenperfsnmp

    # Create the daemon and run it
    zpf = zenperfsnmp()
    zpf.run()