Package ZenRRD :: Module zenperfsnmp
[hide private]
[frames] | no frames]

Source Code for Module ZenRRD.zenperfsnmp

   1  #! /usr/bin/env python  
   2  ########################################################################### 
   3  # 
   4  # This program is part of Zenoss Core, an open source monitoring platform. 
   5  # Copyright (C) 2007, Zenoss Inc. 
   6  # 
   7  # This program is free software; you can redistribute it and/or modify it 
   8  # under the terms of the GNU General Public License version 2 as published by 
   9  # the Free Software Foundation. 
  10  # 
  11  # For complete information please visit: http://www.zenoss.com/oss/ 
  12  # 
  13  ########################################################################### 
  14   
  15  __doc__="""zenperfsnmp 
  16   
  17  Gets SNMP performance data and stores it in RRD files. 
  18   
  19  """ 
  20   
  21  import os 
  22  import time 
  23  import logging 
  24  log = logging.getLogger("zen.zenperfsnmp") 
  25   
  26  import copy 
  27  from sets import Set 
  28  import cPickle 
  29   
  30  from twisted.internet import reactor, defer, error 
  31  from twisted.python import failure 
  32   
  33  import Globals 
  34  from Products.ZenUtils.Utils import unused 
  35  from Products.ZenUtils.Chain import Chain 
  36  from Products.ZenUtils.Driver import drive, driveLater 
  37  from Products.ZenModel.PerformanceConf import performancePath 
  38  from Products.ZenEvents import Event 
  39  from Products.ZenEvents.ZenEventClasses \ 
  40       import Perf_Snmp, Status_Snmp, Status_Perf 
  41  from Products.ZenEvents.ZenEventClasses import Critical, Clear 
  42   
  43  from Products.ZenRRD.RRDUtil import RRDUtil 
  44  from SnmpDaemon import SnmpDaemon 
  45   
  46  from FileCleanup import FileCleanup 
  47   
  48  # PerformanceConfig import needed to get pb to work 
  49  from Products.ZenHub.services.PerformanceConfig import PerformanceConfig 
  50  unused(PerformanceConfig) 
  51   
# Not referenced by the code visible in this module: startReadDevice sizes
# its requests from each device's zMaxOIDPerRequest instead.
MAX_OIDS_PER_REQUEST = 40
# Upper bound on the number of device reads readDevices starts at the
# beginning of a cycle; storeValues starts one more queued device each
# time a read completes.
MAX_SNMP_REQUESTS = 20
# Number of devices requested per 'getDeviceConfigs' call in
# updateDeviceList
DEVICE_LOAD_CHUNK_SIZE = 20
# readDevices stops waiting for a query once it has been outstanding for
# this many perfsnmpCycleInterval periods
CYCLES_TO_WAIT_FOR_RESPONSE = 2
  56   
57 -def makeDirs(dir):
58 """ 59 Wrapper around makedirs that sanity checks before running 60 """ 61 if os.path.exists(dir): 62 return 63 64 try: 65 os.makedirs(dir, 0750) 66 except Exception, ex: 67 log.critical( "Unable to create directories for %s because %s" % ( dir, ex ) )
68 69
def read(fname):
    """
    Return the complete contents of the file 'fname', read in binary mode.

    @param fname: path of the file to read
    @type fname: string
    @return: the file's contents, or '' when the file does not exist
    @rtype: string
    """
    if not os.path.exists(fname):
        return ''

    # open() is the documented way to get a file object and is what the
    # sibling write() uses; the file() constructor is not meant to be
    # called directly.
    fp = open(fname, 'rb')
    try:
        return fp.read()
    finally:
        fp.close()
81 82
83 -def write(fname, data):
84 """ 85 Wrapper around the standard function to open a file and write data 86 """ 87 makeDirs(os.path.dirname(fname)) 88 89 try: 90 fp = open(fname, 'wb') 91 try: 92 fp.write(data) 93 finally: 94 fp.close() 95 96 except Exception, ex: 97 log.critical( "Unable to write data to %s because %s" % ( fname, ex ) )
98 99 106
def chunk(lst, n):
    """
    Split lst into consecutive slices holding at most n items each

    @param lst: sliceable sequence to split
    @param n: maximum size of every slice
    @return: list of slices, in order
    """
    pieces = []
    for start in range(0, len(lst), n):
        pieces.append(lst[start:start + n])
    return pieces
try:
    sorted = sorted             # added in python 2.4
except NameError:
    def sorted(lst, *args, **kw):
        """
        Keep things sane in a pre-python 2.4 environment

        Like the builtin it stands in for, this returns a new sorted
        list and leaves its argument untouched.

        @param lst: any iterable
        @return: new list with the items of lst in sorted order
        """
        # Copy first: sorting in place would reorder the caller's list
        # and would fail for iterables that have no sort() method.
        result = list(lst)
        result.sort(*args, **kw)
        return result
122
def firsts(lst):
    """
    Collect element 0 of each item of a sequence

    @param lst: iterable of indexable items
    @return: list holding the first element of every item
    """
    heads = []
    for item in lst:
        heads.append(item[0])
    return heads
128
def checkException(alog, function, *args, **kw):
    """
    Execute the function with arguments and keywords.
    If there is an exception, log it using the given
    logging function 'alog' and let it propagate.

    @param alog: logger-like object providing exception()
    @param function: callable to invoke
    @return: whatever function(*args, **kw) returns
    @raise Exception: the exception raised by function, unchanged
    """
    try:
        return function(*args, **kw)
    except Exception:
        # Fetch the active exception through sys.exc_info() so the handler
        # needs no version-specific "except ... , name" binding.
        import sys
        alog.exception(sys.exc_info()[1])
        # A bare raise re-raises with the original traceback; "raise ex"
        # would make the traceback start at this line instead.
        raise
140 141 142 143 from twisted.spread import pb
class SnmpConfig(pb.Copyable, pb.RemoteCopy):
    """
    A class to transfer the SNMP collection data to zenperfsnmp

    The attributes below are class-level defaults; updateDeviceConfig
    reads these same attributes from every config object it receives.
    """

    # change timestamp of the config; stored by updateDeviceConfig as the
    # proxy's lastChange
    lastChangeTime = 0.
    # device name, used as the key into the daemon's proxies dictionary
    device = ''
    # connection information handed to updateAgentProxy
    connInfo = None
    # threshold definitions passed to thresholds.updateForDevice
    # NOTE: this list and 'oids' are class attributes, so they are shared
    # by every instance that does not assign its own value
    thresholds = []
    # sequence of (name, oid, path, dsType, createCmd, minmax) tuples, as
    # unpacked by updateDeviceConfig
    oids = []

# Register SnmpConfig with Perspective Broker as the class to build when a
# jellied SnmpConfig is received
pb.setUnjellyableForClass(SnmpConfig, SnmpConfig)
class Status:
    """
    Keep track of the status of many parallel requests
    """

    def __init__(self, daemon):
        """
        Initializer

        @param daemon: object whose heartbeat() is called when a cycle
                       finishes
        """
        self.daemon = daemon
        self.reset()

    def reset(self):
        """
        Reset instance variables to initial values
        """

        # Number of requests issued this cycle that succeeded
        self._numSucceeded = 0

        # Number of requests issued prior to this cycle that responded
        # successfully this cycle
        self._numPrevSucceeded = 0

        # Number of requests issued this cycle that failed
        self._numFailed = 0

        # Number of requests issued prior to this cycle that responded
        # unsuccessfully this cycle
        self._numPrevFailed = 0

        # timestamp when this cycle was started
        self._startTime = 0

        # timestamp when this cycle completed
        self._stopTime = 0

        # This deferred gets triggered when this cycle completes.
        self._deferred = defer.Deferred()

        # Set of device names that should be queried this cycle
        self._devicesToQueryThisCycle = Set()

        # Names of devices with outstanding queries when this cycle was
        # started.  Values are the timestamp of when the query was made.
        self._prevQueriesAndAges = {}

        # Set of device names still to be queried this cycle.  Initially
        # this is the same as _devicesToQueryThisCycle but devices are
        # popped from it as they are queried
        self._queue = Set()

        # Set of devices names that have reported back this cycle.  The query
        # may have been from this cycle or a previous cycle
        self._reported = Set()

    def start(self, devicesToQuery, prevQueriesAndAges):
        """
        Record our start time, and return a deferred for our devices

        @type devicesToQuery: iterable
        @param devicesToQuery: names of devices to poll
        @type prevQueriesAndAges: dict
        @param prevQueriesAndAges: devices with outstanding requests
        @return: deferred that fires (with this object) when the cycle
                 finishes
        """
        self.reset()
        self._startTime = time.time()

        # If there is an outstanding request for a device we don't want to
        # issue another
        self._devicesToQueryThisCycle = \
            Set(devicesToQuery) - Set(prevQueriesAndAges.keys())
        self._prevQueriesAndAges = prevQueriesAndAges
        self._queue = copy.copy(self._devicesToQueryThisCycle)
        self._checkFinished()   # count could be zero
        return self._deferred

    def record(self, name, success):
        """
        Record success or failure

        @type name: string
        @param name: name of device reporting results
        @type success: boolean
        @param success: True if query succeeded, False otherwise.
        """
        if name in self._reported:
            log.error("Device %s is reporting more than once", name)
            return
        self._reported.add(name)
        if name in self._devicesToQueryThisCycle:
            if success:
                self._numSucceeded += 1
            else:
                self._numFailed += 1
            self._checkFinished()
        elif name in self._prevQueriesAndAges:
            # Answers to queries from earlier cycles are counted but do
            # not influence whether this cycle is finished
            if success:
                self._numPrevSucceeded += 1
            else:
                self._numPrevFailed += 1
        else:
            log.debug('Unrecognized device reporting: %s' % name)

    def _checkFinished(self):
        """
        Determine the stopping point and log our current stats
        """
        if self.finished():
            self._stopTime = time.time()
            if not self._deferred.called:
                self._deferred.callback(self)
            self.daemon.heartbeat()
        info = self.stats()
        log.info(
            'success:%d ' % info['numSucceeded'] +
            'fail:%d ' % info['numFailed'] +
            'pending:%d ' % info['numInProcess'] +
            'todo:%d ' % info['queueSize'])

    def finished(self):
        """
        Determine if we have finished, disregarding devices that were queried
        in a previous cycle and still haven't reported back.
        """
        return len(self.inQueue()) == 0 and len(self.inProcess()) == 0

    def stats(self):
        """
        Return a dictionary with stats for this cycle:
            numSucceeded - queries made this cycle and reported back success
            numPrevSucceeded - queries from a prev cycle reported back success
            numFailed - queries made this cycle reported back failure
            numPrevFailed - queries made prev cycle reported back failure
            startTime - timestamp when this cycle started
            stopTime - timestamp when this cycle stopped
            age - time cycle took to run or current age (if still running)
            queueSize - num of devices not queried yet
            numInProcess - num queried this cycle not reported back yet
            numPrevInProcess - num queried prev cycle still not reported back
            numReported - number reported back from this or previous cycles
        """
        # An explicit branch instead of "a and b or c": that idiom falls
        # through to the running age whenever the finished duration is 0.
        if self._stopTime:
            age = self._stopTime - self._startTime
        else:
            age = time.time() - self._startTime
        return dict(
            numSucceeded = self._numSucceeded,
            numPrevSucceeded = self._numPrevSucceeded,
            numFailed = self._numFailed,
            numPrevFailed = self._numPrevFailed,
            startTime = self._startTime,
            stopTime = self._stopTime,
            age = age,
            queueSize = len(self._queue),
            numInProcess = len(self.inProcess()),
            numPrevInProcess = len(self.prevInProcess()),
            numReported = len(self._reported)
        )

    def inProcess(self):
        """
        Return the name of the devices that have been queried this cycle
        but from whom no response has been received.
        """
        return self._devicesToQueryThisCycle - self._reported - self._queue

    def prevInProcess(self):
        """
        Return the names of the devices that were queried prior to this cycle
        and have not yet reported.
        """
        return Set(self._prevQueriesAndAges.keys()) - self._reported

    def inQueue(self):
        """
        Return the names of the devices that have yet to be queried.
        """
        return self._queue

    def popDevice(self):
        """
        Pop a device to be queried from the queue
        """
        return self._queue.pop()

    def getQueryAges(self):
        """
        Return a dictionary with the device and age of each query from this
        or previous cycles that has not yet responded.
        """
        waiting = dict([(d,a) for (d, a) in self._prevQueriesAndAges.items()
                        if d not in self._reported])
        waiting.update(dict([(d, self._startTime)
                             for d in self.inProcess()]))
        return waiting
355 356
class SnmpStatus:
    """
    Remember how many consecutive SNMP failures a device has had and
    emit the matching up/down events
    """

    # Base event fields shared by every status event sent for a device
    snmpStatusEvent = {'eventClass': Status_Snmp,
                       'component': 'snmp',
                       'eventGroup': 'SnmpTest'}

    def __init__(self, snmpState):
        """
        Initializer

        @param snmpState: initial failure count
        """
        self.count = snmpState

    def updateStatus(self, deviceName, success, eventCb):
        """
        Send up/down events based on SNMP results

        @param deviceName: device the result belongs to
        @param success: True when the last collection worked
        @param eventCb: callable used to send the event; it receives the
                        base event dictionary plus keyword overrides
        """
        if not success:
            # every failure is reported and counted
            summary = 'SNMP agent down'
            eventCb(self.snmpStatusEvent,
                    device=deviceName, summary=summary,
                    severity=Event.Error)
            log.warn("%s %s" % (deviceName, summary))
            self.count += 1
            return

        # a clear is only sent when failures were recorded before
        if self.count > 0:
            summary = 'SNMP agent up'
            eventCb(self.snmpStatusEvent,
                    device=deviceName, summary=summary,
                    severity=Event.Clear)
            log.info("%s %s" % (deviceName, summary))
        self.count = 0
393 394 395
class OidData:
    """
    Plain record describing how the value of one OID is stored
    """

    def update(self, name, path, dataStorageType, rrdCreateCommand, minmax):
        """
        Store the given parameters as attributes of the same names
        """
        (self.name,
         self.path,
         self.dataStorageType,
         self.rrdCreateCommand,
         self.minmax) = (name,
                         path,
                         dataStorageType,
                         rrdCreateCommand,
                         minmax)
406
class zenperfsnmp(SnmpDaemon):
    """
    Periodically query all devices for SNMP values to archive in RRD files
    """

    # these names need to match the property values in PerformanceMonitorConf
    # RRD files older than this are handed to cleanup() by the FileCleanup
    # started in __init__
    maxRrdFileAge = 30 * (24*60*60)     # seconds
    # NOTE(review): not read by any method visible in this module;
    # startUpdateConfig reschedules itself with configCycleInterval
    perfsnmpConfigInterval = 20*60
    # delay between scanCycle runs; also the step given to RRDUtil and to
    # the rrdStats calls in reportRate
    perfsnmpCycleInterval = 5*60
    properties = SnmpDaemon.properties + ('perfsnmpCycleInterval',)
    initialServices = SnmpDaemon.initialServices + ['SnmpPerfConfig']
419 - def __init__(self, noopts=0):
420 """ 421 Create any base performance directories (if necessary), 422 load cached configuration data and clean up any old RRD files 423 (if specified by --checkAgingFiles) 424 """ 425 SnmpDaemon.__init__(self, 'zenperfsnmp', noopts) 426 self.status = None 427 self.proxies = {} 428 self.unresponsiveDevices = Set() 429 self.snmpOidsRequested = 0 430 431 self.log.info( "Initializing daemon..." ) 432 433 perfRoot = performancePath('') 434 makeDirs(perfRoot) 435 436 if self.options.cacheconfigs: 437 self.loadConfigs() 438 439 self.oldFiles = Set() 440 441 # report on files older than a day 442 if self.options.checkagingfiles: 443 self.oldCheck = FileCleanup(perfRoot, '.*\\.rrd$', 444 24 * 60 * 60, 445 frequency=60) 446 self.oldCheck.process = self.reportOldFile 447 self.oldCheck.start() 448 449 # remove files older than maxRrdFileAge 450 self.fileCleanup = FileCleanup(perfRoot, '.*\\.rrd$', 451 self.maxRrdFileAge, 452 frequency=90*60) 453 self.fileCleanup.process = self.cleanup 454 self.fileCleanup.start()
455 456
457 - def pickleName(self, id):
458 """ 459 Return the path to the pickle file for a device 460 """ 461 return performancePath('Devices/%s/%s-config.pickle' % (id, self.options.monitor))
462 463 464
465 - def loadConfigs(self):
466 """ 467 Read cached configuration values from pickle files at startup. 468 469 NB: We cache in pickles to get a full collect cycle, because 470 loading the initial config can take several minutes. 471 """ 472 self.log.info( "Gathering cached configuration information" ) 473 474 base = performancePath('Devices') 475 makeDirs(base) 476 root, ds, fs = os.walk(base).next() 477 for d in ds: 478 pickle_name= self.pickleName(d) 479 config = read( pickle_name ) 480 if config: 481 try: 482 self.log.debug( "Reading cached config info from pickle file %s" % pickle_name ) 483 data= cPickle.loads(config) 484 self.updateDeviceConfig( data ) 485 486 except Exception, ex: 487 self.log.warn( "Received %s while loading cached configs in %s -- ignoring" % (ex, pickle_name ) ) 488 try: 489 os.unlink( pickle_name ) 490 except Exception, ex: 491 self.log.warn( "Unable to delete corrupted pickle file %s because %s" % ( pickle_name, ex ) )
492 493 494
495 - def cleanup(self, fullPath):
496 """ 497 Delete an old RRD file 498 """ 499 self.log.warning("Deleting old RRD file: %s", fullPath) 500 os.unlink(fullPath) 501 self.oldFiles.discard(fullPath)
502 503
    def reportOldFile(self, fullPath):
        """
        Remember an RRD file as not recently updated.

        The file is only added to self.oldFiles, the set that
        checkOldFiles reports on; nothing is deleted here.

        @param fullPath: path of the aging RRD file
        """
        self.oldFiles.add(fullPath)
509
510 - def remote_updateDeviceList(self, devices):
511 """ 512 Gather the list of devices from zenhub, update all devices config 513 in the list of devices, and remove any devices that we know about, 514 but zenhub doesn't know about. 515 516 NB: This is callable from within zenhub. 517 """ 518 SnmpDaemon.remote_updateDeviceList(self, devices) 519 # NB: Anything not explicitly sent by zenhub should be deleted 520 survivors = [] 521 doomed = Set(self.proxies.keys()) 522 for device, lastChange in devices: 523 doomed.discard(device) 524 proxy = self.proxies.get(device) 525 if not proxy or proxy.lastChange < lastChange: 526 survivors.append(device) 527 528 log.info("Deleting %s", doomed) 529 for d in doomed: 530 del self.proxies[d] 531 532 if survivors: 533 log.info("Fetching configs: %s", survivors) 534 d = self.model().callRemote('getDevices', survivors) 535 d.addCallback(self.updateDeviceList, survivors) 536 d.addErrback(self.error)
537 538 539
    def startUpdateConfig(self, driver):
        """
        Periodically ask the Zope server for basic configuration data.

        This is a generator meant to be run through drive()/driveLater():
        each yield hands over the deferred of a remote call and the
        following driver.next() picks up that call's result.

        @param driver: object supplied by drive(); its next() returns the
                       result of the deferred yielded last
        """

        now = time.time()

        log.info("Fetching property items...")
        yield self.model().callRemote('propertyItems')
        self.setPropertyItems(driver.next())

        # Schedule the next config cycle right away, before the remaining
        # remote calls of this one are made
        driveLater(self.configCycleInterval * 60, self.startUpdateConfig)

        log.info("Getting threshold classes...")
        yield self.model().callRemote('getThresholdClasses')
        self.remote_updateThresholdClasses(driver.next())

        log.info("Checking for outdated configs...")
        # Report the change time we hold for every known device
        current = [(k, v.lastChange) for k, v in self.proxies.items()]
        yield self.model().callRemote('getDeviceUpdates', current)

        devices = driver.next()
        # When a single device was given in the options, only that device
        # is fetched
        if self.options.device:
            devices = [self.options.device]

        # The device list is truncated to 800 characters in the log
        log.info("Fetching configs for %s", repr(devices)[0:800]+'...')
        yield self.model().callRemote('getDevices', devices)
        updatedDevices = driver.next()

        log.info("Fetching default RRDCreateCommand...")
        yield self.model().callRemote('getDefaultRRDCreateCommand')
        createCommand = driver.next()

        self.rrd = RRDUtil(createCommand, self.perfsnmpCycleInterval)

        log.info( "Getting collector thresholds..." )
        yield self.model().callRemote('getCollectorThresholds')
        self.rrdStats.config(self.options.monitor, self.name, driver.next(),
                             createCommand)

        log.info("Fetching SNMP status...")
        yield self.model().callRemote('getSnmpStatus', self.options.device)
        self.updateSnmpStatus(driver.next())

        # Kick off the device load
        log.info("Initiating incremental device load")
        d = self.updateDeviceList(updatedDevices, devices)
        def report(result):
            """
            Twisted deferred errBack to check for errors
            """
            # added with addBoth, so this runs for successes as well and
            # only logs when there is a (true) result
            if result:
                log.error("Error loading devices: %s", result)
        d.addBoth(report)
        # Record how long this config pass took so far
        self.sendEvents(self.rrdStats.gauge('configTime',
                                            self.configCycleInterval * 60,
                                            time.time() - now))
597 598
599 - def updateDeviceList(self, responses, requested):
600 """ 601 Update the config for devices 602 """ 603 604 def fetchDevices(driver): 605 """ 606 An iterable to go over the list of devices 607 """ 608 deviceNames = Set() 609 length = len(responses) 610 log.debug("Fetching configs for %d devices", length) 611 for devices in chunk(responses, DEVICE_LOAD_CHUNK_SIZE): 612 log.debug("Fetching config for %s", devices) 613 yield self.model().callRemote('getDeviceConfigs', devices) 614 try: 615 for response in driver.next(): 616 self.updateDeviceConfig(response) 617 except Exception, ex: 618 log.warning("Error loading config for devices %s" % devices) 619 for d in devices: 620 deviceNames.add(d) 621 log.debug("Finished fetching configs for %d devices", length) 622 623 # stop collecting those no longer in the list 624 doomed = Set(requested) - deviceNames 625 if self.options.device: 626 self.log.debug('Gathering performance data for %s ' % 627 self.options.device) 628 doomed = Set(self.proxies.keys()) 629 doomed.discard(self.options.device) 630 for name in doomed: 631 self.log.info('Removing device %s' % name) 632 if name in self.proxies: 633 del self.proxies[name] 634 635 # Just in case, delete any pickle files that might exist 636 config = self.pickleName(name) 637 unlink(config) 638 # we could delete the RRD files, too 639 640 ips = Set() 641 for name, proxy in self.proxies.items(): 642 if proxy.snmpConnInfo.manageIp in ips: 643 log.warning("Warning: device %s has a duplicate address %s", 644 name, proxy.snmpConnInfo.manageIp) 645 ips.add(proxy.snmpConnInfo.manageIp) 646 self.log.info('Configured %d of %d devices', 647 len(deviceNames), len(self.proxies)) 648 yield defer.succeed(None)
649 return drive(fetchDevices) 650 651 652
653 - def updateAgentProxy(self, deviceName, snmpConnInfo):
654 """ 655 Create or update proxy 656 657 @parameter deviceName: device name known by zenhub 658 @type deviceName: string 659 @parameter snmpConnInfo: object information passed from zenhub 660 @type snmpConnInfo: class SnmpConnInfo from Products/ZenHub/services/PerformanceConfig.py 661 @return: connection information from the proxy 662 @rtype: SnmpConnInfo class 663 """ 664 p = self.proxies.get(deviceName, None) 665 if not p: 666 p = snmpConnInfo.createSession(protocol=self.snmpPort.protocol, 667 allowCache=True) 668 p.oidMap = {} 669 p.snmpStatus = SnmpStatus(0) 670 p.singleOidMode = False 671 p.lastChange = 0 672 673 if p.snmpConnInfo != snmpConnInfo: 674 t = snmpConnInfo.createSession(protocol=self.snmpPort.protocol, 675 allowCache=True) 676 t.oidMap = p.oidMap 677 t.snmpStatus = p.snmpStatus 678 t.singleOidMode = p.singleOidMode 679 t.lastChange = p.lastChange 680 p = t 681 682 return p
683 684 685
686 - def updateSnmpStatus(self, status):
687 """ 688 Update the SNMP failure counts from Status database 689 """ 690 countMap = dict(status) 691 for name, proxy in self.proxies.items(): 692 proxy.snmpStatus.count = countMap.get(name, 0)
693 694
695 - def remote_deleteDevice(self, doomed):
696 """ 697 Allows zenhub to delete a device from our configuration 698 """ 699 self.log.debug("Async delete device %s" % doomed) 700 if doomed in self.proxies: 701 del self.proxies[doomed]
702 703
    def remote_updateDeviceConfig(self, snmpTargets):
        """
        Allows zenhub to update our device configuration

        @param snmpTargets: config object passed straight on to
                            updateDeviceConfig
        """
        self.log.debug("Device updates from zenhub received")
        self.updateDeviceConfig(snmpTargets)
710 711 712
713 - def updateDeviceConfig(self, configs):
714 """ 715 Examine the given device configuration, and if newer update the device 716 as well as its pickle file. 717 If no SNMP proxy created for the device, create one. 718 """ 719 720 self.log.debug("Received config for %s", configs.device) 721 p = self.updateAgentProxy(configs.device, configs.connInfo) 722 723 if self.options.cacheconfigs: 724 p.lastChange = configs.lastChangeTime 725 data= cPickle.dumps(configs) 726 pickle_name= self.pickleName(configs.device) 727 self.log.debug( "Updating cached configs in pickle file %s" % pickle_name ) 728 write(pickle_name, data) 729 730 # Sanity check all OIDs and prep for eventual RRD file creation 731 oidMap, p.oidMap = p.oidMap, {} 732 for name, oid, path, dsType, createCmd, minmax in configs.oids: 733 createCmd = createCmd.strip() # RRD create options 734 oid = str(oid).strip('.') 735 # beware empty OIDs 736 if oid: 737 oid = '.' + oid 738 oid_status = oidMap.setdefault(oid, OidData()) 739 oid_status.update(name, path, dsType, createCmd, minmax) 740 p.oidMap[oid] = oid_status 741 742 self.proxies[configs.device] = p 743 self.thresholds.updateForDevice(configs.device, configs.thresholds)
744 745 746
747 - def scanCycle(self, *unused):
748 """ 749 """ 750 reactor.callLater(self.perfsnmpCycleInterval, self.scanCycle) 751 self.log.debug("Getting device ping issues") 752 evtSvc = self.services.get('EventService', None) 753 if evtSvc: 754 d = evtSvc.callRemote('getDevicePingIssues') 755 d.addBoth(self.setUnresponsiveDevices) 756 else: 757 self.setUnresponsiveDevices('No event service')
758 759
760 - def setUnresponsiveDevices(self, arg):
761 """ 762 Remember all the unresponsive devices 763 """ 764 if isinstance(arg, list): 765 deviceList = arg 766 self.log.debug('unresponsive devices: %r' % deviceList) 767 self.unresponsiveDevices = Set(firsts(deviceList)) 768 else: 769 self.log.error('Could not get unresponsive devices: %s', arg) 770 self.readDevices()
771 772
773 - def readDevices(self, unused=None):
774 """ 775 Periodically fetch the performance values from all known devices 776 """ 777 # If self.status then this is not the first cycle 778 if self.status: 779 # pending is a dictionary of devices that haven't responded 780 # and the values are the timestamps of each query 781 pending = self.status.getQueryAges() 782 # doneWaiting is the devices from pending that have exceeded 783 # the time we're willing to wait for them 784 doneWaiting = [] 785 for device, age in pending.items(): 786 beenWaiting = time.time() - age 787 if beenWaiting >= self.perfsnmpCycleInterval \ 788 * CYCLES_TO_WAIT_FOR_RESPONSE: 789 self.log.error('No response from %s after %s cycles.' 790 % (device, CYCLES_TO_WAIT_FOR_RESPONSE)) 791 doneWaiting.append(device) 792 else: 793 self.log.warning('Continuing to wait for response from' 794 ' %s after %s seconds' % (device, beenWaiting)) 795 for device in doneWaiting: 796 del pending[device] 797 798 # Report on devices that we didn't have the time to get to 799 queued = self.status.inQueue() 800 if queued: 801 self.log.error('%s devices still queued at end of cycle and did' 802 ' not get queried.' % len(queued)) 803 self.log.debug('Devices not queried: %s' % ', '.join(queued)) 804 805 # If the previous cycle did not complete then report stats 806 # (If it did complete then stats were reports by a 807 # callback on the deferred.) 
808 if not self.status._stopTime: 809 self.reportRate() 810 else: 811 pending = {} 812 813 devicesToQuery = Set(self.proxies.keys()) 814 # Don't query devices that can't be pinged 815 devicesToQuery -= self.unresponsiveDevices 816 # Don't query devices we're still waiting for responses from 817 devicesToQuery -= Set(pending.keys()) 818 self.status = Status(self) 819 d = self.status.start(devicesToQuery, pending) 820 d.addCallback(self.reportRate) 821 for unused in range(MAX_SNMP_REQUESTS): 822 if not len(self.status.inQueue()): 823 break 824 d = self.startReadDevice(self.status.popDevice()) 825 826 def printError(reason): 827 """ 828 Twisted errBack to record a traceback and log messages 829 """ 830 from StringIO import StringIO 831 out = StringIO() 832 reason.printTraceback(out) 833 self.log.error(reason)
834 835 d.addErrback(printError) 836 837
838 - def reportRate(self, *unused):
839 """ 840 Finished reading all the devices, report stats and maybe stop 841 """ 842 info = self.status.stats() 843 oidsRequested, self.snmpOidsRequested = self.snmpOidsRequested, 0 844 845 self.log.info('******** Cycle completed ********') 846 self.log.info("Sent %d OID requests", oidsRequested) 847 self.log.info('Queried %d devices' % (info['numSucceeded'] \ 848 + info['numFailed'] + info['numInProcess'])) 849 self.log.info(' %s in queue still unqueried' % info['queueSize']) 850 self.log.info(' Successes: %d Failures: %d Not reporting: %d' % 851 (info['numSucceeded'], info['numFailed'], info['numInProcess'])) 852 self.log.info('Waited on %d queries from previous cycles.' % 853 (info['numPrevSucceeded'] + info['numPrevFailed'] \ 854 + info['numPrevInProcess'])) 855 self.log.info(' Successes: %d Failures: %d Not reporting: %d' % 856 (info['numPrevSucceeded'], info['numPrevFailed'], 857 info['numPrevInProcess'])) 858 self.log.info('Cycle lasted %.2f seconds' % info['age']) 859 self.log.info('*********************************') 860 861 cycle = self.perfsnmpCycleInterval 862 self.sendEvents( 863 self.rrdStats.gauge('success', cycle, 864 info['numSucceeded'] + info['numPrevSucceeded']) + 865 self.rrdStats.gauge('failed', cycle, 866 info['numFailed'] + info['numPrevFailed']) + 867 self.rrdStats.gauge('cycleTime', cycle, info['age']) + 868 self.rrdStats.counter('dataPoints', cycle, self.rrd.dataPoints) + 869 self.rrdStats.gauge('cyclePoints', cycle, self.rrd.endCycle()) 870 ) 871 # complain about RRD files that have not been updated 872 self.checkOldFiles()
873
874 - def checkOldFiles(self):
875 """ 876 Send an event showing whether we have old files or not 877 """ 878 if not self.options.checkagingfiles: 879 return 880 self.oldFiles = Set( 881 [f for f in self.oldFiles 882 if os.path.exists(f) and self.oldCheck.test(f)] 883 ) 884 if self.oldFiles: 885 root = performancePath('') 886 filenames = [f.lstrip(root) for f in self.oldFiles] 887 message = 'RRD files not updated: ' + ' '.join(filenames) 888 self.sendEvent(dict( 889 dedupid="%s|%s" % (self.options.monitor, 'RRD files too old'), 890 severity=Critical, 891 device=self.options.monitor, 892 eventClass=Status_Perf, 893 summary=message)) 894 else: 895 self.sendEvent(dict( 896 severity=Clear, 897 device=self.options.monitor, 898 eventClass=Status_Perf, 899 summary='All RRD files have been recently updated'))
900 901
902 - def startReadDevice(self, deviceName):
903 """ 904 Initiate a request (or several) to read the performance data 905 from a device 906 """ 907 proxy = self.proxies.get(deviceName, None) 908 if proxy is None: 909 return 910 911 # ensure that the request will fit in a packet 912 # TODO: sanity check this number 913 n = int(proxy.snmpConnInfo.zMaxOIDPerRequest) 914 if proxy.singleOidMode: 915 n = 1 916 917 def getLater(oids): 918 """ 919 Return the result of proxy.get( oids, timeoute, tries ) 920 """ 921 return checkException(self.log, 922 proxy.get, 923 oids, 924 proxy.snmpConnInfo.zSnmpTimeout, 925 proxy.snmpConnInfo.zSnmpTries)
926 927 # Chain a series of deferred actions serially 928 proxy.open() 929 chain = Chain(getLater, iter(chunk(sorted(proxy.oidMap.keys()), n))) 930 d = chain.run() 931 932 def closer(arg, proxy): 933 """ 934 Close the proxy 935 """ 936 try: 937 proxy.close() 938 except Exception, ex: 939 self.log.exception(ex) 940 raise ex 941 942 return arg 943 944 d.addCallback(closer, proxy) 945 d.addCallback(self.storeValues, deviceName) 946 947 # Track the total number of OIDs requested this cycle 948 self.snmpOidsRequested += len(proxy.oidMap) 949 950 return d 951 952
953 - def badOid(self, deviceName, oid):
954 """ 955 Report any bad OIDs (eg to a file log and Zenoss event) and then remove 956 the OID so we dont generate any further errors. 957 """ 958 proxy = self.proxies.get(deviceName, None) 959 if proxy is None: 960 return 961 962 name = proxy.oidMap[oid].name 963 summary = 'Error reading value for "%s" on %s (oid %s is bad)' % ( 964 name, deviceName, oid) 965 self.sendEvent(proxy.snmpStatus.snmpStatusEvent, 966 eventClass=Perf_Snmp, 967 device=deviceName, 968 summary=summary, 969 component=name, 970 severity=Event.Debug) 971 self.log.warn(summary) 972 973 del proxy.oidMap[oid]
974 975
    def storeValues(self, updates, deviceName):
        """
        Decode responses from devices and store the elements in RRD files

        @param updates: sequence of (success, update) pairs, one per
                        request; a successful update is a mapping of OID
                        to value
        @param deviceName: device the responses belong to
        """

        proxy = self.proxies.get(deviceName, None)
        if proxy is None:
            # the device has no proxy any more; count it as done
            self.status.record(deviceName, True)
            return

        # Look for problems
        for success, update in updates:
            # empty update is probably a bad OID in the request somewhere
            if success and not update and not proxy.singleOidMode:
                # Query the device again, one OID per request; that read
                # reports back through storeValues itself
                proxy.singleOidMode = True
                self.log.warn('Error collecting data on %s -- retrying in single-OID mode',
                              deviceName)
                self.startReadDevice(deviceName)
                return

            if not success:
                if isinstance(update, failure.Failure) and \
                    isinstance(update.value, error.TimeoutError):
                    self.log.debug("Device %s timed out" % deviceName)
                else:
                    self.log.warning('Failed to collect on %s (%s: %s)',
                                     deviceName,
                                     update.__class__,
                                     update)

        # success flags are booleans, so their sum is the number of
        # successful requests
        successCount = sum(firsts(updates))
        oids = []
        for success, update in updates:
            if success:
                for oid, value in update.items():
                    # should always get something back
                    if value == '' or value is None:
                        self.badOid(deviceName, oid)
                    else:
                        self.storeRRD(deviceName, oid, value)
                    oids.append(oid)

        if successCount == len(updates) and proxy.singleOidMode:
            # remove any oids that didn't report
            for doomed in Set(proxy.oidMap.keys()) - Set(oids):
                self.badOid(deviceName, doomed)

        # This device is done: start the next queued device, if any
        if self.status.inQueue():
            self.startReadDevice(self.status.popDevice())

        if successCount and len(updates) > 0:
            successPercent = successCount * 100 / len(updates)
            if successPercent not in (0, 100):
                self.log.debug("Successful request ratio for %s is %2d%%",
                               deviceName,
                               successPercent)
        # A device with no requests at all counts as a success; otherwise
        # one successful request is enough
        success = True
        if updates:
            success = successCount > 0
        self.status.record(deviceName, success)
        proxy.snmpStatus.updateStatus(deviceName, success, self.sendEvent)
1037 1038
1039 - def storeRRD(self, device, oid, value):
1040 """ 1041 Store a value into an RRD file 1042 1043 @param device: remote device name 1044 @type device: string 1045 @param oid: SNMP OID used as our performance metric 1046 @type oid: string 1047 @param value: data to be stored 1048 @type value: number 1049 """ 1050 oidData = self.proxies[device].oidMap.get(oid, None) 1051 if not oidData: return 1052 1053 raw_value = value 1054 min, max = oidData.minmax 1055 try: 1056 value = self.rrd.save(oidData.path, 1057 value, 1058 oidData.dataStorageType, 1059 oidData.rrdCreateCommand, 1060 min=min, max=max) 1061 except Exception, ex: 1062 summary= "Unable to save data for OID %s in RRD %s" % \ 1063 ( oid, oidData.path ) 1064 self.log.critical( summary ) 1065 1066 message= """Data was value= %s, type=%s, min=%s, max=%s 1067 RRD create command: %s""" % \ 1068 ( value, oidData.dataStorageType, min, max, \ 1069 oidData.rrdCreateCommand ) 1070 self.log.critical( message ) 1071 self.log.exception( ex ) 1072 1073 import traceback 1074 trace_info= traceback.format_exc() 1075 1076 evid= self.sendEvent(dict( 1077 dedupid="%s|%s" % (self.options.monitor, 'RRD write failure'), 1078 severity=Critical, 1079 device=self.options.monitor, 1080 eventClass=Status_Perf, 1081 component="RRD", 1082 oid=oid, 1083 path=oidData.path, 1084 message=message, 1085 traceback=trace_info, 1086 summary=summary)) 1087 1088 # Skip thresholds 1089 return 1090 1091 if self.options.showdeviceresults: 1092 self.log.info("%s %s results: raw=%s RRD-converted=%s" 1093 " type=%s, min=%s, max=%s" % ( 1094 device, oid, raw_value, value, oidData.dataStorageType, min, max)) 1095 1096 for ev in self.thresholds.check(oidData.path, time.time(), value): 1097 eventKey = oidData.path.rsplit('/')[-1] 1098 if ev.has_key('eventKey'): 1099 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey']) 1100 else: 1101 ev['eventKey'] = eventKey 1102 self.sendThresholdEvent(**ev)
1103 1104 1105
1106 - def connected(self):
1107 """ 1108 Run forever, fetching and storing 1109 """ 1110 self.log.debug( "Connected to zenhub" ) 1111 d = drive(self.startUpdateConfig) 1112 d.addCallbacks(self.scanCycle, self.errorStop)
1113 1114
1115 - def buildOptions(self):
1116 """ 1117 Build a list of command-line options 1118 """ 1119 SnmpDaemon.buildOptions(self) 1120 self.parser.add_option('--checkAgingFiles', 1121 dest='checkagingfiles', 1122 action="store_true", 1123 default=False, 1124 help="Send events when RRD files are not being updated regularly") 1125 1126 self.parser.add_option('--cacheconfigs', 1127 dest='cacheconfigs', 1128 action="store_true", 1129 default=False, 1130 help="To improve startup times, cache configuration received from zenhub") 1131 1132 self.parser.add_option('--showdeviceresults', 1133 dest='showdeviceresults', 1134 action="store_true", 1135 default=False, 1136 help="Show the raw RRD values. For debugging purposes only.")
if __name__ == '__main__':
    # The following bizarre include is required for PB to be happy
    # NOTE(review): presumably so the class is known under its package
    # path rather than as __main__.zenperfsnmp -- confirm
    from Products.ZenRRD.zenperfsnmp import zenperfsnmp

    # Build the daemon (this parses the command line) and run it
    zpf = zenperfsnmp()
    zpf.run()