Package ZenRRD :: Module zenperfsnmp
[hide private]
[frames] | no frames]

Source Code for Module ZenRRD.zenperfsnmp

  1  #! /usr/bin/env python  
  2  ########################################################################### 
  3  # 
  4  # This program is part of Zenoss Core, an open source monitoring platform. 
  5  # Copyright (C) 2007, Zenoss Inc. 
  6  # 
  7  # This program is free software; you can redistribute it and/or modify it 
  8  # under the terms of the GNU General Public License version 2 as published by 
  9  # the Free Software Foundation. 
 10  # 
 11  # For complete information please visit: http://www.zenoss.com/oss/ 
 12  # 
 13  ########################################################################### 
 14   
 15  __doc__='''zenperfsnmp 
 16   
 17  Gets snmp performance data and stores it in the RRD files. 
 18   
 19  $Id$ 
 20  ''' 
 21   
 22  __version__ = "$Revision$"[11:-2] 
 23   
 24  import os 
 25  import time 
 26  import logging 
 27  log = logging.getLogger("zen.zenperfsnmp") 
 28   
 29  from sets import Set 
 30  import cPickle 
 31   
 32  from twisted.internet import reactor, defer 
 33   
 34  try: 
 35      from pynetsnmp.twistedsnmp import AgentProxy 
 36  except ImportError: 
 37      import warnings 
 38      warnings.warn("Using python-based snmp engine") 
 39      from twistedsnmp.agentproxy import AgentProxy 
 40  if not hasattr(AgentProxy, 'open'): 
41 - def ignore(self): pass
42 AgentProxy.open = ignore 43 AgentProxy.close = ignore 44 45 import Globals 46 from Products.ZenUtils.Chain import Chain 47 from Products.ZenUtils.Driver import drive, driveLater 48 from Products.ZenModel.PerformanceConf import performancePath 49 from Products.ZenEvents import Event 50 from Products.ZenEvents.ZenEventClasses import Perf_Snmp, Status_Snmp 51 52 from Products.ZenRRD.RRDUtil import RRDUtil 53 from SnmpDaemon import SnmpDaemon 54 55 from FileCleanup import FileCleanup 56 57 MAX_OIDS_PER_REQUEST = 40 58 MAX_SNMP_REQUESTS = 20 59 DEVICE_LOAD_CHUNK_SIZE = 2 60
def makeDirs(dir):
    """Create the directory `dir`, including missing parents, if absent.

    New directories are requested with mode 0750.  Nothing is done when
    `dir` is empty or already exists, and a directory that appears between
    the existence check and the creation attempt is not treated as an error.

    Raises OSError when the directory cannot be created.
    """
    # os.path.dirname() of a bare file name is '', which os.makedirs rejects.
    if not dir or os.path.exists(dir):
        return
    try:
        # int('750', 8) spells octal 0750 in a form every Python parser accepts.
        os.makedirs(dir, int('750', 8))
    except OSError:
        # Someone else may have created the directory in the meantime;
        # anything else is a genuine failure and is passed on.
        if not os.path.isdir(dir):
            raise
64
def read(fname):
    """Return the entire contents of the file `fname`.

    The file is opened in binary mode.  An empty string is returned when
    `fname` does not exist.
    """
    if not os.path.exists(fname):
        return ''
    # open() instead of the legacy file() builtin, matching write().
    fp = open(fname, 'rb')
    try:
        return fp.read()
    finally:
        fp.close()
73
def write(fname, data):
    """Store `data` in the file `fname`, replacing any previous contents.

    makeDirs() is called on the parent directory first; the file is then
    opened in binary mode and always closed, even if the write fails.
    """
    parent = os.path.dirname(fname)
    makeDirs(parent)
    out = open(fname, 'wb')
    try:
        out.write(data)
    finally:
        out.close()
81 85
def chunk(lst, n):
    """Break `lst` into consecutive slices of at most `n` items.

    Returns a list of slices; the last one may be shorter than `n`.
    """
    pieces = []
    for start in range(0, len(lst), n):
        stop = start + n
        pieces.append(lst[start:stop])
    return pieces
try:
    sorted = sorted             # added in python 2.4
except NameError:
    def sorted(lst, *args, **kw):
        """Stand-in for the builtin sorted() on interpreters that lack it.

        Returns a new sorted list built from `lst`; any extra arguments
        are handed to list.sort().
        """
        # Copy first: the builtin never reorders its argument and accepts
        # any iterable, not only lists.
        result = list(lst)
        result.sort(*args, **kw)
        return result
96
def firsts(lst):
    """Return a list holding element 0 of every item in `lst`."""
    heads = []
    for entry in lst:
        heads.append(entry[0])
    return heads
100
def checkException(log, function, *args, **kw):
    """Call function(*args, **kw) and return its result.

    Any Exception raised by the call is reported through log.exception()
    and then propagates to the caller unchanged.
    """
    try:
        return function(*args, **kw)
    except Exception:
        # Local import: the exception object is fetched via sys.exc_info()
        # so the handler needs no version-specific "except ..., name" syntax.
        import sys
        log.exception(sys.exc_info()[1])
        # A bare raise keeps the traceback of the original failure;
        # re-raising the exception object would restart it from this line.
        raise
107
class Status:
    """Keep track of the status of many parallel requests.

    A round begins with start(), which registers the names expected to
    report.  Each name is then recorded once through record(); when no
    names remain outstanding the Deferred returned by start() is fired.
    """

    _success = _fail = 0
    _startTime = _stopTime = 0.0
    _deferred = None

    def __init__(self):
        """Create a tracker with no devices registered."""
        self._allDevices = Set()
        self._reported = Set()

    def start(self, devices):
        """Start the clock for a new round.

        devices -- iterable of the names expected to report

        Returns a Deferred that is called back with this object once
        nothing is outstanding (immediately when `devices` is empty).
        """
        self._allDevices = Set(devices)
        self._reported = Set()
        # Begin every round from zero so a reused tracker does not carry
        # counts or a stale stop time over from the previous round.
        self._success = self._fail = 0
        self._stopTime = 0.0
        self._startTime = time.time()
        self._deferred = defer.Deferred()
        self._checkFinished()   # count could be zero
        return self._deferred

    def record(self, name, successOrFailure):
        """Record success or failure for `name`.

        A name that has already reported is logged as an error and
        otherwise ignored, so it is never counted twice.
        """
        if name in self._reported:
            log.error("Device %s is reporting more than once", name)
            return
        self._reported.add(name)
        if successOrFailure:
            self.success()
        else:
            self.fail()

    def success(self, *unused):
        """Record a successful result."""
        self._success += 1
        self._checkFinished()

    def fail(self, *unused):
        """Record a failed operation."""
        self._fail += 1
        self._checkFinished()

    def _checkFinished(self):
        """Note the stopping point, fire the Deferred once, log the stats."""
        if self.finished():
            self._stopTime = time.time()
            # _deferred is still None if a result arrives before start().
            if self._deferred is not None and not self._deferred.called:
                self._deferred.callback(self)
        # NOTE(review): the nesting of this log call is not recoverable from
        # the flattened listing; it is kept at function level, which matches
        # stats() supporting rounds that are still in progress -- confirm.
        log.info("Count %d good %d bad %d time %f", *self.stats())

    def finished(self):
        """Return True when no registered name is still outstanding."""
        return self.outstanding() <= 0

    def stats(self):
        """Provide a summary of the effort.

        Returns a tuple (number of registered names, successes, failures,
        age).  `age` is 0.0 before start() has been called, the time elapsed
        so far while names are outstanding, and otherwise the time between
        start and stop.
        """
        age = self._stopTime - self._startTime
        if not self._startTime:
            age = 0.0
        elif not self.finished():
            age = time.time() - self._startTime
        return (len(self._allDevices), self._success, self._fail, age)

    def outstanding(self):
        """Return the number of unfinished operations."""
        return len(self.outstandingNames())

    def outstandingNames(self):
        """Return the set of registered names that have not reported yet."""
        return self._allDevices - self._reported
183 184
class SnmpStatus:
    """Track and report SNMP status failures."""

    # Fields common to every status event passed to the event callback.
    snmpStatusEvent = {'eventClass': Status_Snmp,
                       'component': 'snmp',
                       'eventGroup': 'SnmpTest'}

    def __init__(self, snmpState):
        """Remember `snmpState` as the current failure count."""
        self.count = snmpState

    def updateStatus(self, deviceName, success, eventCb):
        """Send events on SNMP failures and recoveries.

        deviceName -- name used in the event and in the log message
        success    -- true when the latest collection worked
        eventCb    -- callable invoked as eventCb(eventDict, **fields)

        A failure always sends an Error event and increments the count.
        A success sends a Clear event only when the count was above zero,
        and always resets the count to zero.
        """
        if not success:
            summary = 'snmp agent down on device ' + deviceName
            eventCb(self.snmpStatusEvent,
                    device=deviceName, summary=summary,
                    severity=Event.Error)
            log.warn(summary)
            self.count += 1
            return

        if self.count > 0:
            summary = 'snmp agent up on device ' + deviceName
            eventCb(self.snmpStatusEvent,
                    device=deviceName, summary=summary,
                    severity=Event.Clear)
            log.info(summary)
        self.count = 0
214 215
class OidData:
    """Plain record of the settings attached to one OID."""

    def update(self, name, path, dataStorageType, rrdCreateCommand, minmax):
        """Store every argument on the instance under its own name."""
        (self.name,
         self.path,
         self.dataStorageType,
         self.rrdCreateCommand,
         self.minmax) = (name,
                         path,
                         dataStorageType,
                         rrdCreateCommand,
                         minmax)
223 224
class zenperfsnmp(SnmpDaemon):
    """Periodically query all devices for SNMP values to archive in RRD files.

    Device configurations are kept in self.proxies (device name -> SNMP
    proxy).  Every collection round is tracked by a Status object held in
    self.status, and the device names still waiting for their turn in the
    current round are held in self.queryWorkList.
    """

    # these names need to match the property values in StatusMonitorConf
    maxRrdFileAge = 30 * (24*60*60)     # seconds
    perfsnmpConfigInterval = 20*60
    perfsnmpCycleInterval = 5*60
    properties = SnmpDaemon.properties + ('perfsnmpCycleInterval',)
    initialServices = SnmpDaemon.initialServices + ['SnmpPerfConfig']

    def __init__(self):
        """Set up empty state, load cached configs and start RRD cleanup."""
        SnmpDaemon.__init__(self, 'zenperfsnmp')
        self.status = None
        self.proxies = {}
        self.queryWorkList = Set()
        self.unresponsiveDevices = Set()
        self.snmpOidsRequested = 0
        perfRoot = performancePath('')
        makeDirs(perfRoot)
        self.loadConfigs(perfRoot)
        # The cleaner is pointed at *.rrd files under perfRoot and calls
        # self.cleanup for each file it selects.
        self.fileCleanup = FileCleanup(perfRoot, '.*\\.rrd$',
                                       self.maxRrdFileAge,
                                       frequency=90*60)
        self.fileCleanup.process = self.cleanup
        self.fileCleanup.start()

    def pickleName(self, id):
        """Return the path of the cached config pickle for device `id`."""
        return performancePath('Devices/%s/%s-config.pickle' %
                               (id, self.options.monitor))

    def loadConfigs(self, perfRoot):
        """Read local configuration values at startup.

        Every immediate sub-directory of the 'Devices' performance
        directory is treated as a device id whose cached pickle, if any, is
        fed to updateDeviceConfig().  `perfRoot` is accepted but not used.
        """
        base = performancePath('Devices')
        makeDirs(base)
        root, ds, fs = os.walk(base).next()
        for d in ds:
            config = read(self.pickleName(d))
            if not config:
                continue
            try:
                # NOTE(review): unpickling trusts the cache file completely;
                # confirm only this daemon can write under the perf tree.
                self.updateDeviceConfig(cPickle.loads(config))
            except Exception:
                # A damaged cache entry must not prevent startup.
                import sys
                self.log.debug("Ignoring updateDeviceConfigFailure: %s",
                               sys.exc_info()[1])

    def cleanup(self, fullPath):
        """Delete the RRD file `fullPath` and log that it was removed."""
        self.log.warning("Deleting old RRD file: %s", fullPath)
        os.unlink(fullPath)

    def maybeQuit(self):
        """Stop if all performance has been fetched, and we aren't cycling."""
        if not self.options.daemon and not self.options.cycle:
            reactor.callLater(0, reactor.stop)

    def remote_updateDeviceList(self, devices):
        """Reconcile the proxies with a (device, lastChange) list.

        Proxies for devices absent from `devices` are dropped.  Configs are
        requested for devices that are new or whose lastChange is newer
        than the one held locally.
        """
        SnmpDaemon.remote_updateDeviceList(self, devices)
        updated = []
        doomed = Set(self.proxies.keys())
        for device, lastChange in devices:
            doomed.discard(device)
            proxy = self.proxies.get(device)
            if not proxy or proxy.lastChange < lastChange:
                updated.append(device)
        log.info("Deleting %s", doomed)
        for d in doomed:
            del self.proxies[d]
        if updated:
            log.info("Fetching configs: %s", updated)
            d = self.model().callRemote('getDevices', updated)
            d.addCallback(self.updateDeviceList, updated)
            d.addErrback(self.error)

    def startUpdateConfig(self, driver):
        """Periodically ask the Zope server for basic configuration data.

        Generator meant to be run by drive()/driveLater(): each remote call
        is yielded and its result is then read back via driver.next().
        """
        log.info("fetching property items")
        yield self.model().callRemote('propertyItems')
        self.setPropertyItems(driver.next())

        # Schedule the next configuration pass before continuing.
        driveLater(self.configCycleInterval * 60, self.startUpdateConfig)

        log.info("getting threshold classes")
        yield self.model().callRemote('getThresholdClasses')
        self.remote_updateThresholdClasses(driver.next())

        log.info("checking for outdated configs")
        current = [(k, v.lastChange) for k, v in self.proxies.items()]
        yield self.model().callRemote('getDeviceUpdates', current)

        devices = driver.next()
        if self.options.device:
            devices = [self.options.device]

        log.info("fetching configs for %s", repr(devices)[0:800]+'...')
        yield self.model().callRemote('getDevices', devices)
        updatedDevices = driver.next()

        log.info("fetching default RRDCreateCommand")
        yield self.model().callRemote('getDefaultRRDCreateCommand')
        createCommand = driver.next()

        self.rrd = RRDUtil(createCommand, self.perfsnmpCycleInterval)

        log.info("fetching snmp status")
        yield self.model().callRemote('getSnmpStatus', self.options.device)
        self.updateSnmpStatus(driver.next())

        # Kick off the device load
        log.info("Initiating incremental device load")
        d = self.updateDeviceList(updatedDevices, devices)
        def report(result):
            if result:
                log.error("Error loading devices: %s", result)
        d.addBoth(report)

    def updateDeviceList(self, responses, requested):
        """Update the config for the given devices.

        responses -- device names whose configs are fetched, a few at a time
        requested -- device names that were asked for; those in `requested`
                     but not in `responses` are dropped

        Returns the Deferred produced by drive().
        """
        def fetchDevices(driver):
            deviceNames = Set()
            length = len(responses)
            log.debug("Fetching configs for %d devices", length)
            for devices in chunk(responses, DEVICE_LOAD_CHUNK_SIZE):
                log.debug("Fetching config for %s", devices)
                yield self.model().callRemote('getDeviceConfigs', devices)
                for response in driver.next():
                    self.updateDeviceConfig(response)
                for d in devices:
                    deviceNames.add(d)
            log.debug("Finished fetching configs for %d devices", length)

            # stop collecting those no longer in the list
            doomed = Set(requested) - deviceNames
            if self.options.device:
                self.log.debug('Gathering performance data for %s ' %
                               self.options.device)
                doomed = Set(self.proxies.keys())
                doomed.discard(self.options.device)
            for name in doomed:
                self.log.info('removing device %s' % name)
                if name in self.proxies:
                    del self.proxies[name]
                config = self.pickleName(name)
                # NOTE(review): unlink() is not visible in this listing;
                # presumably a module-level helper defined near read()/write().
                unlink(config)
                # we could delete the RRD files, too

            ips = Set()
            for name, proxy in self.proxies.items():
                if proxy.ip in ips:
                    log.warning("Warning: device %s has a duplicate address %s",
                                name, proxy.ip)
                ips.add(proxy.ip)
            self.log.info('Configured %d of %d devices',
                          len(deviceNames), len(self.proxies))
            yield defer.succeed(None)
        return drive(fetchDevices)

    def updateAgentProxy(self,
                         deviceName, ip, port, community,
                         version, timeout, tries,
                         maxoids=MAX_OIDS_PER_REQUEST):
        """Create or update the proxy for `deviceName` and return it.

        The proxy is not stored in self.proxies here; the caller does that.
        """
        # find any cached proxy
        p = self.proxies.get(deviceName, None)
        if not p:
            p = AgentProxy(ip=ip,
                           port=port,
                           community=community,
                           snmpVersion=version,
                           protocol=self.snmpPort.protocol,
                           allowCache=True)
            p.oidMap = {}
            p.snmpStatus = SnmpStatus(0)
            p.singleOidMode = False
            p.lastChange = 0
        else:
            p.ip = ip
            p.port = port
            p.community = community
            p.snmpVersion = version
        p.timeout = timeout
        p.tries = tries
        p.maxoids = maxoids
        return p

    def updateSnmpStatus(self, status):
        """Update the SNMP failure counts from the status database.

        status -- sequence of (device name, count) pairs; proxies for
                  devices not listed get a count of 0
        """
        countMap = dict(status)
        for name, proxy in self.proxies.items():
            proxy.snmpStatus.count = countMap.get(name, 0)

    def remote_deleteDevice(self, doomed):
        """Forget the proxy for device `doomed`, if there is one."""
        self.log.debug("Async delete device %s" % doomed)
        if doomed in self.proxies:
            del self.proxies[doomed]

    def remote_updateDeviceConfig(self, snmpTargets):
        """Apply a device configuration pushed by the remote side."""
        self.log.debug("Async device update")
        self.updateDeviceConfig(snmpTargets)

    def updateDeviceConfig(self, snmpTargets):
        """Save the device configuration and create an SNMP proxy for it.

        snmpTargets -- (last, identity, thresholds, oidData) where identity
            is (deviceName, (ip, port), (community, version, timeout,
            tries), maxOIDs) and oidData is a sequence of
            (name, oid, path, dsType, createCmd, minmax)

        Nothing happens when oidData is empty.
        """
        last, identity, thresholds, oidData = snmpTargets
        deviceName, hostPort, snmpConfig, maxOIDs = identity

        if not oidData:
            return
        (ip, port) = hostPort
        (community, version, timeout, tries) = snmpConfig
        self.log.debug("received config for %s", deviceName)
        # Reduce the version string to '1' or '2'.
        if version.find('1') >= 0:
            version = '1'
        else:
            version = '2'
        p = self.updateAgentProxy(deviceName,
                                  ip, port, str(community),
                                  version, timeout, tries, maxOIDs)
        if p.lastChange < last:
            p.lastChange = last
            write(self.pickleName(deviceName), cPickle.dumps(snmpTargets))

        # Rebuild the map, reusing the OidData objects of OIDs that remain.
        oidMap, p.oidMap = p.oidMap, {}
        for name, oid, path, dsType, createCmd, minmax in oidData:
            createCmd = createCmd.strip()
            oid = str(oid).strip('.')
            # beware empty oids
            if oid:
                oid = '.' + oid
                p.oidMap[oid] = d = oidMap.setdefault(oid, OidData())
                d.update(name, path, dsType, createCmd, minmax)
        self.proxies[deviceName] = p
        self.thresholds.updateList(thresholds)

    def scanCycle(self, *unused):
        """Run one collection cycle and schedule the next one.

        The list of devices with ping issues is requested first; the
        collection itself starts in setUnresponsiveDevices().
        """
        reactor.callLater(self.perfsnmpCycleInterval, self.scanCycle)
        self.log.debug("getting device ping issues")
        evtSvc = self.services.get('EventService', None)
        if evtSvc:
            d = evtSvc.callRemote('getDevicePingIssues')
            d.addBoth(self.setUnresponsiveDevices)
        else:
            # Without the event service the cycle must still collect; the
            # previous list of unresponsive devices is kept as it is.
            self.setUnresponsiveDevices('No event service')

    def setUnresponsiveDevices(self, arg):
        """Remember all the unresponsive devices, then read the devices.

        arg -- a list whose items start with a device name; anything else
               is logged as an error and the previous set is kept
        """
        if isinstance(arg, list):
            deviceList = arg
            self.log.debug('unresponsive devices: %r' % deviceList)
            self.unresponsiveDevices = Set(firsts(deviceList))
        else:
            self.log.error('Could not get unresponsive devices: %s', arg)
        self.readDevices()

    def readDevices(self, unused=None):
        """Periodically fetch the performance values from all known devices.

        If the previous round is unfinished it is given until twice the
        cycle interval before a new round is forced.
        """
        if self.status and not self.status.finished():
            _, _, _, age = self.status.stats()
            self.log.warning("There are still %d devices to query",
                             self.status.outstanding())
            self.log.warning("Problem devices: %r",
                             list(self.status.outstandingNames()))
            if age < self.perfsnmpCycleInterval * 2:
                self.log.warning("Waiting one more cycle period")
                return
            self.log.warning("Devices status is not clearing. Restarting.")

        def printError(reason):
            from StringIO import StringIO
            out = StringIO()
            reason.printTraceback(out)
            # Log the rendered traceback rather than discarding it.
            self.log.error(out.getvalue())

        self.queryWorkList = Set(self.proxies.keys())
        self.queryWorkList -= self.unresponsiveDevices
        self.status = Status()
        d = self.status.start(self.queryWorkList)
        d.addCallback(self.reportRate)
        # Start at most MAX_SNMP_REQUESTS devices now; storeValues() starts
        # the next queued device each time one completes.
        for slot in range(MAX_SNMP_REQUESTS):
            if not self.queryWorkList:
                break
            d = self.startReadDevice(self.queryWorkList.pop())
            # startReadDevice() returns None when the proxy is gone.
            if d is not None:
                d.addErrback(printError)

    def reportRate(self, *unused):
        """Finished reading all the devices: report stats and heartbeat."""
        total, success, failed, runtime = self.status.stats()
        self.log.info("sent %d OID requests", self.snmpOidsRequested)
        self.log.info("collected %d of %d devices in %.2f" %
                      (success, total, runtime))
        self.snmpOidsRequested = 0
        self.heartbeat()

    def startReadDevice(self, deviceName):
        """Initiate a request (or several) to read the performance data
        from a device.

        Returns the Deferred of the request chain, or None when there is no
        proxy for `deviceName`.
        """
        proxy = self.proxies.get(deviceName, None)
        if proxy is None:
            # The device disappeared after being queued.  Report it, as
            # storeValues() does, so the round can still complete.
            if self.status is not None:
                self.status.record(deviceName, True)
            return None
        # ensure that the request will fit in a packet
        n = int(proxy.maxoids)
        if proxy.singleOidMode or n < 1:
            # a non-positive size would make chunk() fail or return nothing
            n = 1
        def getLater(oids):
            return checkException(self.log, proxy.get,
                                  oids, proxy.timeout, proxy.tries)
        proxy.open()
        chain = Chain(getLater, iter(chunk(sorted(proxy.oidMap.keys()), n)))
        d = chain.run()
        def closer(arg, proxy):
            checkException(self.log, proxy.close)
            return arg
        d.addCallback(closer, proxy)
        d.addCallback(self.storeValues, deviceName)
        self.snmpOidsRequested += len(proxy.oidMap)
        return d

    def badOid(self, deviceName, oid):
        """Send an event about an unreadable OID and stop requesting it."""
        proxy = self.proxies.get(deviceName, None)
        if proxy is None:
            return
        # Guarded like the lookup in storeRRD(): the OID may already have
        # been dropped, or may never have been requested.
        oidData = proxy.oidMap.get(oid, None)
        if oidData is None:
            return
        name = oidData.name
        summary = 'Error reading value for "%s" on %s (oid %s is bad)' % (
            name, deviceName, oid)
        self.sendEvent(proxy.snmpStatus.snmpStatusEvent,
                       eventClass=Perf_Snmp,
                       device=deviceName,
                       summary=summary,
                       component=name,
                       severity=Event.Info)
        self.log.warn(summary)
        del proxy.oidMap[oid]

    def storeValues(self, updates, deviceName):
        """Decode responses from devices and store the elements in RRD files.

        updates -- sequence of (success, update) pairs, one per request;
                   for a successful request `update` maps OID to value

        Also starts the next queued device and records the outcome for
        this device in self.status.
        """
        proxy = self.proxies.get(deviceName, None)
        if proxy is None:
            self.status.record(deviceName, True)
            return

        # Look for problems
        for success, update in updates:
            # empty update is probably a bad OID in the request somewhere
            if success and not update and not proxy.singleOidMode:
                proxy.singleOidMode = True
                self.log.warn('Error collecting data on %s, recollecting',
                              deviceName)
                self.startReadDevice(deviceName)
                return
            if not success:
                self.log.debug('Failed to collect on %s (%s: %s)',
                               deviceName,
                               update.__class__,
                               update)

        successCount = sum(firsts(updates))
        oids = []
        for success, update in updates:
            if success:
                for oid, value in update.items():
                    # should always get something back
                    if value == '':
                        self.badOid(deviceName, oid)
                    else:
                        self.storeRRD(deviceName, oid, value)
                    oids.append(oid)

        if successCount == len(updates) and proxy.singleOidMode:
            # remove any oids that didn't report
            for doomed in Set(proxy.oidMap.keys()) - Set(oids):
                self.badOid(deviceName, doomed)

        if self.queryWorkList:
            self.startReadDevice(self.queryWorkList.pop())

        if successCount:
            successPercent = successCount * 100 / len(updates)
            if successPercent not in (0, 100):
                self.log.debug("Successful request ratio for %s is %2d%%",
                               deviceName,
                               successPercent)
        # With no requests at all the device counts as a success.
        success = True
        if updates:
            success = successCount > 0
        self.status.record(deviceName, success)
        proxy.snmpStatus.updateStatus(deviceName, success, self.sendEvent)

    def storeRRD(self, device, oid, value):
        """Store a value into an RRD file and send any threshold events.

        Does nothing when `oid` is not configured for `device`.
        """
        oidData = self.proxies[device].oidMap.get(oid, None)
        if not oidData:
            return

        # local names chosen so the builtins min() and max() stay usable
        low, high = oidData.minmax
        value = self.rrd.save(oidData.path,
                              value,
                              oidData.dataStorageType,
                              oidData.rrdCreateCommand,
                              min=low, max=high)

        for ev in self.thresholds.check(oidData.path, time.time(), value):
            ev['eventKey'] = oid
            self.sendThresholdEvent(**ev)

    def connected(self):
        """Run forever, fetching and storing."""
        d = drive(self.startUpdateConfig)
        d.addCallbacks(self.scanCycle, self.errorStop)
# Script entry point: build the daemon and hand control to its run() method.
if __name__ == '__main__':
    zpf = zenperfsnmp()
    zpf.run()