Package Products :: Package ZenRRD :: Module zenperfsnmp
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenRRD.zenperfsnmp

   1  #! /usr/bin/env python  
   2  ########################################################################### 
   3  # 
   4  # This program is part of Zenoss Core, an open source monitoring platform. 
   5  # Copyright (C) 2007, Zenoss Inc. 
   6  # 
   7  # This program is free software; you can redistribute it and/or modify it 
   8  # under the terms of the GNU General Public License version 2 as published by 
   9  # the Free Software Foundation. 
  10  # 
  11  # For complete information please visit: http://www.zenoss.com/oss/ 
  12  # 
  13  ########################################################################### 
  14   
  15  __doc__="""zenperfsnmp 
  16   
  17  Gets SNMP performance data and stores it in RRD files. 
  18   
  19  """ 
  20   
  21  import os 
  22  import time 
  23  import logging 
  24  log = logging.getLogger("zen.zenperfsnmp") 
  25   
  26  import copy 
  27  from sets import Set 
  28  import cPickle 
  29   
  30  from twisted.internet import reactor, defer, error 
  31  from twisted.python import failure 
  32   
  33  import Globals 
  34  from Products.ZenUtils.Utils import unused 
  35  from Products.ZenUtils.Chain import Chain 
  36  from Products.ZenUtils.Driver import drive, driveLater 
  37  from Products.ZenModel.PerformanceConf import performancePath 
  38  from Products.ZenEvents import Event 
  39  from Products.ZenEvents.ZenEventClasses \ 
  40       import Perf_Snmp, Status_Snmp, Status_Perf 
  41  from Products.ZenEvents.ZenEventClasses import Critical, Clear 
  42   
  43  from Products.ZenRRD.RRDUtil import RRDUtil 
  44  from SnmpDaemon import SnmpDaemon 
  45   
  46  from FileCleanup import FileCleanup 
  47   
  48  # PerformanceConfig import needed to get pb to work 
  49  from Products.ZenHub.services.PerformanceConfig import PerformanceConfig 
  50  unused(PerformanceConfig) 
  51   
  52  MAX_OIDS_PER_REQUEST = 40 
  53  MAX_SNMP_REQUESTS = 20 
  54  DEVICE_LOAD_CHUNK_SIZE = 20 
  55  CYCLES_TO_WAIT_FOR_RESPONSE = 2 
  56   
57 -def makeDirs(dir):
58 """ 59 Wrapper around makedirs that sanity checks before running 60 """ 61 if os.path.exists(dir): 62 return 63 64 try: 65 os.makedirs(dir, 0750) 66 except Exception, ex: 67 log.critical( "Unable to create directories for %s because %s" % ( dir, ex ) )
68 69
70 -def read(fname):
71 """ 72 Wrapper around the standard function to open a file and read its contents 73 """ 74 if os.path.exists(fname): 75 fp = file(fname, 'rb') 76 try: 77 return fp.read() 78 finally: 79 fp.close() 80 return ''
81 82
83 -def write(fname, data):
84 """ 85 Wrapper around the standard function to open a file and write data 86 """ 87 makeDirs(os.path.dirname(fname)) 88 89 try: 90 fp = open(fname, 'wb') 91 try: 92 fp.write(data) 93 finally: 94 fp.close() 95 96 except Exception, ex: 97 log.critical( "Unable to write data to %s because %s" % ( fname, ex ) )
98 99 106
107 -def chunk(lst, n):
108 """ 109 Break lst into n-sized chunks 110 """ 111 return [lst[i:i+n] for i in range(0, len(lst), n)]
112 113 try: 114 sorted = sorted # added in python 2.4 115 except NameError:
116 - def sorted(lst, *args, **kw):
117 """ 118 Keep things sane in a pre-python 2.4 environment 119 """ 120 lst.sort(*args, **kw) 121 return lst
122
123 -def firsts(lst):
124 """ 125 The first element of every item in a sequence 126 """ 127 return [item[0] for item in lst]
128
129 -def checkException(alog, function, *args, **kw):
130 """ 131 Execute the function with arguments and keywords. 132 If there is an exception, log it using the given 133 logging function 'alog'. 134 """ 135 try: 136 return function(*args, **kw) 137 except Exception, ex: 138 alog.exception(ex) 139 raise ex
140 141 142 143 from twisted.spread import pb
144 -class SnmpConfig(pb.Copyable, pb.RemoteCopy):
145 """ 146 A class to transfer the SNMP collection data to zenperfsnmp 147 """ 148 149 lastChangeTime = 0. 150 device = '' 151 connInfo = None 152 thresholds = [] 153 oids = []
154 155 pb.setUnjellyableForClass(SnmpConfig, SnmpConfig) 156 157
158 -class Status:
159 """ 160 Keep track of the status of many parallel requests 161 """ 162
163 - def __init__(self, daemon):
164 """ 165 Initializer 166 """ 167 self.daemon = daemon 168 self.reset()
169
170 - def reset(self):
171 """ 172 Reset instance variables to intial values 173 """ 174 175 # Number of requests issued this cycle that succeeded 176 self._numSucceeded = 0 177 178 # Number of requests issued prior to this cycle that responded 179 # successfully this cycle 180 self._numPrevSucceeded = 0 181 182 # Number of requests issued this cycle that failed 183 self._numFailed = 0 184 185 # Number of requests issues prior to this cycle that responded 186 # unsuccessfully this cycle 187 self._numPrevFailed = 0 188 189 # timestamp when this cycle was started 190 self._startTime = 0 191 192 # timestamp when this cycle completed 193 self._stopTime = 0 194 195 # This deferred gets triggered when this cycle completes. 196 self._deferred = defer.Deferred() 197 198 # Set of device names that should be queried this cycle 199 self._devicesToQueryThisCycle = Set() 200 201 # Names of devices with outstanding queries when this cycle was 202 # started. Values are the timestamp of when the query was made. 203 self._prevQueriesAndAges = {} 204 205 # Set of device names still to be queried this cycle. Initially 206 # this is the same as _devicesToQueryThisCycle but devices are 207 # popped from it as they are queried 208 self._queue = Set() 209 210 # Set of devices names that have reported back this cycle. The query 211 # may have been from this cycle or a previous cycle 212 self._reported = Set()
213 214
215 - def start(self, devicesToQuery, prevQueriesAndAges):
216 """ 217 Record our start time, and return a deferred for our devices 218 219 @type devicesToQuery: iterable 220 @param devicesToQuery: names of devices to poll 221 @type prevQueriesAndAges: dict 222 @param prevQueriesAndAges: devices with outstanding quests 223 @return: deferred 224 """ 225 self.reset() 226 self._startTime = time.time() 227 228 # If there is an outstanding request for a device we don't want to 229 # issue another 230 self._devicesToQueryThisCycle = \ 231 Set(devicesToQuery) - Set(prevQueriesAndAges.keys()) 232 self._prevQueriesAndAges = prevQueriesAndAges 233 self._queue = copy.copy(self._devicesToQueryThisCycle) 234 self._checkFinished() # count could be zero 235 return self._deferred
236 237
238 - def record(self, name, success):
239 """ 240 Record success or failure 241 242 @type name: string 243 @param name: name of device reporting results 244 @type success: boolean 245 @param success: True if query succeeded, False otherwise. 246 """ 247 if name in self._reported: 248 log.error("Device %s is reporting more than once", name) 249 return 250 self._reported.add(name) 251 if name in self._devicesToQueryThisCycle: 252 if success: 253 self._numSucceeded += 1 254 else: 255 self._numFailed += 1 256 self._checkFinished() 257 elif name in self._prevQueriesAndAges: 258 if success: 259 self._numPrevSucceeded += 1 260 else: 261 self._numPrevFailed += 1 262 else: 263 log.debug('Unrecognized device reporting: %s' % name)
264
265 - def _checkFinished(self):
266 """ 267 Determine the stopping point and log our current stats 268 """ 269 if self.finished(): 270 self._stopTime = time.time() 271 if not self._deferred.called: 272 self._deferred.callback(self) 273 self.daemon.heartbeat() 274 info = self.stats() 275 log.info( 276 'success:%d ' % info['numSucceeded'] + 277 'fail:%d ' % info['numFailed'] + 278 'pending:%d ' % info['numInProcess'] + 279 'todo:%d ' % info['queueSize'])
280 281
282 - def finished(self):
283 """ 284 Determine if we have finished, disregarding devices that were queried 285 in a previous cycle and still haven't reported back. 286 """ 287 return len(self.inQueue()) == 0 and len(self.inProcess()) == 0
288
289 - def stats(self):
290 """ 291 Return a dictionary with stats for this cycle: 292 numSucceeded - queries made this cycle and reported back success 293 numPrevSucceeded - queries from a prev cycle reported back success 294 numFailed - queries made this cycle reported back failure 295 numPrevFailed - queries made prev cycle reported back failure 296 startTime - timestamp when this cycle started 297 stopTime - timestamp when this cycle stopped 298 age - time cycle took to run or current age (if still running) 299 queueSize - num of devices not queried yet 300 numInProcess - num queried this cycle not reported back yet 301 numPrevInProcess - num queried prev cycle still not reported back 302 numReported - number reported back from this or previous cycles 303 """ 304 return dict( 305 numSucceeded = self._numSucceeded, 306 numPrevSucceeded = self._numPrevSucceeded, 307 numFailed = self._numFailed, 308 numPrevFailed = self._numPrevFailed, 309 startTime = self._startTime, 310 stopTime = self._stopTime, 311 age = self._stopTime and (self._stopTime - self._startTime) \ 312 or (time.time() - self._startTime), 313 queueSize = len(self._queue), 314 numInProcess = len(self.inProcess()), 315 numPrevInProcess = len(self.prevInProcess()), 316 numReported = len(self._reported) 317 )
318
319 - def inProcess(self):
320 """ 321 Return the name of the devices that have been queried this cycle 322 but from whom no response has been received. 323 """ 324 return self._devicesToQueryThisCycle - self._reported - self._queue
325
326 - def prevInProcess(self):
327 """ 328 Return the names of the devices that were queried prior to this cycle 329 and have not yet reported. 330 """ 331 return Set(self._prevQueriesAndAges.keys()) - self._reported
332
333 - def inQueue(self):
334 """ 335 Return the names of the devices that have yet to be queried. 336 """ 337 return self._queue
338
339 - def popDevice(self):
340 """ 341 Pop a device to be queried from the queue 342 """ 343 return self._queue.pop()
344
345 - def getQueryAges(self):
346 """ 347 Return a dictionary with the device and age of each query from this 348 or previous cycles that has not yet responeded. 349 """ 350 waiting = dict([(d,a) for (d, a) in self._prevQueriesAndAges.items() 351 if d not in self._reported]) 352 waiting.update(dict([(d, self._startTime) 353 for d in self.inProcess()])) 354 return waiting
355 356
357 -class SnmpStatus:
358 """ 359 Track and report SNMP status failures 360 """ 361 362 snmpStatusEvent = {'eventClass': Status_Snmp, 363 'component': 'snmp', 364 'eventGroup': 'SnmpTest'} 365 366
367 - def __init__(self, snmpState):
368 """ 369 Initializer 370 """ 371 self.count = snmpState
372 373
374 - def updateStatus(self, deviceName, success, eventCb):
375 """ 376 Send up/down events based on SNMP results 377 """ 378 if success: 379 if self.count > 0: 380 summary='SNMP agent up' 381 eventCb(self.snmpStatusEvent, 382 device=deviceName, summary=summary, 383 severity=Event.Clear) 384 log.info("%s %s" % (deviceName, summary)) 385 self.count = 0 386 else: 387 summary='SNMP agent down' 388 eventCb(self.snmpStatusEvent, 389 device=deviceName, summary=summary, 390 severity=Event.Error) 391 log.warn("%s %s" % (deviceName, summary)) 392 self.count += 1
393 394 395
396 -class OidData:
397 - def update(self, name, path, dataStorageType, rrdCreateCommand, minmax):
398 """ 399 Container for these paramaters 400 """ 401 self.name = name 402 self.path = path 403 self.dataStorageType = dataStorageType 404 self.rrdCreateCommand = rrdCreateCommand 405 self.minmax = minmax
406
407 -class zenperfsnmp(SnmpDaemon):
408 """ 409 Periodically query all devices for SNMP values to archive in RRD files 410 """ 411 412 # these names need to match the property values in PerformanceMonitorConf 413 maxRrdFileAge = 30 * (24*60*60) # seconds 414 perfsnmpConfigInterval = 20*60 415 perfsnmpCycleInterval = 5*60 416 properties = SnmpDaemon.properties + ('perfsnmpCycleInterval',) 417 initialServices = SnmpDaemon.initialServices + ['SnmpPerfConfig'] 418
419 - def __init__(self, noopts=0):
420 """ 421 Create any base performance directories (if necessary), 422 load cached configuration data and clean up any old RRD files 423 (if specified by --checkAgingFiles) 424 """ 425 SnmpDaemon.__init__(self, 'zenperfsnmp', noopts) 426 self.status = None 427 self.proxies = {} 428 self.unresponsiveDevices = Set() 429 self.snmpOidsRequested = 0 430 431 self.log.info( "Initializing daemon..." ) 432 433 perfRoot = performancePath('') 434 makeDirs(perfRoot) 435 436 if self.options.cacheconfigs: 437 self.loadConfigs() 438 439 self.oldFiles = Set() 440 441 # report on files older than a day 442 if self.options.checkagingfiles: 443 self.oldCheck = FileCleanup(perfRoot, '.*\\.rrd$', 444 24 * 60 * 60, 445 frequency=60) 446 self.oldCheck.process = self.reportOldFile 447 self.oldCheck.start() 448 449 # remove files older than maxRrdFileAge 450 self.fileCleanup = FileCleanup(perfRoot, '.*\\.rrd$', 451 self.maxRrdFileAge, 452 frequency=90*60) 453 self.fileCleanup.process = self.cleanup 454 self.fileCleanup.start()
455 456
457 - def pickleName(self, id):
458 """ 459 Return the path to the pickle file for a device 460 """ 461 return performancePath('Devices/%s/%s-config.pickle' % (id, self.options.monitor))
462 463 464
465 - def loadConfigs(self):
466 """ 467 Read cached configuration values from pickle files at startup. 468 469 NB: We cache in pickles to get a full collect cycle, because 470 loading the initial config can take several minutes. 471 """ 472 self.log.info( "Gathering cached configuration information" ) 473 474 base = performancePath('Devices') 475 makeDirs(base) 476 root, ds, fs = os.walk(base).next() 477 for d in ds: 478 pickle_name= self.pickleName(d) 479 config = read( pickle_name ) 480 if config: 481 try: 482 self.log.debug( "Reading cached config info from pickle file %s" % pickle_name ) 483 data= cPickle.loads(config) 484 self.updateDeviceConfig( data ) 485 486 except Exception, ex: 487 self.log.warn( "Received %s while loading cached configs in %s -- ignoring" % (ex, pickle_name ) ) 488 try: 489 os.unlink( pickle_name ) 490 except Exception, ex: 491 self.log.warn( "Unable to delete corrupted pickle file %s because %s" % ( pickle_name, ex ) )
492 493 494
495 - def cleanup(self, fullPath):
496 """ 497 Delete an old RRD file 498 """ 499 self.log.warning("Deleting old RRD file: %s", fullPath) 500 os.unlink(fullPath) 501 self.oldFiles.discard(fullPath)
502 503
504 - def reportOldFile(self, fullPath):
505 """ 506 Add an RRD file to the list of files to be removed 507 """ 508 self.oldFiles.add(fullPath)
509
510 - def remote_updateDeviceList(self, devices):
511 """ 512 Gather the list of devices from zenhub, update all devices config 513 in the list of devices, and remove any devices that we know about, 514 but zenhub doesn't know about. 515 516 NB: This is callable from within zenhub. 517 """ 518 SnmpDaemon.remote_updateDeviceList(self, devices) 519 # NB: Anything not explicitly sent by zenhub should be deleted 520 survivors = [] 521 doomed = Set(self.proxies.keys()) 522 for device, lastChange in devices: 523 doomed.discard(device) 524 proxy = self.proxies.get(device) 525 if not proxy or proxy.lastChange < lastChange: 526 survivors.append(device) 527 528 log.info("Deleting %s", doomed) 529 for d in doomed: 530 del self.proxies[d] 531 532 if survivors: 533 log.info("Fetching configs: %s", survivors) 534 d = self.model().callRemote('getDevices', survivors) 535 d.addCallback(self.updateDeviceList, survivors) 536 d.addErrback(self.error)
537 538 539
540 - def startUpdateConfig(self, driver):
541 """ 542 Periodically ask the Zope server for basic configuration data. 543 """ 544 545 now = time.time() 546 547 log.info("Fetching property items...") 548 yield self.model().callRemote('propertyItems') 549 self.setPropertyItems(driver.next()) 550 551 driveLater(self.configCycleInterval * 60, self.startUpdateConfig) 552 553 log.info("Getting threshold classes...") 554 yield self.model().callRemote('getThresholdClasses') 555 self.remote_updateThresholdClasses(driver.next()) 556 557 devices = [] 558 if self.options.device: 559 devices = [self.options.device] 560 else: 561 log.info("Checking for outdated configs...") 562 current = [(k, v.lastChange) for k, v in self.proxies.items()] 563 yield self.model().callRemote('getDeviceUpdates', current) 564 devices = driver.next() 565 566 log.info("Fetching configs for %s", repr(devices)[0:800]+'...') 567 yield self.model().callRemote('getDevices', devices) 568 updatedDevices = driver.next() 569 570 log.info("Fetching default RRDCreateCommand...") 571 yield self.model().callRemote('getDefaultRRDCreateCommand') 572 createCommand = driver.next() 573 574 self.rrd = RRDUtil(createCommand, self.perfsnmpCycleInterval) 575 576 log.info( "Getting collector thresholds..." ) 577 yield self.model().callRemote('getCollectorThresholds') 578 self.rrdStats.config(self.options.monitor, self.name, driver.next(), 579 createCommand) 580 581 log.info("Fetching SNMP status...") 582 yield self.model().callRemote('getSnmpStatus', self.options.device) 583 self.updateSnmpStatus(driver.next()) 584 585 # Kick off the device load 586 log.info("Initiating incremental device load") 587 if self.options.cycle: 588 d = self.updateDeviceList(updatedDevices, devices) 589 def report(result): 590 """ 591 Twisted deferred errBack to check for errors 592 """ 593 if result: 594 log.error("Error loading devices: %s", result)
595 d.addBoth(report) 596 else: 597 #if not in cycle mode wait for the devices to load before collecting 598 yield self.updateDeviceList(updatedDevices, devices) 599 driver.next() 600 601 self.sendEvents(self.rrdStats.gauge('configTime', 602 self.configCycleInterval * 60, 603 time.time() - now))
604 605
606 - def updateDeviceList(self, responses, requested):
607 """ 608 Update the config for devices 609 """ 610 611 def fetchDevices(driver): 612 """ 613 An iterable to go over the list of devices 614 """ 615 deviceNames = Set() 616 length = len(responses) 617 log.debug("Fetching configs for %d devices", length) 618 for devices in chunk(responses, DEVICE_LOAD_CHUNK_SIZE): 619 log.debug("Fetching config for %s", devices) 620 yield self.model().callRemote('getDeviceConfigs', devices) 621 try: 622 for response in driver.next(): 623 self.updateDeviceConfig(response) 624 except Exception, ex: 625 log.warning("Error loading config for devices %s" % devices) 626 for d in devices: 627 deviceNames.add(d) 628 log.debug("Finished fetching configs for %d devices", length) 629 630 # stop collecting those no longer in the list 631 doomed = Set(requested) - deviceNames 632 if self.options.device: 633 self.log.debug('Gathering performance data for %s ' % 634 self.options.device) 635 doomed = Set(self.proxies.keys()) 636 doomed.discard(self.options.device) 637 for name in doomed: 638 self.log.info('Removing device %s' % name) 639 if name in self.proxies: 640 del self.proxies[name] 641 642 # Just in case, delete any pickle files that might exist 643 config = self.pickleName(name) 644 unlink(config) 645 # we could delete the RRD files, too 646 647 ips = Set() 648 for name, proxy in self.proxies.items(): 649 if proxy.snmpConnInfo.manageIp in ips: 650 log.warning("Warning: device %s has a duplicate address %s", 651 name, proxy.snmpConnInfo.manageIp) 652 ips.add(proxy.snmpConnInfo.manageIp) 653 self.log.info('Configured %d of %d devices', 654 len(deviceNames), len(self.proxies)) 655 yield defer.succeed(None)
656 return drive(fetchDevices) 657 658 659
660 - def updateAgentProxy(self, deviceName, snmpConnInfo):
661 """ 662 Create or update proxy 663 664 @parameter deviceName: device name known by zenhub 665 @type deviceName: string 666 @parameter snmpConnInfo: object information passed from zenhub 667 @type snmpConnInfo: class SnmpConnInfo from Products/ZenHub/services/PerformanceConfig.py 668 @return: connection information from the proxy 669 @rtype: SnmpConnInfo class 670 """ 671 p = self.proxies.get(deviceName, None) 672 if not p: 673 p = snmpConnInfo.createSession(protocol=self.snmpPort.protocol, 674 allowCache=True) 675 p.oidMap = {} 676 p.snmpStatus = SnmpStatus(0) 677 p.singleOidMode = False 678 p.lastChange = 0 679 680 if p.snmpConnInfo != snmpConnInfo: 681 t = snmpConnInfo.createSession(protocol=self.snmpPort.protocol, 682 allowCache=True) 683 t.oidMap = p.oidMap 684 t.snmpStatus = p.snmpStatus 685 t.singleOidMode = p.singleOidMode 686 t.lastChange = p.lastChange 687 p = t 688 689 return p
690 691 692
693 - def updateSnmpStatus(self, status):
694 """ 695 Update the SNMP failure counts from Status database 696 """ 697 countMap = dict(status) 698 for name, proxy in self.proxies.items(): 699 proxy.snmpStatus.count = countMap.get(name, 0)
700 701
702 - def remote_deleteDevice(self, doomed):
703 """ 704 Allows zenhub to delete a device from our configuration 705 """ 706 if self.options.device and doomed != self.options.device: 707 return 708 709 self.log.debug("Async delete device %s" % doomed) 710 if doomed in self.proxies: 711 del self.proxies[doomed]
712 713
714 - def remote_updateDeviceConfig(self, snmpTargets):
715 """ 716 Allows zenhub to update our device configuration 717 """ 718 if self.options.device and snmpTargets.device != self.options.device: 719 return 720 721 self.log.debug("Device updates from zenhub received") 722 self.updateDeviceConfig(snmpTargets)
723 724 725
726 - def updateDeviceConfig(self, configs):
727 """ 728 Examine the given device configuration, and if newer update the device 729 as well as its pickle file. 730 If no SNMP proxy created for the device, create one. 731 """ 732 self.log.debug("Received config for %s", configs.device) 733 p = self.updateAgentProxy(configs.device, configs.connInfo) 734 735 if self.options.cacheconfigs: 736 p.lastChange = configs.lastChangeTime 737 data= cPickle.dumps(configs) 738 pickle_name= self.pickleName(configs.device) 739 self.log.debug( "Updating cached configs in pickle file %s" % pickle_name ) 740 write(pickle_name, data) 741 742 # Sanity check all OIDs and prep for eventual RRD file creation 743 oidMap, p.oidMap = p.oidMap, {} 744 for name, oid, path, dsType, createCmd, minmax in configs.oids: 745 createCmd = createCmd.strip() # RRD create options 746 oid = str(oid).strip('.') 747 # beware empty OIDs 748 if oid: 749 oid = '.' + oid 750 oid_status = oidMap.setdefault(oid, OidData()) 751 oid_status.update(name, path, dsType, createCmd, minmax) 752 p.oidMap[oid] = oid_status 753 754 self.proxies[configs.device] = p 755 self.thresholds.updateForDevice(configs.device, configs.thresholds)
756 757 758
759 - def scanCycle(self, *unused):
760 """ 761 """ 762 reactor.callLater(self.perfsnmpCycleInterval, self.scanCycle) 763 self.log.debug("Getting device ping issues") 764 evtSvc = self.services.get('EventService', None) 765 if evtSvc: 766 d = evtSvc.callRemote('getDevicePingIssues') 767 d.addBoth(self.setUnresponsiveDevices) 768 else: 769 self.setUnresponsiveDevices('No event service')
770 771
772 - def setUnresponsiveDevices(self, arg):
773 """ 774 Remember all the unresponsive devices 775 """ 776 if isinstance(arg, list): 777 deviceList = arg 778 self.log.debug('unresponsive devices: %r' % deviceList) 779 self.unresponsiveDevices = Set(firsts(deviceList)) 780 else: 781 self.log.error('Could not get unresponsive devices: %s', arg) 782 self.readDevices()
783 784
785 - def readDevices(self, unused=None):
786 """ 787 Periodically fetch the performance values from all known devices 788 """ 789 # If self.status then this is not the first cycle 790 if self.status: 791 # pending is a dictionary of devices that haven't responded 792 # and the values are the timestamps of each query 793 pending = self.status.getQueryAges() 794 # doneWaiting is the devices from pending that have exceeded 795 # the time we're willing to wait for them 796 doneWaiting = [] 797 for device, age in pending.items(): 798 beenWaiting = time.time() - age 799 if beenWaiting >= self.perfsnmpCycleInterval \ 800 * CYCLES_TO_WAIT_FOR_RESPONSE: 801 self.log.error('No response from %s after %s cycles.' 802 % (device, CYCLES_TO_WAIT_FOR_RESPONSE)) 803 doneWaiting.append(device) 804 else: 805 self.log.warning('Continuing to wait for response from' 806 ' %s after %s seconds' % (device, beenWaiting)) 807 for device in doneWaiting: 808 del pending[device] 809 810 # Report on devices that we didn't have the time to get to 811 queued = self.status.inQueue() 812 if queued: 813 self.log.error('%s devices still queued at end of cycle and did' 814 ' not get queried.' % len(queued)) 815 self.log.debug('Devices not queried: %s' % ', '.join(queued)) 816 817 # If the previous cycle did not complete then report stats 818 # (If it did complete then stats were reports by a 819 # callback on the deferred.) 820 if not self.status._stopTime: 821 self.reportRate() 822 else: 823 pending = {} 824 825 devicesToQuery = Set(self.proxies.keys()) 826 # Don't query devices that can't be pinged 827 devicesToQuery -= self.unresponsiveDevices 828 # Don't query devices we're still waiting for responses from 829 devicesToQuery -= Set(pending.keys()) 830 self.status = Status(self) 831 d = self.status.start(devicesToQuery, pending) 832 d.addCallback(self.reportRate) 833 for unused in range(MAX_SNMP_REQUESTS): 834 if not len(self.status.inQueue()): 835 break 836 d = self.startReadDevice(self.status.popDevice()) 837 838 def printError(reason): 839 """ 840 Twisted errBack to record a traceback and log messages 841 """ 842 from StringIO import StringIO 843 out = StringIO() 844 reason.printTraceback(out) 845 self.log.error(reason)
846 847 d.addErrback(printError) 848 849
850 - def reportRate(self, *unused):
851 """ 852 Finished reading all the devices, report stats and maybe stop 853 """ 854 info = self.status.stats() 855 oidsRequested, self.snmpOidsRequested = self.snmpOidsRequested, 0 856 857 self.log.info('******** Cycle completed ********') 858 self.log.info("Sent %d OID requests", oidsRequested) 859 self.log.info('Queried %d devices' % (info['numSucceeded'] \ 860 + info['numFailed'] + info['numInProcess'])) 861 self.log.info(' %s in queue still unqueried' % info['queueSize']) 862 self.log.info(' Successes: %d Failures: %d Not reporting: %d' % 863 (info['numSucceeded'], info['numFailed'], info['numInProcess'])) 864 self.log.info('Waited on %d queries from previous cycles.' % 865 (info['numPrevSucceeded'] + info['numPrevFailed'] \ 866 + info['numPrevInProcess'])) 867 self.log.info(' Successes: %d Failures: %d Not reporting: %d' % 868 (info['numPrevSucceeded'], info['numPrevFailed'], 869 info['numPrevInProcess'])) 870 self.log.info('Cycle lasted %.2f seconds' % info['age']) 871 self.log.info('*********************************') 872 873 cycle = self.perfsnmpCycleInterval 874 self.sendEvents( 875 self.rrdStats.gauge('success', cycle, 876 info['numSucceeded'] + info['numPrevSucceeded']) + 877 self.rrdStats.gauge('failed', cycle, 878 info['numFailed'] + info['numPrevFailed']) + 879 self.rrdStats.gauge('cycleTime', cycle, info['age']) + 880 self.rrdStats.counter('dataPoints', cycle, self.rrd.dataPoints) + 881 self.rrdStats.gauge('cyclePoints', cycle, self.rrd.endCycle()) 882 ) 883 # complain about RRD files that have not been updated 884 self.checkOldFiles()
885
886 - def checkOldFiles(self):
887 """ 888 Send an event showing whether we have old files or not 889 """ 890 if not self.options.checkagingfiles: 891 return 892 self.oldFiles = Set( 893 [f for f in self.oldFiles 894 if os.path.exists(f) and self.oldCheck.test(f)] 895 ) 896 if self.oldFiles: 897 root = performancePath('') 898 filenames = [f.lstrip(root) for f in self.oldFiles] 899 message = 'RRD files not updated: ' + ' '.join(filenames) 900 self.sendEvent(dict( 901 dedupid="%s|%s" % (self.options.monitor, 'RRD files too old'), 902 severity=Critical, 903 device=self.options.monitor, 904 eventClass=Status_Perf, 905 summary=message)) 906 else: 907 self.sendEvent(dict( 908 severity=Clear, 909 device=self.options.monitor, 910 eventClass=Status_Perf, 911 summary='All RRD files have been recently updated'))
912 913
914 - def startReadDevice(self, deviceName):
915 """ 916 Initiate a request (or several) to read the performance data 917 from a device 918 """ 919 proxy = self.proxies.get(deviceName, None) 920 if proxy is None: 921 return 922 923 # ensure that the request will fit in a packet 924 # TODO: sanity check this number 925 n = int(proxy.snmpConnInfo.zMaxOIDPerRequest) 926 if proxy.singleOidMode: 927 n = 1 928 929 def getLater(oids): 930 """ 931 Return the result of proxy.get( oids, timeoute, tries ) 932 """ 933 return checkException(self.log, 934 proxy.get, 935 oids, 936 proxy.snmpConnInfo.zSnmpTimeout, 937 proxy.snmpConnInfo.zSnmpTries)
938 939 # Chain a series of deferred actions serially 940 proxy.open() 941 chain = Chain(getLater, iter(chunk(sorted(proxy.oidMap.keys()), n))) 942 d = chain.run() 943 944 def closer(arg, proxy): 945 """ 946 Close the proxy 947 """ 948 try: 949 proxy.close() 950 except Exception, ex: 951 self.log.exception(ex) 952 raise ex 953 954 return arg 955 956 d.addCallback(closer, proxy) 957 d.addCallback(self.storeValues, deviceName) 958 959 # Track the total number of OIDs requested this cycle 960 self.snmpOidsRequested += len(proxy.oidMap) 961 962 return d 963 964
965 - def badOid(self, deviceName, oid):
966 """ 967 Report any bad OIDs (eg to a file log and Zenoss event) and then remove 968 the OID so we dont generate any further errors. 969 """ 970 proxy = self.proxies.get(deviceName, None) 971 if proxy is None: 972 return 973 974 name = proxy.oidMap[oid].name 975 summary = 'Error reading value for "%s" on %s (oid %s is bad)' % ( 976 name, deviceName, oid) 977 self.sendEvent(proxy.snmpStatus.snmpStatusEvent, 978 eventClass=Perf_Snmp, 979 device=deviceName, 980 summary=summary, 981 component=name, 982 severity=Event.Debug) 983 self.log.warn(summary) 984 985 del proxy.oidMap[oid]
986 987
988 - def storeValues(self, updates, deviceName):
989 """ 990 Decode responses from devices and store the elements in RRD files 991 """ 992 993 proxy = self.proxies.get(deviceName, None) 994 if proxy is None: 995 self.status.record(deviceName, True) 996 return 997 998 # Look for problems 999 for success, update in updates: 1000 # empty update is probably a bad OID in the request somewhere 1001 if success and not update and not proxy.singleOidMode: 1002 proxy.singleOidMode = True 1003 self.log.warn('Error collecting data on %s -- retrying in single-OID mode', 1004 deviceName) 1005 self.startReadDevice(deviceName) 1006 return 1007 1008 if not success: 1009 if isinstance(update, failure.Failure) and \ 1010 isinstance(update.value, error.TimeoutError): 1011 self.log.debug("Device %s timed out" % deviceName) 1012 else: 1013 self.log.warning('Failed to collect on %s (%s: %s)', 1014 deviceName, 1015 update.__class__, 1016 update) 1017 1018 successCount = sum(firsts(updates)) 1019 oids = [] 1020 for success, update in updates: 1021 if success: 1022 for oid, value in update.items(): 1023 # should always get something back 1024 if value == '' or value is None: 1025 self.badOid(deviceName, oid) 1026 else: 1027 self.storeRRD(deviceName, oid, value) 1028 oids.append(oid) 1029 1030 if successCount == len(updates) and proxy.singleOidMode: 1031 # remove any oids that didn't report 1032 for doomed in Set(proxy.oidMap.keys()) - Set(oids): 1033 self.badOid(deviceName, doomed) 1034 1035 if self.status.inQueue(): 1036 self.startReadDevice(self.status.popDevice()) 1037 1038 if successCount and len(updates) > 0: 1039 successPercent = successCount * 100 / len(updates) 1040 if successPercent not in (0, 100): 1041 self.log.debug("Successful request ratio for %s is %2d%%", 1042 deviceName, 1043 successPercent) 1044 success = True 1045 if updates: 1046 success = successCount > 0 1047 self.status.record(deviceName, success) 1048 proxy.snmpStatus.updateStatus(deviceName, success, self.sendEvent)
1049 1050
1051 - def storeRRD(self, device, oid, value):
1052 """ 1053 Store a value into an RRD file 1054 1055 @param device: remote device name 1056 @type device: string 1057 @param oid: SNMP OID used as our performance metric 1058 @type oid: string 1059 @param value: data to be stored 1060 @type value: number 1061 """ 1062 oidData = self.proxies[device].oidMap.get(oid, None) 1063 if not oidData: return 1064 1065 raw_value = value 1066 min, max = oidData.minmax 1067 try: 1068 value = self.rrd.save(oidData.path, 1069 value, 1070 oidData.dataStorageType, 1071 oidData.rrdCreateCommand, 1072 min=min, max=max) 1073 except Exception, ex: 1074 summary= "Unable to save data for OID %s in RRD %s" % \ 1075 ( oid, oidData.path ) 1076 self.log.critical( summary ) 1077 1078 message= """Data was value= %s, type=%s, min=%s, max=%s 1079 RRD create command: %s""" % \ 1080 ( value, oidData.dataStorageType, min, max, \ 1081 oidData.rrdCreateCommand ) 1082 self.log.critical( message ) 1083 self.log.exception( ex ) 1084 1085 import traceback 1086 trace_info= traceback.format_exc() 1087 1088 evid= self.sendEvent(dict( 1089 dedupid="%s|%s" % (self.options.monitor, 'RRD write failure'), 1090 severity=Critical, 1091 device=self.options.monitor, 1092 eventClass=Status_Perf, 1093 component="RRD", 1094 oid=oid, 1095 path=oidData.path, 1096 message=message, 1097 traceback=trace_info, 1098 summary=summary)) 1099 1100 # Skip thresholds 1101 return 1102 1103 if self.options.showdeviceresults: 1104 self.log.info("%s %s results: raw=%s RRD-converted=%s" 1105 " type=%s, min=%s, max=%s" % ( 1106 device, oid, raw_value, value, oidData.dataStorageType, min, max)) 1107 1108 for ev in self.thresholds.check(oidData.path, time.time(), value): 1109 eventKey = oidData.path.rsplit('/')[-1] 1110 if ev.has_key('eventKey'): 1111 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey']) 1112 else: 1113 ev['eventKey'] = eventKey 1114 self.sendThresholdEvent(**ev)
1115 1116 1117
1118 - def connected(self):
1119 """ 1120 Run forever, fetching and storing 1121 """ 1122 self.log.debug( "Connected to zenhub" ) 1123 d = drive(self.startUpdateConfig) 1124 d.addCallbacks(self.scanCycle, self.errorStop)
1125 1126
1127 - def buildOptions(self):
1128 """ 1129 Build a list of command-line options 1130 """ 1131 SnmpDaemon.buildOptions(self) 1132 self.parser.add_option('--checkAgingFiles', 1133 dest='checkagingfiles', 1134 action="store_true", 1135 default=False, 1136 help="Send events when RRD files are not being updated regularly") 1137 1138 self.parser.add_option('--cacheconfigs', 1139 dest='cacheconfigs', 1140 action="store_true", 1141 default=False, 1142 help="To improve startup times, cache configuration received from zenhub") 1143 1144 self.parser.add_option('--showdeviceresults', 1145 dest='showdeviceresults', 1146 action="store_true", 1147 default=False, 1148 help="Show the raw RRD values. For debugging purposes only.")
1149 1150 1151 if __name__ == '__main__': 1152 # The following bizarre include is required for PB to be happy 1153 from Products.ZenRRD.zenperfsnmp import zenperfsnmp 1154 1155 zpf = zenperfsnmp() 1156 zpf.run() 1157