Package ZenRRD :: Module zenprocess
[hide private]
[frames] | [no frames]

Source Code for Module ZenRRD.zenprocess

  1  #! /usr/bin/env python 
  2  ########################################################################### 
  3  # 
  4  # This program is part of Zenoss Core, an open source monitoring platform. 
  5  # Copyright (C) 2007, 2009 Zenoss Inc. 
  6  # 
  7  # This program is free software; you can redistribute it and/or modify it 
  8  # under the terms of the GNU General Public License version 2 as published by 
  9  # the Free Software Foundation. 
 10  # 
 11  # For complete information please visit: http://www.zenoss.com/oss/ 
 12  # 
 13  ########################################################################### 
 14   
 15  __doc__= """zenprocess 
 16   
 17  Gets SNMP process data from a device's HOST-RESOURCES-MIB 
 18  and store process performance in RRD files. 
 19  """ 
 20   
 21  import sys 
 22  import logging 
 23  import time 
 24  from sets import Set 
 25   
 26  log = logging.getLogger("zen.zenprocess") 
 27   
 28  from twisted.internet import reactor, defer, error 
 29  from twisted.spread import pb 
 30   
 31  import Globals 
 32  from Products.ZenUtils.Driver import drive, driveLater 
 33  from Products.ZenUtils.NJobs import NJobs 
 34  from Products.ZenUtils.Chain import Chain 
 35  from Products.ZenEvents import Event 
 36  from Products.ZenEvents.ZenEventClasses import Status_Snmp, \ 
 37        Status_OSProcess, Critical, Status_Perf 
 38   
 39  from Products.ZenRRD.RRDUtil import RRDUtil 
 40  from SnmpDaemon import SnmpDaemon 
 41   
 42  from Products.ZenHub.services.PerformanceConfig import SnmpConnInfo 
 43  # needed for pb comms 
 44  SnmpConnInfo = SnmpConnInfo # Shut up, pyflakes! 
 45   
 46  # HOST-RESOURCES-MIB OIDs used 
 47  HOSTROOT  ='.1.3.6.1.2.1.25' 
 48  RUNROOT   = HOSTROOT + '.4' 
 49  NAMETABLE = RUNROOT + '.2.1.2' 
 50  PATHTABLE = RUNROOT + '.2.1.4' 
 51  ARGSTABLE = RUNROOT + '.2.1.5' 
 52  PERFROOT  = HOSTROOT + '.5' 
 53  CPU       = PERFROOT + '.1.1.1.'        # note trailing dot 
 54  MEM       = PERFROOT + '.1.1.2.'        # note trailing dot 
 55   
 56  DEFAULT_PARALLEL_JOBS = 10 
 57   
 58  # Max size for CPU numbers 
 59  WRAP = 0xffffffffL 
 60   
def reverseDict(d):
    """
    Invert a mapping, grouping together keys that share a value.

    Several keys may map to the same value, so every value of the returned
    dictionary is a list of the original keys (in iteration order of d).

    @parameter d: mapping to invert; its values must be hashable
    @type d: dictionary
    @return: map of each original value to the keys that carried it
    @rtype: dictionary of lists
    """
    inverted = {}
    for key, value in d.items():
        if value not in inverted:
            inverted[value] = []
        inverted[value].append(key)
    return inverted
70
def chunk(lst, n):
    """
    Split a sequence into consecutive slices of at most n items.

    The final slice holds whatever is left over and may be shorter than n.

    @parameter lst: sequence to split (anything supporting len() and slicing)
    @type lst: list
    @parameter n: maximum number of items per slice
    @type n: integer
    @return: the slices, in order
    @rtype: list
    """
    pieces = []
    for start in range(0, len(lst), n):
        pieces.append(lst[start:start + n])
    return pieces
76
class ScanFailure(Exception):
    """
    Exception type for a failed process-table scan.

    NOTE(review): nothing in this module raises it; it may be used by other
    code, so confirm before removing.
    """
78
class Pid:
    """
    Helper class to track process id information.

    Remembers the latest CPU counter reading and memory value for one pid,
    so that successive CPU readings can be turned into increments.
    """
    cpu = None
    memory = None

    def updateCpu(self, n):
        """
        Record a new CPU counter reading and return the increase since the
        previous one.

        @parameter n: raw counter value (anything int() accepts) or None
        @type n: integer, string or None
        @return: n minus the previous reading; None when there is no usable
                 previous reading, the new reading is missing or not a
                 number, or the counter went backwards
        @rtype: integer or None
        """
        if n is not None:
            try:
                n = int(n)
            except (TypeError, ValueError):
                log.warning("Bad value for CPU: '%s'", n)
                # Treat an unparseable reading as missing; keeping the raw
                # value would make the subtraction below raise TypeError
                # (now or on the next call).
                n = None

        if self.cpu is None or n is None:
            self.cpu = n
            return None
        diff = n - self.cpu
        if diff < 0:
            # don't provide a value when the counter falls backwards
            n = None
            diff = None
        self.cpu = n
        return diff

    def updateMemory(self, n):
        """
        Record the latest memory reading for this pid.
        """
        self.memory = n

    def __str__(self):
        """
        Override the Python default to represent ourselves as a string
        """
        return '<Pid> memory: %s cpu: %s' % (self.memory, self.cpu)
    __repr__ = __str__
class Process(pb.Copyable, pb.RemoteCopy):
    """
    Configuration and running state for one monitored process.

    The configuration fields are copied in by updateConfig(); pids maps each
    matching pid to a Pid tracker used to derive CPU and memory figures.
    """
    name = None
    originalName = None
    ignoreParameters = False
    restart = None
    severity = Event.Warning
    status = 0
    cpu = 0
    cycleTime = None

    def __init__(self):
        # pid -> Pid tracker
        self.pids = {}

    def match(self, name, args):
        """
        Perform exact comparisons on the process names.

        When parameters matter and the process has arguments, originalName
        must equal "<name> <args>"; otherwise it must equal the bare name.

        @parameter name: name of a process to compare
        @type name: string
        @parameter args: argument list of the process
        @type args: string
        @return: does the name match this process's info?
        @rtype: Boolean
        """
        if self.name is None:
            return False
        if args and not self.ignoreParameters:
            wanted = '%s %s' % (name, args)
        else:
            wanted = name
        return self.originalName == wanted

    def __str__(self):
        """
        Represent ourselves by our process name.
        """
        return str(self.name)
    __repr__ = __str__

    def updateCpu(self, pid, value):
        """
        Feed a raw CPU counter reading for pid to its tracker and add any
        resulting non-negative increase to the running total, modulo WRAP.
        """
        delta = self.pids.setdefault(pid, Pid()).updateCpu(value)
        if delta is None:
            return
        self.cpu = (self.cpu + delta) % WRAP

    def getCpu(self):
        """
        Return the accumulated CPU total.
        """
        return self.cpu

    def updateMemory(self, pid, value):
        """
        Record the latest memory reading for pid.
        """
        tracker = self.pids.setdefault(pid, Pid())
        tracker.memory = value

    def getMemory(self):
        """
        Return the sum of the known memory readings of all tracked pids.
        """
        total = 0
        for tracker in self.pids.values():
            if tracker.memory is not None:
                total += tracker.memory
        return total

    def discardPid(self, pid):
        """
        Stop tracking pid; unknown pids are ignored.
        """
        self.pids.pop(pid, None)

    def updateConfig(self, update):
        """
        Copy the configuration fields from another Process object, leaving
        the running state (pids, cpu, status) untouched.
        """
        if update is self:
            return
        for attr in ('name', 'originalName', 'ignoreParameters',
                     'restart', 'severity'):
            setattr(self, attr, getattr(update, attr))

pb.setUnjellyableForClass(Process, Process)
class Device(pb.Copyable, pb.RemoteCopy):
    """
    Track device data: SNMP connection settings, the configured processes
    and the pids currently matched to them.
    """
    name = ''
    snmpConnInfo = None
    proxy = None
    lastScan = 0.
    snmpStatus = 0
    lastChange = 0
    maxOidsPerRequest = 40

    def __init__(self):
        # map process name to Process object above
        self.processes = {}
        # map pid number to Process object
        self.pids = {}

    def open(self):
        """
        Create a connection to the remote device, reusing the current one
        unless the SNMP connection settings have changed.
        """
        if self.proxy is None or \
           self.proxy.snmpConnInfo != self.snmpConnInfo:
            # Release any session built from stale settings before
            # replacing it, instead of leaking it.
            self.close()
            self.proxy = self.snmpConnInfo.createSession()
            self.proxy.open()

    def close(self, unused=None):
        """
        Close down the connection to the remote device.

        Returns its argument unchanged so it can be used as a Twisted
        callback/errback.
        """
        if self.proxy:
            self.proxy.close()
        self.proxy = None
        return unused

    def updateConfig(self, cfg):
        """
        Called with configuration information from zenhub.

        Copies the connection settings and change stamp from cfg, merges the
        process configuration (reusing existing Process objects so their
        running state survives) and drops processes cfg no longer lists.

        @parameter cfg: freshly received configuration for this device
        @type cfg: Device object
        """
        if self is cfg:
            return
        log.debug("Updating configuration for %s", cfg.name)
        self.snmpConnInfo = cfg.snmpConnInfo
        # Keep the change stamp current; remote_updateDeviceList compares it
        # to decide whether a device's configuration must be fetched again.
        self.lastChange = cfg.lastChange
        self.maxOidsPerRequest = cfg.maxOidsPerRequest
        unused = set(self.processes.keys())
        for update in cfg.processes.values():
            unused.discard(update.name)
            p = self.processes.setdefault(update.name, Process())
            p.updateConfig(update)
        for name in unused:
            del self.processes[name]

    def get(self, oids):
        """
        Perform SNMP get for specified OIDs

        @parameter oids: OIDs to gather
        @type oids: list of strings
        @return: Twisted deferred
        @rtype: Twisted deferred
        """
        return self.proxy.get(oids,
                              self.snmpConnInfo.zSnmpTimeout,
                              self.snmpConnInfo.zSnmpTries)

    def getTables(self, oids):
        """
        Perform SNMP getTable for specified OIDs

        @parameter oids: OIDs to gather
        @type oids: list of strings
        @return: Twisted deferred
        @rtype: Twisted deferred
        """
        # Share the per-request OID budget between the requested columns.
        # Integer division keeps this an int on Python 3 too, and we never
        # ask for 0 repetitions (or divide by zero for an empty list).
        repetitions = max(1, self.maxOidsPerRequest // max(1, len(oids)))
        return self.proxy.getTable(oids,
                                   timeout=self.snmpConnInfo.zSnmpTimeout,
                                   retryCount=self.snmpConnInfo.zSnmpTries,
                                   maxRepetitions=repetitions,
                                   limit=sys.maxint)

pb.setUnjellyableForClass(Device, Device)
291 -class zenprocess(SnmpDaemon):
292 """ 293 Daemon class to connect to an SNMP agent and determine the processes 294 that are running on that server. 295 """ 296 statusEvent = { 'eventClass' : Status_OSProcess, 297 'eventGroup' : 'Process' } 298 initialServices = SnmpDaemon.initialServices + ['ProcessConfig'] 299 processConfigInterval = 20*60 300 processCycleInterval = 5*60 301 properties = SnmpDaemon.properties + ('processCycleInterval',) 302 missing = 0 303 restarted = 0 304 parallelJobs = DEFAULT_PARALLEL_JOBS 305
306 - def __init__(self, noopts=False):
307 SnmpDaemon.__init__(self, 'zenprocess', noopts) 308 self._devices = {} 309 self.scanning = None 310 self.downDevices = Set()
311
312 - def devices(self):
313 """ 314 Return the list of devices that are available 315 316 @return: device list 317 @rtype: dictionary of device name, device object 318 """ 319 return dict([(k, v) for k, v in self._devices.items() 320 if k not in self.downDevices])
321
322 - def fetchConfig(self):
323 """ 324 Get configuration values from zenhub 325 326 @return: Twisted deferred 327 @rtype: Twisted deferred 328 """ 329 def doFetchConfig(driver): 330 now = time.time() 331 332 yield self.model().callRemote('getDefaultRRDCreateCommand') 333 createCommand = driver.next() 334 335 yield self.model().callRemote('getZenProcessParallelJobs') 336 self.parallelJobs = int(driver.next()) 337 338 yield self.model().callRemote('propertyItems') 339 self.setPropertyItems(driver.next()) 340 341 self.rrd = RRDUtil(createCommand, self.processCycleInterval) 342 343 yield self.model().callRemote('getThresholdClasses') 344 self.remote_updateThresholdClasses(driver.next()) 345 346 yield self.model().callRemote('getCollectorThresholds') 347 self.rrdStats.config(self.options.monitor, 348 self.name, 349 driver.next(), 350 createCommand) 351 352 devices = [] 353 if self.options.device: 354 devices = [self.options.device] 355 yield self.model().callRemote('getOSProcessConf', devices) 356 driver.next() 357 self.sendEvents( 358 self.rrdStats.gauge('configTime', 359 self.processConfigInterval, 360 time.time() - now) 361 )
362 363 return drive(doFetchConfig)
364
365 - def remote_deleteDevice(self, doomed):
366 """ 367 Called from zenhub to remove a device from our configuration 368 369 @parameter doomed: device to delete 370 @type doomed: string 371 """ 372 self.log.debug("zenhub asks us to delete device %s" % doomed) 373 if doomed in self._devices: 374 del self._devices[doomed] 375 self.clearSnmpError(doomed, "Device %s removed from SNMP collection")
376
377 - def remote_updateDeviceList(self, devices):
378 """ 379 Called from zenhub to update the devices to monitor 380 381 @parameter devices: devices to monitor 382 @type devices: list of (device, changetime) tuples 383 """ 384 self.log.debug("Received updated device list from zenhub %s" % devices) 385 doomed = Set(self._devices.keys()) 386 updated = [] 387 for device, lastChange in devices: 388 # Ignore updates for devices if we've only asked for one device 389 if self.options.device and \ 390 device != self.options.device: 391 self.log.debug("Ignoring update for %s as we only want %s", 392 device, self.options.device) 393 continue 394 395 cfg = self._devices.get(device, None) 396 if not cfg or self._devices[device].lastChange < lastChange: 397 updated.append(device) 398 doomed.discard(device) 399 400 if updated: 401 log.info("Fetching the config for %s", updated) 402 d = self.model().callRemote('getOSProcessConf', devices) 403 d.addCallback(self.updateDevices, updated) 404 d.addErrback(self.error) 405 406 if doomed: 407 log.info("Removing %s", doomed) 408 for device in doomed: 409 del self._devices[device] 410 self.clearSnmpError(device, "device %s removed" % device)
411 412
413 - def clearSnmpError(self, name, message):
414 """ 415 Send an event to clear other events. 416 417 @parameter name: device for which the event applies 418 @type name: string 419 @parameter message: clear text 420 @type message: string 421 """ 422 if name in self._devices: 423 if self._devices[name].snmpStatus > 0: 424 self._devices[name].snmpStatus = 0 425 self.sendEvent(self.statusEvent, 426 eventClass=Status_Snmp, 427 component="process", 428 device=name, 429 summary=message, 430 agent='zenprocess', 431 severity=Event.Clear)
432 433
434 - def remote_updateDevice(self, cfg):
435 """ 436 Twisted remote callback, to allow zenhub to remotely update 437 this daemon. 438 439 @parameter cfg: configuration information returned from zenhub 440 @type cfg: object 441 """ 442 self.log.debug("Configuration update from zenhub for %s", cfg.name) 443 self.updateDevices([cfg],[])
444 445
446 - def updateDevices(self, cfgs, fetched):
447 """ 448 Called when the zenhub service getSnmpStatus completes. 449 450 @parameter cfgs: configuration information returned from zenhub 451 @type cfgs: list of objects 452 @parameter fetched: names we want zenhub to return information about 453 @type fetched: list of strings 454 """ 455 received = Set() 456 for cfg in cfgs: 457 received.add(cfg.name) 458 d = self._devices.setdefault(cfg.name, cfg) 459 d.updateConfig(cfg) 460 self.thresholds.updateForDevice(cfg.name, cfg.thresholds) 461 462 for doomed in Set(fetched) - received: 463 if doomed in self._devices: 464 del self._devices[doomed]
465
466 - def start(self, driver):
467 """ 468 Read the basic config needed to do anything, and to reread 469 the configuration information on a periodic basis. 470 """ 471 log.debug("Fetching configuration from zenhub") 472 devices = self._devices.keys() 473 yield self.fetchConfig() 474 self.updateDevices(driver.next(), devices) 475 476 yield self.model().callRemote('getSnmpStatus', self.options.device) 477 self.updateSnmpStatus(driver.next()) 478 479 yield self.model().callRemote('getProcessStatus', self.options.device) 480 self.updateProcessStatus(driver.next()) 481 482 driveLater(self.configCycleInterval * 60, self.start)
483 484
485 - def updateSnmpStatus(self, updates):
486 """ 487 Called when the zenhub service getSnmpStatus completes. 488 489 @parameter updates: List of names and error counts 490 @type updates: list of (string, int) 491 """ 492 for name, count in updates: 493 d = self._devices.get(name) 494 if d: 495 d.snmpStatus = count
496 497
498 - def updateProcessStatus(self, status):
499 """ 500 Called when the zenhub service getProcessStatus completes. 501 502 @parameter status: List of names, component names and error counts 503 @type status: list of (string, string, int) 504 """ 505 down = {} 506 for device, component, count in status: 507 down[ (device, component) ] = count 508 for name, device in self._devices.items(): 509 for p in device.processes.values(): 510 p.status = down.get( (name, p.originalName), 0)
511 512
513 - def oneDevice(self, device):
514 """ 515 Contact one device and return a deferred which gathers data from 516 the device. 517 518 @parameter device: proxy object to the remote computer 519 @type device: Device object 520 @return: job to scan a device 521 @rtype: Twisted deferred object 522 """ 523 def go(driver): 524 """ 525 Generator object to gather information from a device. 526 """ 527 try: 528 device.open() 529 yield self.scanDevice(device) 530 driver.next() 531 532 # Only fetch performance data if status data was found. 533 if device.snmpStatus == 0: 534 yield self.fetchPerf(device) 535 driver.next() 536 else: 537 log.warn("Failed to find performance data for %s", 538 device.name) 539 except: 540 log.debug('Failed to scan device %s' % device.name)
541 542 def close(res): 543 """ 544 Twisted closeBack and errBack function which closes any 545 open connections. 546 """ 547 try: 548 device.close() 549 except: 550 log.debug("Failed to close device %s" % device.name) 551 552 d = drive(go) 553 d.addBoth(close) 554 return d 555 556
557 - def scanDevice(self, device):
558 """ 559 Fetch all the process info for a device using SNMP table gets 560 561 @parameter device: proxy connection object 562 @type device: Device object 563 @return: Twisted deferred 564 @rtype: Twisted deferred 565 """ 566 device.lastScan = time.time() 567 tables = [NAMETABLE, PATHTABLE, ARGSTABLE] 568 d = device.getTables(tables) 569 d.addCallback(self.storeProcessNames, device) 570 d.addErrback(self.deviceFailure, device) 571 return d
572 573
574 - def deviceFailure(self, reason, device):
575 """ 576 Twisted errBack to log the exception for a single device. 577 578 @parameter reason: explanation of the failure 579 @type reason: Twisted error instance 580 @parameter device: proxy connection object 581 @type device: Device object 582 """ 583 self.sendEvent(self.statusEvent, 584 eventClass=Status_Snmp, 585 component="process", 586 device=device.name, 587 summary='Unable to read processes on device %s' % device.name, 588 severity=Event.Error) 589 device.snmpStatus += 1 590 if isinstance(reason.value, error.TimeoutError): 591 self.log.debug('Timeout on device %s' % device.name) 592 else: 593 self.logError('Error on device %s' % device.name, reason.value)
594
595 - def mapResultsToDicts(self, results):
596 """ 597 Parse the process tables and reconstruct the list of processes 598 that are on the device. 599 600 @parameter results: results of SNMP table gets ie (OID + pid, value) 601 @type results: dictionary of dictionaries 602 @return: maps relating names and pids to each other 603 @rtype: dictionary, dictionary, dictionary, list of tuples 604 """ 605 def extract(dictionary, oid, value): 606 """ 607 Helper function to extract SNMP table data. 608 """ 609 pid = int(oid.split('.')[-1]) 610 dictionary[pid] = value
611 612 names, paths, args = {}, {}, {} 613 if self.options.showrawtables: 614 log.info("NAMETABLE = %r", results[NAMETABLE]) 615 for row in results[NAMETABLE].items(): 616 extract(names, *row) 617 618 if self.options.showrawtables: 619 log.info("PATHTABLE = %r", results[PATHTABLE]) 620 for row in results[PATHTABLE].items(): 621 extract(paths, *row) 622 623 if self.options.showrawtables: 624 log.info("ARGSTABLE = %r", results[ARGSTABLE]) 625 for row in results[ARGSTABLE].items(): 626 extract(args, *row) 627 628 procs = [] 629 for pid, name in names.items(): 630 path = paths.get(pid, '') 631 if path and path.find('\\') == -1: 632 name = path 633 arg = args.get(pid, '') 634 procs.append( (pid, (name, arg) ) ) 635 636 return names, paths, args, procs 637
638 - def showProcessList(self, device_name, procs):
639 """ 640 Display the processes in a sane manner. 641 642 @parameter device_name: name of the device 643 @type device_name: string 644 @parameter procs: list of (pid, (name, args)) 645 @type procs: list of tuples 646 """ 647 proc_list = [ '%s %s %s' % (pid, name, args) for pid, (name, args) \ 648 in sorted(procs)] 649 proc_list.append('') 650 log.info("#===== Processes on %s:\n%s", device_name, '\n'.join(proc_list))
651
652 - def storeProcessNames(self, results, device):
653 """ 654 Parse the process tables and reconstruct the list of processes 655 that are on the device. 656 657 @parameter results: results of SNMP table gets 658 @type results: dictionary of dictionaries 659 @parameter device: proxy connection object 660 @type device: Device object 661 """ 662 if not results or not results[NAMETABLE]: 663 summary = 'Device %s does not publish HOST-RESOURCES-MIB' % device.name 664 resolution="Verify with snmpwalk -v1 -c community %s %s" % ( 665 device.name, NAMETABLE ) 666 self.sendEvent(self.statusEvent, 667 device=device.name, 668 summary=summary, 669 resolution=resolution, 670 severity=Event.Error) 671 log.info(summary) 672 return 673 if device.snmpStatus > 0: 674 summary = 'Process table up for device %s' % device.name 675 self.clearSnmpError(device.name, summary) 676 677 names, paths, args, procs = self.mapResultsToDicts(results) 678 if self.options.showprocs: 679 self.showProcessList(device.name, procs) 680 681 # look for changes in processes 682 before = Set(device.pids.keys()) 683 after = {} 684 for p in device.processes.values(): 685 for pid, (name, args) in procs: 686 if p.match(name, args): 687 log.debug("Found process %d on %s" % (pid, p.name)) 688 after[pid] = p 689 afterSet = Set(after.keys()) 690 afterByConfig = reverseDict(after) 691 new = afterSet - before 692 dead = before - afterSet 693 694 # report pid restarts 695 restarted = {} 696 for p in dead: 697 config = device.pids[p] 698 config.discardPid(p) 699 if config in afterByConfig: 700 self.restarted += 1 701 if config.restart: 702 restarted[config] = True 703 summary = 'Process restarted: %s' % config.originalName 704 self.sendEvent(self.statusEvent, 705 device=device.name, 706 summary=summary, 707 component=config.originalName, 708 severity=config.severity) 709 log.info(summary) 710 711 # report alive processes 712 for config, pids in afterByConfig.items(): 713 if config in restarted: continue 714 summary = "Process up: %s" % config.originalName 715 
self.sendEvent(self.statusEvent, 716 device=device.name, 717 summary=summary, 718 component=config.originalName, 719 severity=Event.Clear) 720 config.status = 0 721 log.debug(summary) 722 723 for p in new: 724 log.debug("Found new %s pid %d on %s" % ( 725 after[p].originalName, p, device.name)) 726 device.pids = after 727 728 # Look for missing processes 729 for config in device.processes.values(): 730 if config not in afterByConfig: 731 self.missing += 1 732 config.status += 1 733 summary = 'Process not running: %s' % config.originalName 734 self.sendEvent(self.statusEvent, 735 device=device.name, 736 summary=summary, 737 component=config.originalName, 738 severity=config.severity) 739 log.warning(summary) 740 741 # Store per-device, per-process statistics 742 pidCounts = dict([(p, 0) for p in device.processes]) 743 for pids, pidConfig in device.pids.items(): 744 pidCounts[pidConfig.name] += 1 745 for name, count in pidCounts.items(): 746 self.save(device.name, name, 'count_count', count, 'GAUGE')
747 748
749 - def periodic(self, unused=None):
750 """ 751 Main loop that drives all other processing. 752 """ 753 reactor.callLater(self.processCycleInterval, self.periodic) 754 755 if self.scanning: 756 running, unstarted, finished = self.scanning.status() 757 runningDevices = [ d.name for d in self.devices().values() \ 758 if d.proxy is not None] 759 760 if runningDevices or unstarted > 0: 761 log.warning("Process scan not finishing: " 762 "%d running, %d waiting, %d finished" % ( 763 running, unstarted, finished)) 764 log.warning("Problem devices: %r", runningDevices) 765 return 766 767 start = time.time() 768 769 def doPeriodic(driver): 770 """ 771 Generator function to create deferred jobs. 772 """ 773 yield self.getDevicePingIssues() 774 self.downDevices = Set([d[0] for d in driver.next()]) 775 776 self.scanning = NJobs(self.parallelJobs, 777 self.oneDevice, 778 self.devices().values()) 779 yield self.scanning.start() 780 driver.next()
781 782 def checkResults(results): 783 """ 784 Process the results from all deferred objects. 785 """ 786 for result in results: 787 if isinstance(result , Exception): 788 log.error("Error scanning device: %s", result) 789 break 790 self.cycleTime = time.time() - start 791 self.heartbeat() 792 793 drive(doPeriodic).addCallback(checkResults) 794 795
796 - def fetchPerf(self, device):
797 """ 798 Get performance data for all the monitored processes on a device 799 800 @parameter device: proxy object to the remote computer 801 @type device: Device object 802 """ 803 oids = [] 804 for pid, pidConf in device.pids.items(): 805 oids.extend([CPU + str(pid), MEM + str(pid)]) 806 if not oids: 807 return defer.succeed(([], device)) 808 809 d = Chain(device.get, iter(chunk(oids, device.maxOidsPerRequest))).run() 810 d.addCallback(self.storePerfStats, device) 811 d.addErrback(self.deviceFailure, device) 812 return d
813 814
815 - def storePerfStats(self, results, device):
816 """ 817 Save the process performance data in RRD files 818 819 @parameter results: results of SNMP table gets 820 @type results: list of (success, result) tuples 821 @parameter device: proxy object to the remote computer 822 @type device: Device object 823 """ 824 for success, result in results: 825 if not success: 826 self.deviceFailure(result, device) 827 return results 828 self.clearSnmpError(device.name, 829 'Process table up for device %s' % device.name) 830 parts = {} 831 for success, values in results: 832 if success: 833 parts.update(values) 834 results = parts 835 byConf = reverseDict(device.pids) 836 for pidConf, pids in byConf.items(): 837 if len(pids) != 1: 838 log.info("There are %d pids by the name %s", 839 len(pids), pidConf.name) 840 pidName = pidConf.name 841 for pid in pids: 842 cpu = results.get(CPU + str(pid), None) 843 mem = results.get(MEM + str(pid), None) 844 pidConf.updateCpu(pid, cpu) 845 pidConf.updateMemory(pid, mem) 846 self.save(device.name, pidName, 'cpu_cpu', pidConf.getCpu(), 847 'DERIVE', min=0) 848 self.save(device.name, pidName, 'mem_mem', pidConf.getMemory() * 1024, 849 'GAUGE')
850 851
852 - def save(self, deviceName, pidName, statName, value, rrdType, 853 min='U', max='U'):
854 """ 855 Save a value into an RRD file 856 857 @param deviceName: name of the remote device (ie a hostname) 858 @type deviceName: string 859 @param pidName: process id of the monitored process 860 @type pidName: string 861 @param statName: metric name 862 @type statName: string 863 @param value: data to be stored 864 @type value: number 865 @param rrdType: RRD data type (eg ABSOLUTE, DERIVE, COUNTER) 866 @type rrdType: string 867 @param min: minimum value acceptable for this metric 868 @type min: number 869 @param max: maximum value acceptable for this metric 870 @type max: number 871 """ 872 path = 'Devices/%s/os/processes/%s/%s' % (deviceName, pidName, statName) 873 try: 874 value = self.rrd.save(path, value, rrdType, min=min, max=max) 875 876 except Exception, ex: 877 summary= "Unable to save data for process-monitor RRD %s" % \ 878 path 879 self.log.critical( summary ) 880 881 message= "Data was value= %s, type=%s, min=%s, max=%s" % \ 882 ( value, rrdType, min, max, ) 883 self.log.critical( message ) 884 self.log.exception( ex ) 885 886 import traceback 887 trace_info= traceback.format_exc() 888 889 evid= self.sendEvent(dict( 890 dedupid="%s|%s" % (self.options.monitor, 'RRD write failure'), 891 severity=Critical, 892 device=self.options.monitor, 893 eventClass=Status_Perf, 894 component="RRD", 895 pidName=pidName, 896 statName=statName, 897 path=path, 898 message=message, 899 traceback=trace_info, 900 summary=summary)) 901 902 # Skip thresholds 903 return 904 905 for ev in self.thresholds.check(path, time.time(), value): 906 self.sendThresholdEvent(**ev)
907 908
909 - def heartbeat(self):
910 """ 911 Twisted keep-alive mechanism to ensure that 912 we're still connected to zenhub. 913 """ 914 self.scanning = None 915 devices = self.devices() 916 pids = sum(map(lambda x: len(x.pids), devices.values())) 917 log.info("Pulled process status for %d devices and %d processes", 918 len(devices), pids) 919 SnmpDaemon.heartbeat(self) 920 cycle = self.processCycleInterval 921 self.sendEvents( 922 self.rrdStats.counter('dataPoints', cycle, self.rrd.dataPoints) + 923 self.rrdStats.gauge('cyclePoints', cycle, self.rrd.endCycle()) + 924 self.rrdStats.gauge('pids', cycle, pids) + 925 self.rrdStats.gauge('devices', cycle, len(devices)) + 926 self.rrdStats.gauge('missing', cycle, self.missing) + 927 self.rrdStats.gauge('restarted', cycle, self.restarted) + 928 self.rrdStats.gauge('cycleTime', cycle, self.cycleTime) 929 )
930 931
932 - def connected(self):
933 """ 934 Gather our configuration and start collecting status information. 935 Called after connected to the zenhub service. 936 """ 937 drive(self.start).addCallbacks(self.periodic, self.errorStop)
938 939
940 - def buildOptions(self):
941 """ 942 Build a list of command-line options 943 """ 944 SnmpDaemon.buildOptions(self) 945 self.parser.add_option('--showprocs', 946 dest='showprocs', 947 action="store_true", 948 default=False, 949 help="Show the list of processes found." \ 950 "For debugging purposes only.") 951 self.parser.add_option('--showrawtables', 952 dest='showrawtables', 953 action="store_true", 954 default=False, 955 help="Show the raw SNMP processes data returned " \ 956 "from the device. For debugging purposes only.")
957 958 959 if __name__ == '__main__': 960 # Needed for PB communications 961 from Products.ZenRRD.zenprocess import zenprocess 962 z = zenprocess() 963 z.run() 964