Trees | Indices | Help |
|
---|
|
#! /usr/bin/env python
###########################################################################
#
# This program is part of Zenoss Core, an open source monitoring platform.
# Copyright (C) 2007, Zenoss Inc.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 2 as published by
# the Free Software Foundation.
#
# For complete information please visit: http://www.zenoss.com/oss/
#
###########################################################################

__doc__='''zenprocess

Gets snmp process performance data and stores it in RRD files.

$Id$
'''

__version__ = "$Revision$"[11:-2]

import logging
import time
from sets import Set

log = logging.getLogger("zen.zenprocess")

from twisted.internet import reactor, defer
from twisted.python import failure

# Prefer the pynetsnmp bindings; fall back to the twistedsnmp package when
# pynetsnmp cannot be imported.  Both expose the same two names.
try:
    from pynetsnmp.twistedsnmp import AgentProxy
    from pynetsnmp.tableretriever import TableRetriever
except ImportError:
    import warnings
    warnings.warn("Using python-based snmp engine")
    from twistedsnmp.agentproxy import AgentProxy
    from twistedsnmp.tableretriever import TableRetriever

import Globals
from Products.ZenUtils.Driver import drive, driveLater
from Products.ZenUtils.NJobs import NJobs
from Products.ZenUtils.Chain import Chain
from Products.ZenModel.PerformanceConf import performancePath
from Products.ZenEvents import Event
from Products.ZenEvents.ZenEventClasses import Status_Snmp, Status_OSProcess

from Products.ZenRRD.RRDUtil import RRDUtil
from SnmpDaemon import SnmpDaemon

# SNMP OIDs under the HOST-RESOURCES-MIB subtree (RFC 2790).
HOSTROOT ='.1.3.6.1.2.1.25'
# Running-software table: the three columns below are walked together to
# discover which pids exist on a device (hrSWRunName / hrSWRunPath /
# hrSWRunParameters in RFC 2790).
RUNROOT = HOSTROOT + '.4'
NAMETABLE = RUNROOT + '.2.1.2'
PATHTABLE = RUNROOT + '.2.1.4'
ARGSTABLE = RUNROOT + '.2.1.5'
# Per-process performance columns (hrSWRunPerfCPU / hrSWRunPerfMem in
# RFC 2790).  CPU and MEM are prefixes: the pid is appended as a string to
# form the full OID, which is why both end in a dot.
PERFROOT = HOSTROOT + '.5'
CPU = PERFROOT + '.1.1.1.'        # note trailing dot
MEM = PERFROOT + '.1.1.2.'
# note trailing dot 61 62 PARALLEL_JOBS = 10 63 64 WRAP=0xffffffffL 65 66 try: 67 sorted = sorted # added in python 2.4 68 except NameError: 7274 """return a dictionary with keys and values swapped: 75 all values are lists to handle the different keys mapping to the same value 76 """ 77 result = {} 78 for a, v in d.items(): 79 result.setdefault(v, []).append(a) 80 return result81 85 89 9193 cpu = None 94 memory = None 95120 12197 if n is not None: 98 try: 99 n = int(n) 100 except ValueError, er: 101 log.warning("Bad value for CPU: '%s'", n) 102 103 if self.cpu is None or n is None: 104 self.cpu = n 105 return None 106 diff = n - self.cpu 107 if diff < 0: 108 # don't provide a value when the counter falls backwards 109 n = None 110 diff = None 111 self.cpu = n 112 return diff113115 self.memory = n116 119 __repr__ = __str__123 'track process-specific configuration data' 124 name = None 125 originalName = None 126 ignoreParameters = False 127 restart = None 128 severity = Event.Warning 129 status = 0 130 cpu = 0 131 134166136 if self.name is None: 137 return False 138 if self.ignoreParameters or not args: 139 return self.originalName == name 140 return self.originalName == '%s %s' % (name, args)141 144 __repr__ = __str__ 145147 p = self.pids.setdefault(pid, Pid()) 148 cpu = p.updateCpu(value) 149 if cpu is not None: 150 self.cpu += cpu 151 self.cpu %= WRAP152154 return self.cpu155 158 162168 'track device data' 169 name = '' 170 address = ('', 0) 171 community = 'public' 172 version = '1' 173 port = 161 174 proxy = None 175 timeout = 2.5 176 tries = 2 177 protocol = None 178 lastScan = 0. 
179 snmpStatus = 0 180 lastChange = 0 181 maxOidsPerRequest = 40 182239 240184 # map process name to Process object above 185 self.processes = {} 186 # map pid number to Process object 187 self.pids = {}188 193 197199 p = self.proxy 200 if (p is None or 201 (p.ip, p.port) != self.address or 202 p.snmpVersion != self.version or 203 p.port != self.port): 204 self.proxy = AgentProxy(ip=self.address[0], 205 port=self.address[1], 206 community=self.community, 207 snmpVersion=self.version, 208 protocol=self.protocol, 209 allowCache=True) 210 self.proxy.tries = self.tries 211 self.proxy.timeout = self.timeout212 213215 unused = Set(self.processes.keys()) 216 for name, originalName, ignoreParameters, restart, severity \ 217 in processes: 218 unused.discard(name) 219 p = self.processes.setdefault(name, Process()) 220 p.name = name 221 p.originalName = originalName 222 p.ignoreParameters = ignoreParameters 223 p.restart = restart 224 p.severity = severity 225 for name in unused: 226 del self.processes[name]227 228 231 232234 t = TableRetriever(self.proxy, oids, 235 timeout=self.timeout, 236 retryCount=self.tries, 237 maxRepetitions=self.maxOidsPerRequest / len(oids)) 238 return t()242 statusEvent = { 'eventClass' : Status_OSProcess, 243 'eventGroup' : 'Process' } 244 initialServices = SnmpDaemon.initialServices + ['ProcessConfig'] 245 processConfigInterval = 20*60 246 processCycleInterval = 5*60 247 properties = SnmpDaemon.properties + ('processCycleInterval',) 248281250 SnmpDaemon.__init__(self, 'zenprocess') 251 self._devices = {} 252 self.scanning = None 253 self.downDevices = Set()254256 "Return a filtered list of devices" 257 return dict([(k, v) for k, v in self._devices.items() 258 if k not in self.downDevices])259261 'Get configuration values from the Zope server' 262 def doFetchConfig(driver): 263 yield self.model().callRemote('getDefaultRRDCreateCommand') 264 createCommand = driver.next() 265 266 yield self.model().callRemote('propertyItems') 267 
self.setPropertyItems(driver.next()) 268 269 self.rrd = RRDUtil(createCommand, self.processCycleInterval) 270 271 yield self.model().callRemote('getThresholdClasses') 272 self.remote_updateThresholdClasses(driver.next()) 273 274 devices = [] 275 if self.options.device: 276 devices = [self.options.device] 277 yield self.model().callRemote('getOSProcessConf', devices) 278 driver.next()279 280 return drive(doFetchConfig)283 self.log.debug("Async delete device %s" % doomed) 284 if doomed in self._devices: 285 del self._devices[doomed] 286 self.clearSnmpError(doomed, "Device %s removed from SNMP collection")287289 self.log.debug("Async update device list %s" % devices) 290 doomed = Set(self._devices.keys()) 291 updated = [] 292 for device, lastChange in devices: 293 cfg = self._devices.get(device, None) 294 if not cfg or self._devices[device].lastChange < lastChange: 295 updated.append(device) 296 doomed.discard(device) 297 if updated: 298 log.info("Fetching the config for %s", updated) 299 d = self.model().callRemote('getOSProcessConf', devices) 300 d.addCallback(self.updateDevices, updated) 301 d.addErrback(self.error) 302 if doomed: 303 log.info("Removing %s", doomed) 304 for device in doomed: 305 del self._devices[device] 306 self.clearSnmpError(device, "device %s removed" % device)307 308310 if name in self._devices: 311 if self._devices[name].snmpStatus > 0: 312 self._devices[name].snmpStatus = 0 313 self.sendEvent(self.statusEvent, 314 eventClass=Status_Snmp, 315 component="snmp", 316 device=name, 317 summary=message, 318 severity=Event.Clear)319 320 324 325327 names = Set() 328 for cfg in cfgs: 329 lastChange, snmpConf, procs, thresholds = cfg 330 name, addr, snmpConf, maxOidsPerRequest = snmpConf 331 community, version, timeout, tries = snmpConf 332 names.add(name) 333 d = self._devices.setdefault(name, Device()) 334 d.lastChange = lastChange 335 d.name = name 336 d.address = addr 337 d.community = community 338 d.version = version 339 d.timeout = timeout 340 
d.tries = tries 341 d.maxOidsPerRequest = maxOidsPerRequest 342 d.updateConfig(procs) 343 d.protocol = self.snmpPort.protocol 344 self.thresholds.updateList(thresholds) 345 for doomed in Set(fetched) - names: 346 if doomed in self._devices: 347 del self._devices[doomed]348350 'Read the basic config needed to do anything' 351 log.debug("fetching config") 352 devices = self._devices.keys() 353 yield self.fetchConfig() 354 self.updateDevices(driver.next(), devices) 355 356 yield self.model().callRemote('getSnmpStatus', self.options.device) 357 self.updateSnmpStatus(driver.next()) 358 359 yield self.model().callRemote('getProcessStatus', self.options.device) 360 self.updateProcessStatus(driver.next()) 361 362 driveLater(self.configCycleInterval * 60, self.start)363 364 370 371373 down = {} 374 for device, component, count in status: 375 down[ (device, component) ] = count 376 for name, device in self._devices.items(): 377 for p in device.processes.values(): 378 p.status = down.get( (name, p.originalName), 0)379 380382 device.open() 383 def go(driver): 384 yield self.scanDevice(device) 385 driver.next() 386 yield self.fetchPerf(device) 387 driver.next()388 d = drive(go) 389 d.addBoth(closer, device) 390 return d 391 392394 "Fetch all the process info" 395 device.lastScan = time.time() 396 tables = [NAMETABLE, PATHTABLE, ARGSTABLE] 397 d = device.getTables(tables) 398 d.addCallback(self.storeProcessNames, device) 399 d.addErrback(self.deviceFailure, device) 400 return d401 402404 "Log exception for a single device" 405 self.sendEvent(self.statusEvent, 406 eventClass=Status_Snmp, 407 component="snmp", 408 device=device.name, 409 summary='Unable to read processes on device %s' % device.name, 410 severity=Event.Error) 411 device.snmpStatus += 1 412 self.logError('Error on device %s' % device.name, error.value)413 414416 "Parse the process tables and figure what pids are on the device" 417 if not results: 418 summary = 'Device %s does not publish HOST-RESOURCES-MIB' % 
device.name 419 self.sendEvent(self.statusEvent, 420 device=device.name, 421 summary=summary, 422 severity=Event.Error) 423 log.info(summary) 424 return 425 if device.snmpStatus > 0: 426 device.snmpStatus = 0 427 summary = 'Good SNMP response from device %s' % device.name 428 self.clearSnmpError(device.name, summary) 429 430 procs = [] 431 names, paths, args = {}, {}, {} 432 def extract(dictionary, oid, value): 433 pid = int(oid.split('.')[-1]) 434 dictionary[pid] = value435 for row in results[NAMETABLE].items(): 436 extract(names, *row) 437 for row in results[PATHTABLE].items(): 438 extract(paths, *row) 439 for row in results[ARGSTABLE].items(): 440 extract(args, *row) 441 for i, path in paths.items(): 442 if i in names and i in args: 443 name = names[i] 444 if path and path.find('\\') == -1: 445 name = path 446 procs.append( (i, (name, args[i]) ) ) 447 # look for changes in pids 448 before = Set(device.pids.keys()) 449 after = {} 450 for p in device.processes.values(): 451 for pid, (name, args) in procs: 452 if p.match(name, args): 453 log.debug("Found process %d on %s" % (pid, p.name)) 454 after[pid] = p 455 afterSet = Set(after.keys()) 456 afterByConfig = reverseDict(after) 457 new = afterSet - before 458 dead = before - afterSet 459 460 # report pid restarts 461 for p in dead: 462 config = device.pids[p] 463 config.discardPid(p) 464 if afterByConfig.has_key(config): 465 if config.restart: 466 summary = 'Process restarted: %s' % config.originalName 467 self.sendEvent(self.statusEvent, 468 device=device.name, 469 summary=summary, 470 component=config.originalName, 471 severity=config.severity) 472 log.info(summary) 473 474 # report alive processes 475 for config, pids in afterByConfig.items(): 476 if config.status > 0: 477 summary = "Process up: %s" % config.originalName 478 self.sendEvent(self.statusEvent, 479 device=device.name, 480 summary=summary, 481 component=config.originalName, 482 severity=Event.Clear) 483 config.status = 0 484 log.debug(summary) 485 
486 for p in new: 487 log.debug("Found new %s pid %d on %s" % ( 488 after[p].originalName, p, device.name)) 489 device.pids = after 490 491 # no pids for a config 492 for config in device.processes.values(): 493 if not afterByConfig.has_key(config): 494 config.status += 1 495 summary = 'Process not running: %s' % config.originalName 496 self.sendEvent(self.statusEvent, 497 device=device.name, 498 summary=summary, 499 component=config.originalName, 500 severity=config.severity) 501 log.warning(summary) 502 503 # store counts 504 pidCounts = dict([(p, 0) for p in device.processes]) 505 for pids, pidConfig in device.pids.items(): 506 pidCounts[pidConfig.name] += 1 507 for name, count in pidCounts.items(): 508 self.save(device.name, name, 'count_count', count, 'GAUGE') 509 510512 "Basic SNMP scan loop" 513 reactor.callLater(self.processCycleInterval, self.periodic) 514 515 if self.scanning: 516 running, unstarted, finished = self.scanning.status() 517 msg = "performance scan job not finishing: " \ 518 "%d jobs running %d jobs waiting %d jobs finished" % \ 519 (running, unstarted, finished) 520 log.error(msg) 521 return 522 523 def doPeriodic(driver): 524 525 yield self.getDevicePingIssues() 526 self.downDevices = Set([d[0] for d in driver.next()]) 527 528 self.scanning = NJobs(PARALLEL_JOBS, 529 self.oneDevice, 530 self.devices().values()) 531 yield self.scanning.start() 532 driver.next()533 534 drive(doPeriodic).addCallback(self.heartbeat) 535 536538 "Get performance data for all the monitored Processes on a device" 539 oids = [] 540 for pid, pidConf in device.pids.items(): 541 oids.extend([CPU + str(pid), MEM + str(pid)]) 542 if not oids: 543 return defer.succeed(([], device)) 544 545 d = Chain(device.get, iter(chunk(oids, device.maxOidsPerRequest))).run() 546 d.addBoth(self.storePerfStats, device) 547 return d548 549551 "Save the performance data in RRD files" 552 if isinstance(results, failure.Failure): 553 self.error(results) 554 return results 555 
self.clearSnmpError(device.name, 556 'Performance data read for %s' % device.name) 557 parts = {} 558 for success, values in results: 559 if success: 560 parts.update(values) 561 results = parts 562 byConf = reverseDict(device.pids) 563 for pidConf, pids in byConf.items(): 564 if len(pids) != 1: 565 log.info("There are %d pids by the name %s", 566 len(pids), pidConf.name) 567 pidName = pidConf.name 568 for pid in pids: 569 cpu = results.get(CPU + str(pid), None) 570 mem = results.get(MEM + str(pid), None) 571 pidConf.updateCpu(pid, cpu) 572 pidConf.updateMemory(pid, mem) 573 self.save(device.name, pidName, 'cpu_cpu', pidConf.getCpu(), 574 'DERIVE', min=0) 575 self.save(device.name, pidName, 'mem_mem', pidConf.getMemory() * 1024, 576 'GAUGE')577 578581 "Save an value in the right path in RRD files" 582 path = 'Devices/%s/os/processes/%s/%s' % (deviceName, pidName, statName) 583 value = self.rrd.save(path, value, rrdType, min=min, max=max) 584 585 for ev in self.thresholds.check('/' + path, time.time(), value): 586 self.sendThresholdEvent(**ev)587 588 596 597 600 601 602 if __name__ == '__main__': 603 z = zenprocess() 604 z.run() 605
Trees | Indices | Help |
|
---|
Generated by Epydoc 3.0beta1 on Thu Oct 25 16:28:48 2007 | http://epydoc.sourceforge.net |