Package ZenRRD :: Module zenprocess
[hide private]
[frames] | no frames]

Source Code for Module ZenRRD.zenprocess

  1  #! /usr/bin/env python  
  2  ########################################################################### 
  3  # 
  4  # This program is part of Zenoss Core, an open source monitoring platform. 
  5  # Copyright (C) 2007, Zenoss Inc. 
  6  # 
  7  # This program is free software; you can redistribute it and/or modify it 
  8  # under the terms of the GNU General Public License version 2 as published by 
  9  # the Free Software Foundation. 
 10  # 
 11  # For complete information please visit: http://www.zenoss.com/oss/ 
 12  # 
 13  ########################################################################### 
 14   
 15  __doc__= """zenprocess 
 16   
 17  Gets SNMP process performance data and stores it in RRD files. 
 18  """ 
 19   
 20  import logging 
 21  import time 
 22  from sets import Set 
 23   
 24  log = logging.getLogger("zen.zenprocess") 
 25   
 26  from twisted.internet import reactor, defer, error 
 27   
 28  import Globals 
 29  from Products.ZenUtils.Driver import drive, driveLater 
 30  from Products.ZenUtils.NJobs import NJobs 
 31  from Products.ZenUtils.Chain import Chain 
 32  from Products.ZenEvents import Event 
 33  from Products.ZenEvents.ZenEventClasses import Status_Snmp, \ 
 34        Status_OSProcess, Critical, Status_Perf 
 35   
 36  from Products.ZenRRD.RRDUtil import RRDUtil 
 37  from SnmpDaemon import SnmpDaemon 
 38   
 39  from Products.ZenHub.services.PerformanceConfig import SnmpConnInfo 
 40  # needed for pb comms 
 41  SnmpConnInfo = SnmpConnInfo 
 42   
# SNMP object identifiers from HOST-RESOURCES-MIB (RFC 2790), all rooted at
# the "host" subtree.
HOSTROOT  ='.1.3.6.1.2.1.25'
RUNROOT   = HOSTROOT + '.4'             # hrSWRun: running software
NAMETABLE = RUNROOT + '.2.1.2'          # hrSWRunName column
PATHTABLE = RUNROOT + '.2.1.4'          # hrSWRunPath column
ARGSTABLE = RUNROOT + '.2.1.5'          # hrSWRunParameters column
PERFROOT  = HOSTROOT + '.5'             # hrSWRunPerf: per-process performance
# The two prefixes below end in a dot because fetchPerf() appends the process
# id directly to them to build the OID of a single row.
CPU       = PERFROOT + '.1.1.1.'        # hrSWRunPerfCPU; note trailing dot
MEM       = PERFROOT + '.1.1.2.'        # hrSWRunPerfMem; note trailing dot

# Default for zenprocess.parallelJobs, the job count handed to NJobs.
DEFAULT_PARALLEL_JOBS = 10

# Modulus applied to the accumulated CPU total in Process.updateCpu().
WRAP=0xffffffffL
 55   
 56  try: 
 57      sorted = sorted                     # added in python 2.4 
 58  except NameError: 
59 - def sorted(x, *args, **kw):
60 x.sort(*args, **kw) 61 return x
62
def reverseDict(d):
    """Invert a mapping, grouping the keys under the value they map to.

    Each value of d becomes a key of the result.  The entry stored under it
    is the list of every key of d that mapped to that value, so keys sharing
    a value are all kept.
    """
    inverted = {}
    for key in d.keys():
        value = d[key]
        if value not in inverted:
            inverted[value] = []
        inverted[value].append(key)
    return inverted
71
def chunk(lst, n):
    """Split lst into consecutive slices holding at most n items each.

    The last slice is shorter when len(lst) is not a multiple of n.  An empty
    lst yields an empty list.
    """
    pieces = []
    for start in range(0, len(lst), n):
        pieces.append(lst[start:start + n])
    return pieces
75
class ScanFailure(Exception):
    """Exception type declared by this module for scan failures."""
77
class Pid:
    """Remember the last CPU counter and memory reading of one process id."""
    cpu = None      # last accepted CPU counter value; None means no baseline
    memory = None   # last value stored by updateMemory()

    def updateCpu(self, n):
        """Record a new CPU counter sample and return its increase.

        @param n: the new counter value (anything int() accepts), or None
                  when no sample is available
        @return: the difference to the previous sample, or None when there is
                 no usable previous sample, the new sample is missing or not
                 a number, or the counter went backwards
        """
        if n is not None:
            try:
                n = int(n)
            except (TypeError, ValueError):
                log.warning("Bad value for CPU: '%s'", n)
                # Treat junk like a missing sample.  Keeping the raw value
                # would store it as the baseline and make the next
                # subtraction raise TypeError.
                n = None

        if self.cpu is None or n is None:
            self.cpu = n
            return None
        diff = n - self.cpu
        if diff < 0:
            # don't provide a value when the counter falls backwards;
            # dropping the baseline makes the next sample start afresh
            n = None
            diff = None
        self.cpu = n
        return diff

    def updateMemory(self, n):
        """Store n as the current memory reading."""
        self.memory = n

    def __str__(self):
        return '<Pid> memory: %s cpu: %s' % (self.memory, self.cpu)
    __repr__ = __str__
106 107 108 from twisted.spread import pb
class Process(pb.Copyable, pb.RemoteCopy):
    """Configuration and running totals for one monitored process.

    The class is registered with pb.setUnjellyableForClass right after its
    definition, so it also serves as the type of copies received over
    Perspective Broker.
    """
    name = None                 # key of this entry in Device.processes
    originalName = None         # text compared against the process table
    ignoreParameters = False    # when true, match() ignores the arguments
    restart = None              # when true, restarts are reported as events
    severity = Event.Warning    # severity of "restarted"/"not running" events
    status = 0                  # raised while no pid matches, 0 once seen
    cpu = 0                     # accumulated CPU increase over all pids
    cycleTime = None            # not referenced by the visible code

    def __init__(self):
        # pid number -> Pid bookkeeping object
        self.pids = {}

    def match(self, name, args):
        """Tell whether a process table row belongs to this configuration.

        @param name: name (or path) of the running process
        @param args: its argument string, possibly empty
        @return: True when originalName equals name, or "name args" if
                 arguments are present and not ignored; always False while
                 this configuration has no name
        """
        if self.name is None:
            return False
        candidate = name
        if not self.ignoreParameters and args:
            candidate = '%s %s' % (name, args)
        return self.originalName == candidate

    def __str__(self):
        return str(self.name)
    __repr__ = __str__

    def updateCpu(self, pid, value):
        """Feed a CPU counter sample for pid into the accumulated total."""
        if pid not in self.pids:
            self.pids[pid] = Pid()
        delta = self.pids[pid].updateCpu(value)
        if delta is None:
            # no usable increase for this sample
            return
        self.cpu = (self.cpu + delta) % WRAP

    def getCpu(self):
        """Return the accumulated CPU total."""
        return self.cpu

    def updateMemory(self, pid, value):
        """Store the memory reading of pid, creating its entry if needed."""
        if pid not in self.pids:
            self.pids[pid] = Pid()
        self.pids[pid].memory = value

    def getMemory(self):
        """Return the sum of all known (non-None) per-pid memory readings."""
        total = 0
        for entry in self.pids.values():
            if entry.memory is not None:
                total += entry.memory
        return total

    def discardPid(self, pid):
        """Forget the bookkeeping for pid; unknown pids are ignored."""
        if pid in self.pids:
            del self.pids[pid]

    def updateConfig(self, update):
        """Copy the configurable attributes from another Process.

        The per-pid data and the accumulated counters are left alone.
        """
        if self is update:
            return
        for attr in ('name', 'originalName', 'ignoreParameters',
                     'restart', 'severity'):
            setattr(self, attr, getattr(update, attr))

pb.setUnjellyableForClass(Process, Process)
class Device(pb.Copyable, pb.RemoteCopy):
    """Track the monitoring state of one device.

    The class is registered with pb.setUnjellyableForClass right after its
    definition, so it also serves as the type of copies received over
    Perspective Broker.
    """
    name = ''
    snmpConnInfo = None         # connection settings; also creates sessions
    proxy = None                # open SNMP session, None while closed
    lastScan = 0.               # time.time() of the last process table scan
    snmpStatus = 0              # count of SNMP failures; 0 means healthy
    lastChange = 0              # change time of the applied configuration
    maxOidsPerRequest = 40

    def __init__(self):
        # map process name to Process object above
        self.processes = {}
        # map pid number to Process object
        self.pids = {}

    def open(self):
        """Make sure an SNMP session matching snmpConnInfo is open."""
        self._makeProxy()

    def close(self, unused=None):
        """Close the SNMP session, if any.

        @param unused: returned unchanged, so the method can be used as a
                       pass-through callback
        """
        if self.proxy:
            self.proxy.close()
        self.proxy = None
        return unused

    def _makeProxy(self):
        """Create and open a session unless a matching one already exists."""
        p = self.proxy
        c = self.snmpConnInfo
        if (p is None or p.snmpConnInfo != c):
            if p is not None:
                # The connection settings changed while a session was still
                # open: release the old session instead of leaking it.
                p.close()
            self.proxy = self.snmpConnInfo.createSession()
            self.proxy.open()

    def updateConfig(self, cfg):
        """Apply a freshly received configuration to this device.

        Processes present in cfg are created or updated in place (keeping
        their accumulated state); processes missing from cfg are dropped.

        @param cfg: Device object carrying the new configuration
        """
        if self is cfg:
            return
        self.snmpConnInfo = cfg.snmpConnInfo
        # Remember the change time of the configuration just applied.
        # zenprocess.remote_updateDeviceList compares it with the time
        # reported for the device to decide whether a new fetch is needed;
        # without this the value of the very first configuration would be
        # kept forever.
        self.lastChange = getattr(cfg, 'lastChange', self.lastChange)
        unused = Set(self.processes.keys())
        for update in cfg.processes.values():
            unused.discard(update.name)
            p = self.processes.setdefault(update.name, Process())
            p.updateConfig(update)
        for name in unused:
            del self.processes[name]

    def get(self, oids):
        """Issue an SNMP get for oids using the configured timeout/tries."""
        return self.proxy.get(oids,
                              self.snmpConnInfo.zSnmpTimeout,
                              self.snmpConnInfo.zSnmpTries)

    def getTables(self, oids):
        """Fetch the SNMP tables rooted at oids.

        The repetition count is chosen so that one request carries about
        maxOidsPerRequest values spread over all requested tables.

        @param oids: non-empty list of table OIDs
        """
        # Explicit floor division, and never less than one repetition: a
        # count of zero (more tables than maxOidsPerRequest) would ask for
        # no rows at all.
        repetitions = max(1, self.maxOidsPerRequest // len(oids))
        t = self.proxy.getTable(oids,
                                timeout=self.snmpConnInfo.zSnmpTimeout,
                                retryCount=self.snmpConnInfo.zSnmpTries,
                                maxRepetitions=repetitions)
        return t

pb.setUnjellyableForClass(Device, Device)
227 -class zenprocess(SnmpDaemon):
228 statusEvent = { 'eventClass' : Status_OSProcess, 229 'eventGroup' : 'Process' } 230 initialServices = SnmpDaemon.initialServices + ['ProcessConfig'] 231 processConfigInterval = 20*60 232 processCycleInterval = 5*60 233 properties = SnmpDaemon.properties + ('processCycleInterval',) 234 missing = 0 235 restarted = 0 236 parallelJobs = DEFAULT_PARALLEL_JOBS 237
238 - def __init__(self, noopts=False):
239 SnmpDaemon.__init__(self, 'zenprocess', noopts) 240 self._devices = {} 241 self.scanning = None 242 self.downDevices = Set()
243
244 - def devices(self):
245 "Return a filtered list of devices" 246 return dict([(k, v) for k, v in self._devices.items() 247 if k not in self.downDevices])
248
249 - def fetchConfig(self):
250 'Get configuration values from the Zope server' 251 def doFetchConfig(driver): 252 now = time.time() 253 254 yield self.model().callRemote('getDefaultRRDCreateCommand') 255 createCommand = driver.next() 256 257 yield self.model().callRemote('getZenProcessParallelJobs') 258 self.parallelJobs = int(driver.next()) 259 260 yield self.model().callRemote('propertyItems') 261 self.setPropertyItems(driver.next()) 262 263 self.rrd = RRDUtil(createCommand, self.processCycleInterval) 264 265 yield self.model().callRemote('getThresholdClasses') 266 self.remote_updateThresholdClasses(driver.next()) 267 268 yield self.model().callRemote('getCollectorThresholds') 269 self.rrdStats.config(self.options.monitor, 270 self.name, 271 driver.next(), 272 createCommand) 273 274 devices = [] 275 if self.options.device: 276 devices = [self.options.device] 277 yield self.model().callRemote('getOSProcessConf', devices) 278 driver.next() 279 self.sendEvents( 280 self.rrdStats.gauge('configTime', 281 self.processConfigInterval, 282 time.time() - now) 283 )
284 285 return drive(doFetchConfig)
286
287 - def remote_deleteDevice(self, doomed):
288 self.log.debug("Async delete device %s" % doomed) 289 if doomed in self._devices: 290 del self._devices[doomed] 291 self.clearSnmpError(doomed, "Device %s removed from SNMP collection")
292
293 - def remote_updateDeviceList(self, devices):
294 self.log.debug("Async update device list %s" % devices) 295 doomed = Set(self._devices.keys()) 296 updated = [] 297 for device, lastChange in devices: 298 cfg = self._devices.get(device, None) 299 if not cfg or self._devices[device].lastChange < lastChange: 300 updated.append(device) 301 doomed.discard(device) 302 if updated: 303 log.info("Fetching the config for %s", updated) 304 d = self.model().callRemote('getOSProcessConf', devices) 305 d.addCallback(self.updateDevices, updated) 306 d.addErrback(self.error) 307 if doomed: 308 log.info("Removing %s", doomed) 309 for device in doomed: 310 del self._devices[device] 311 self.clearSnmpError(device, "device %s removed" % device)
312 313
314 - def clearSnmpError(self, name, message):
315 if name in self._devices: 316 if self._devices[name].snmpStatus > 0: 317 self._devices[name].snmpStatus = 0 318 self.sendEvent(self.statusEvent, 319 eventClass=Status_Snmp, 320 component="process", 321 device=name, 322 summary=message, 323 severity=Event.Clear)
324 325
326 - def remote_updateDevice(self, cfg):
327 self.log.debug("Async config update for %s", cfg.name) 328 self.updateDevices([cfg],[])
329 330
331 - def updateDevices(self, cfgs, fetched):
332 received = Set() 333 for cfg in cfgs: 334 received.add(cfg.name) 335 d = self._devices.setdefault(cfg.name, cfg) 336 d.updateConfig(cfg) 337 self.thresholds.updateForDevice(cfg.name, cfg.thresholds) 338 for doomed in Set(fetched) - received: 339 if doomed in self._devices: 340 del self._devices[doomed]
341
342 - def start(self, driver):
343 'Read the basic config needed to do anything' 344 log.debug("fetching config") 345 devices = self._devices.keys() 346 yield self.fetchConfig() 347 self.updateDevices(driver.next(), devices) 348 349 yield self.model().callRemote('getSnmpStatus', self.options.device) 350 self.updateSnmpStatus(driver.next()) 351 352 yield self.model().callRemote('getProcessStatus', self.options.device) 353 self.updateProcessStatus(driver.next()) 354 355 driveLater(self.configCycleInterval * 60, self.start)
356 357
358 - def updateSnmpStatus(self, updates):
359 for name, count in updates: 360 d = self._devices.get(name) 361 if d: 362 d.snmpStatus = count
363 364
365 - def updateProcessStatus(self, status):
366 down = {} 367 for device, component, count in status: 368 down[ (device, component) ] = count 369 for name, device in self._devices.items(): 370 for p in device.processes.values(): 371 p.status = down.get( (name, p.originalName), 0)
372 373
374 - def oneDevice(self, device):
375 def go(driver): 376 try: 377 device.open() 378 yield self.scanDevice(device) 379 driver.next() 380 381 # Only fetch performance data if status data was found. 382 if device.snmpStatus == 0: 383 yield self.fetchPerf(device) 384 driver.next() 385 except: 386 log.debug('Failed to scan device %s' % device.name)
387 388 def close(res): 389 try: 390 device.close() 391 except: 392 log.debug("Failed to close device %s" % device.name) 393 394 d = drive(go) 395 d.addBoth(close) 396 return d 397 398
399 - def scanDevice(self, device):
400 "Fetch all the process info" 401 device.lastScan = time.time() 402 tables = [NAMETABLE, PATHTABLE, ARGSTABLE] 403 d = device.getTables(tables) 404 d.addCallback(self.storeProcessNames, device) 405 d.addErrback(self.deviceFailure, device) 406 return d
407 408
409 - def deviceFailure(self, reason, device):
410 "Log exception for a single device" 411 self.sendEvent(self.statusEvent, 412 eventClass=Status_Snmp, 413 component="process", 414 device=device.name, 415 summary='Unable to read processes on device %s' % device.name, 416 severity=Event.Error) 417 device.snmpStatus += 1 418 if isinstance(reason.value, error.TimeoutError): 419 self.log.debug('Timeout on device %s' % device.name) 420 else: 421 self.logError('Error on device %s' % device.name, reason.value)
422 423
424 - def storeProcessNames(self, results, device):
425 "Parse the process tables and figure what pids are on the device" 426 if not results: 427 summary = 'Device %s does not publish HOST-RESOURCES-MIB' % device.name 428 self.sendEvent(self.statusEvent, 429 device=device.name, 430 summary=summary, 431 severity=Event.Error) 432 log.info(summary) 433 return 434 if device.snmpStatus > 0: 435 summary = 'Process table up for device %s' % device.name 436 self.clearSnmpError(device.name, summary) 437 438 procs = [] 439 names, paths, args = {}, {}, {} 440 def extract(dictionary, oid, value): 441 pid = int(oid.split('.')[-1]) 442 dictionary[pid] = value
443 for row in results[NAMETABLE].items(): 444 extract(names, *row) 445 for row in results[PATHTABLE].items(): 446 extract(paths, *row) 447 for row in results[ARGSTABLE].items(): 448 extract(args, *row) 449 for i, path in paths.items(): 450 if i not in names: continue 451 name = names[i] 452 if path and path.find('\\') == -1: 453 name = path 454 arg = '' 455 if i in args: arg = args[i] 456 procs.append( (i, (name, arg) ) ) 457 # look for changes in pids 458 before = Set(device.pids.keys()) 459 after = {} 460 for p in device.processes.values(): 461 for pid, (name, args) in procs: 462 if p.match(name, args): 463 log.debug("Found process %d on %s" % (pid, p.name)) 464 after[pid] = p 465 afterSet = Set(after.keys()) 466 afterByConfig = reverseDict(after) 467 new = afterSet - before 468 dead = before - afterSet 469 470 # report pid restarts 471 for p in dead: 472 config = device.pids[p] 473 config.discardPid(p) 474 if afterByConfig.has_key(config): 475 self.restarted += 1 476 if config.restart: 477 summary = 'Process restarted: %s' % config.originalName 478 self.sendEvent(self.statusEvent, 479 device=device.name, 480 summary=summary, 481 component=config.originalName, 482 severity=config.severity) 483 log.info(summary) 484 485 # report alive processes 486 for config, pids in afterByConfig.items(): 487 summary = "Process up: %s" % config.originalName 488 self.sendEvent(self.statusEvent, 489 device=device.name, 490 summary=summary, 491 component=config.originalName, 492 severity=Event.Clear) 493 config.status = 0 494 log.debug(summary) 495 496 for p in new: 497 log.debug("Found new %s pid %d on %s" % ( 498 after[p].originalName, p, device.name)) 499 device.pids = after 500 501 # no pids for a config 502 for config in device.processes.values(): 503 if not afterByConfig.has_key(config): 504 self.missing += 1 505 config.status += 1 506 summary = 'Process not running: %s' % config.originalName 507 self.sendEvent(self.statusEvent, 508 device=device.name, 509 summary=summary, 
510 component=config.originalName, 511 severity=config.severity) 512 log.warning(summary) 513 514 # store counts 515 pidCounts = dict([(p, 0) for p in device.processes]) 516 for pids, pidConfig in device.pids.items(): 517 pidCounts[pidConfig.name] += 1 518 for name, count in pidCounts.items(): 519 self.save(device.name, name, 'count_count', count, 'GAUGE') 520 521
522 - def periodic(self, unused=None):
523 "Basic SNMP scan loop" 524 reactor.callLater(self.processCycleInterval, self.periodic) 525 526 if self.scanning: 527 running, unstarted, finished = self.scanning.status() 528 msg = "performance scan job not finishing: " \ 529 "%d jobs running %d jobs waiting %d jobs finished" % \ 530 (running, unstarted, finished) 531 log.error(msg) 532 log.error("Problem devices: %r", [ 533 d.name for d in self.devices().values() \ 534 if d.proxy is not None]) 535 return 536 537 start = time.time() 538 539 def doPeriodic(driver): 540 541 yield self.getDevicePingIssues() 542 self.downDevices = Set([d[0] for d in driver.next()]) 543 544 self.scanning = NJobs(self.parallelJobs, 545 self.oneDevice, 546 self.devices().values()) 547 yield self.scanning.start() 548 driver.next()
549 550 def checkResults(results): 551 for result in results: 552 if isinstance(result , Exception): 553 log.error("Error scanning device: %s", result) 554 break 555 self.cycleTime = time.time() - start 556 self.heartbeat() 557 558 drive(doPeriodic).addCallback(checkResults) 559 560
561 - def fetchPerf(self, device):
562 "Get performance data for all the monitored Processes on a device" 563 oids = [] 564 for pid, pidConf in device.pids.items(): 565 oids.extend([CPU + str(pid), MEM + str(pid)]) 566 if not oids: 567 return defer.succeed(([], device)) 568 569 d = Chain(device.get, iter(chunk(oids, device.maxOidsPerRequest))).run() 570 d.addCallback(self.storePerfStats, device) 571 d.addErrback(self.deviceFailure, device) 572 return d
573 574
575 - def storePerfStats(self, results, device):
576 "Save the performance data in RRD files" 577 for success, result in results: 578 if not success: 579 self.deviceFailure(result, device) 580 return results 581 self.clearSnmpError(device.name, 582 'Process table up for device %s' % device.name) 583 parts = {} 584 for success, values in results: 585 if success: 586 parts.update(values) 587 results = parts 588 byConf = reverseDict(device.pids) 589 for pidConf, pids in byConf.items(): 590 if len(pids) != 1: 591 log.info("There are %d pids by the name %s", 592 len(pids), pidConf.name) 593 pidName = pidConf.name 594 for pid in pids: 595 cpu = results.get(CPU + str(pid), None) 596 mem = results.get(MEM + str(pid), None) 597 pidConf.updateCpu(pid, cpu) 598 pidConf.updateMemory(pid, mem) 599 self.save(device.name, pidName, 'cpu_cpu', pidConf.getCpu(), 600 'DERIVE', min=0) 601 self.save(device.name, pidName, 'mem_mem', pidConf.getMemory() * 1024, 602 'GAUGE')
603 604
605 - def save(self, deviceName, pidName, statName, value, rrdType, 606 min='U', max='U'):
607 """ 608 Save a value into an RRD file 609 610 @param deviceName: name of the remote device (ie a hostname) 611 @type deviceName: string 612 @param pidName: process id of the monitored process 613 @type pidName: string 614 @param statName: metric name 615 @type statName: string 616 @param value: data to be stored 617 @type value: number 618 @param rrdType: RRD data type (eg ABSOLUTE, DERIVE, COUNTER) 619 @type rrdType: string 620 @param min: minimum value acceptable for this metric 621 @type min: number 622 @param max: maximum value acceptable for this metric 623 @type max: number 624 """ 625 path = 'Devices/%s/os/processes/%s/%s' % (deviceName, pidName, statName) 626 try: 627 value = self.rrd.save(path, value, rrdType, min=min, max=max) 628 629 except Exception, ex: 630 summary= "Unable to save data for process-monitor RRD %s" % \ 631 path 632 self.log.critical( summary ) 633 634 message= "Data was value= %s, type=%s, min=%s, max=%s" % \ 635 ( value, rrdType, min, max, ) 636 self.log.critical( message ) 637 self.log.exception( ex ) 638 639 import traceback 640 trace_info= traceback.format_exc() 641 642 evid= self.sendEvent(dict( 643 dedupid="%s|%s" % (self.options.monitor, 'RRD write failure'), 644 severity=Critical, 645 device=self.options.monitor, 646 eventClass=Status_Perf, 647 component="RRD", 648 pidName=pidName, 649 statName=statName, 650 path=path, 651 message=message, 652 traceback=trace_info, 653 summary=summary)) 654 655 # Skip thresholds 656 return 657 658 for ev in self.thresholds.check(path, time.time(), value): 659 self.sendThresholdEvent(**ev)
660 661
662 - def heartbeat(self):
663 self.scanning = None 664 devices = self.devices() 665 pids = sum(map(lambda x: len(x.pids), devices.values())) 666 log.info("Pulled process status for %d devices and %d processes", 667 len(devices), pids) 668 SnmpDaemon.heartbeat(self) 669 cycle = self.processCycleInterval 670 self.sendEvents( 671 self.rrdStats.counter('dataPoints', cycle, self.rrd.dataPoints) + 672 self.rrdStats.gauge('cyclePoints', cycle, self.rrd.endCycle()) + 673 self.rrdStats.gauge('pids', cycle, pids) + 674 self.rrdStats.gauge('devices', cycle, len(devices)) + 675 self.rrdStats.gauge('missing', cycle, self.missing) + 676 self.rrdStats.gauge('restarted', cycle, self.restarted) + 677 self.rrdStats.gauge('cycleTime', cycle, self.cycleTime) 678 )
679 680
681 - def connected(self):
682 drive(self.start).addCallbacks(self.periodic, self.errorStop)
683 684 685 if __name__ == '__main__': 686 from Products.ZenRRD.zenprocess import zenprocess 687 z = zenprocess() 688 z.run() 689