Package Products :: Package ZenRRD :: Module zenprocess
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenRRD.zenprocess

  1  #! /usr/bin/env python 
  2  ############################################################################## 
  3  #  
  4  # Copyright (C) Zenoss, Inc. 2007, 2009, 2011, all rights reserved. 
  5  #  
  6  # This content is made available according to terms specified in 
  7  # License.zenoss under the directory where your Zenoss product is installed. 
  8  #  
  9  ############################################################################## 
 10   
 11   
 12  __doc__ = """zenprocess 
 13   
 14  Gets SNMP process data from a device's HOST-RESOURCES-MIB 
 15  and store process performance in RRD files. 
 16  """ 
 17   
 18  import logging 
 19  import sys 
 20  import re 
 21  from md5 import md5 
 22  from pprint import pformat 
 23  import os.path 
 24   
 25  import Globals 
 26   
 27  import zope.component 
 28  import zope.interface 
 29   
 30  from Products.ZenCollector.daemon import CollectorDaemon 
 31  from Products.ZenCollector.interfaces import ICollectorPreferences,\ 
 32      IScheduledTask, IEventService, IDataService, IConfigurationListener 
 33  from Products.ZenCollector.tasks import SimpleTaskFactory, SimpleTaskSplitter,\ 
 34      TaskStates 
 35  from Products.ZenEvents import Event 
 36  from Products.ZenEvents.ZenEventClasses import Status_Snmp, Status_OSProcess,\ 
 37      Status_Perf 
 38  from Products.ZenUtils.observable import ObservableMixin 
 39  from Products.ZenUtils.Utils import prepId as globalPrepId 
 40   
 41  # We retrieve our configuration data remotely via a Twisted PerspectiveBroker 
 42  # connection. To do so, we need to import the class that will be used by the 
 43  # configuration service to send the data over, i.e. DeviceProxy. 
 44  from Products.ZenUtils.Utils import unused 
 45  from Products.ZenCollector.services.config import DeviceProxy 
 46   
 47  unused(DeviceProxy) 
 48  from Products.ZenHub.services.ProcessConfig import ProcessProxy 
 49   
 50  unused(ProcessProxy) 
 51  from Products.ZenHub.services.PerformanceConfig import SnmpConnInfo 
 52   
 53  unused(SnmpConnInfo) 
 54   
 55  from twisted.internet import defer, error 
 56  from pynetsnmp.twistedsnmp import Snmpv3Error 
 57   
# Module-level logger shared by every class and function in this file.
log = logging.getLogger("zen.zenprocess")

# HOST-RESOURCES-MIB OIDs used
HOSTROOT = '.1.3.6.1.2.1.25'
# hrSWRun subtree: one row per running process, indexed by pid
RUNROOT = HOSTROOT + '.4'
NAMETABLE = RUNROOT + '.2.1.2'    # process name column
PATHTABLE = RUNROOT + '.2.1.4'    # process path column
ARGSTABLE = RUNROOT + '.2.1.5'    # process parameters column
# hrSWRunPerf subtree: per-process performance values
PERFROOT = HOSTROOT + '.5'
# The two prefixes below are completed by appending str(pid) directly,
# which is why they end in a dot.
CPU = PERFROOT + '.1.1.1.'        # note trailing dot
MEM = PERFROOT + '.1.1.2.'        # note trailing dot

# Max size for CPU numbers
# (ProcessStats.updateCpu keeps its accumulated total modulo this value)
WRAP = 0xffffffffL

# Regex matching exactly 32 lowercase hex digits, i.e. the textual form of an
# MD5 digest; callers lower-case the candidate before matching.
IS_MD5 = re.compile('^[a-f0-9]{32}$')

# Event summary template; the single %s is filled with the device id.
PROC_SCAN_ERROR = 'Unable to read processes on device %s'
class HostResourceMIBExecption(Exception):
    """
    Signals that a device returned no usable process-name table.

    Raised by ZenProcessTask._parseProcessNames and handled in
    ZenProcessTask._collectCallback, which reports that the device does not
    publish HOST-RESOURCES-MIB.  (The misspelled class name is kept because
    it is the name the rest of the module refers to.)
    """
80
# Create an implementation of the ICollectorPreferences interface so that the
# ZenCollector framework can configure itself from our preferences.
class ZenProcessPreferences(object):
    """
    Collector preferences for the zenprocess daemon: collector name, cycle
    intervals, the ZenHub configuration service to use and the extra
    command-line options this daemon understands.
    """
    zope.interface.implements(ICollectorPreferences)

    def __init__(self):
        """
        Constructs a new ZenProcessPreferences instance and provide default
        values for needed attributes.
        """
        self.collectorName = "zenprocess"
        self.defaultRRDCreateCommand = None
        self.configCycleInterval = 20  # minutes

        #will be updated based on Performance Config property of same name
        self.processCycleInterval = 3 * 60

        #will be filled in based on buildOptions
        self.options = None

        # the configurationService attribute is the fully qualified class-name
        # of our configuration service that runs within ZenHub
        self.configurationService = 'Products.ZenHub.services.ProcessConfig'

    @property
    def cycleInterval(self):
        """
        defined as a property since it is needed by the interface

        @return: the current value of processCycleInterval
        """
        return self.processCycleInterval

    def buildOptions(self, parser):
        """
        Build a list of command-line options

        @parameter parser: option parser to which the zenprocess-specific
                           options are added (via add_option)
        """
        parser.add_option('--showprocs',
                          dest='showprocs',
                          action="store_true",
                          default=False,
                          # Trailing space added: the two fragments used to be
                          # joined as "found.For debugging ...".
                          help="Show the list of processes found. "
                               "For debugging purposes only.")
        parser.add_option('--showrawtables',
                          dest='showrawtables',
                          action="store_true",
                          default=False,
                          help="Show the raw SNMP processes data returned "
                               "from the device. For debugging purposes only.")
        parser.add_option('--captureFilePrefix', dest='captureFilePrefix',
                          default='',
                          help="Directory and filename to use as a template"
                               " to store SNMP results from device.")

    def postStartup(self):
        """
        Hook required by the preferences interface; nothing to do here.
        """
        pass
135
class DeviceStats:
    """
    Per-device bookkeeping: the configured processes to monitor and the pids
    currently associated with them.
    """

    def __init__(self, deviceProxy):
        """
        @parameter deviceProxy: device configuration; its 'processes'
                                attribute maps a process id to a process
                                configuration
        """
        self.config = deviceProxy
        # map pid number to ProcessStats object
        self._pidToProcess = {}
        # map ProcessProxy id to ProcessStats object
        self._processes = {}
        for procId, process in deviceProxy.processes.items():
            self._processes[procId] = ProcessStats(process)

    def update(self, deviceProxy):
        """
        Synchronize the monitored processes with a new device configuration:
        refresh the ones still configured, add new ones and forget the rest.

        @parameter deviceProxy: new device configuration
        """
        # Start with every known id and cross off the ones still configured.
        stale = set(self._processes.keys())
        for procId, process in deviceProxy.processes.items():
            stale.discard(procId)
            if self._processes.get(procId):
                self._processes[procId].update(process)
            else:
                self._processes[procId] = ProcessStats(process)

        #delete the left overs
        for procId in stale:
            del self._processes[procId]
            # Iterate over a snapshot: entries are deleted inside the loop.
            for pid, stats in list(self._pidToProcess.items()):
                if stats._config.name == procId:
                    del self._pidToProcess[pid]

    @property
    def processStats(self):
        """
        The ProcessStats: processes configured to be monitored
        """
        return self._processes.values()

    @property
    def pids(self):
        """
        returns the pids being tracked
        """
        return self._pidToProcess.keys()

    @property
    def monitoredProcs(self):
        """
        returns ProcessStats for which we have a pid
        """
        return self._pidToProcess.values()
183
class ProcessStats:
    """
    State for one configured (modeled) process: its configuration, the pids
    currently matching it and the accumulated CPU counter.
    """

    def __init__(self, processProxy):
        """
        @parameter processProxy: process configuration; name, regex and
                                 ignoreParameters are read from it
        """
        self._pids = {}
        self._config = processProxy
        self.cpu = 0
        # Default digest is the MD5 of an empty argument string.
        self.digest = md5('').hexdigest()
        if not self._config.ignoreParameters:
            # The modeler plugin computes the MD5 hash of the args,
            # and then tosses that into the name of the process
            result = self._config.name.rsplit(' ', 1)
            if len(result) == 2 and result[1] != '':
                self.digest = result[1]

    def update(self, processProxy):
        """
        Replace the stored process configuration.
        """
        self._config = processProxy

    def __str__(self):
        """
        Override the Python default to represent ourselves as a string
        """
        return str(self._config.name)

    __repr__ = __str__

    def match(self, name, args, useName=True, useMd5Digest=True):
        """
        Check whether a process seen on the device belongs to this
        configured process.

        @parameter name: name of a process to compare
        @type name: string
        @parameter args: argument list of the process
        @type args: string
        @parameter useName: also require the configured name (without its
                            trailing digest) to match the cleaned process name?
        @type useName: boolean
        @parameter useMd5Digest: ignore true result if MD5 doesn't match the process name?
        @type useMd5Digest: boolean
        @return: does the name match this process's info?
        @rtype: Boolean
        """
        if self._config.name is None:
            return False

        # SNMP agents return a 'flexible' number of characters,
        # so exact matching isn't always reliable.
        processName = ('%s %s' % (name, args or '')).strip()

        # Make the comparison
        result = re.search(self._config.regex, processName) is not None

        # We have a match, but it might not be for us
        if result and useMd5Digest:
            # Compare this arg list against the digest of this proc.
            # 'args or ""' keeps a missing argument list from raising
            # TypeError, consistent with the processName computation above.
            digest = md5(args or '').hexdigest()
            if self.digest and digest != self.digest:
                result = False

        if result and useName:
            nameOnly = self._config.name.rsplit(' ', 1)[0]
            cleanNameOnly = globalPrepId(name)
            # Accept the configured name at the end of the cleaned name,
            # preceded by nothing or by a single '_'.
            nameRe = '(.?)' + re.escape(nameOnly) + '$'
            nameMatch = re.search(nameRe, cleanNameOnly)
            if not nameMatch or nameMatch.group(1) not in ('', '_'):
                log.debug("Discarding match based on name mismatch: %s %s" % (cleanNameOnly, nameOnly))
                result = False

        return result

    def updateCpu(self, pid, value):
        """
        Feed a new raw CPU counter value for a pid and add the resulting
        increase (if any) to this process's running total, modulo WRAP.
        """
        tracker = self._pids.setdefault(pid, Pid())
        delta = tracker.updateCpu(value)
        if delta is not None:
            self.cpu += delta
            self.cpu %= WRAP

    def getCpu(self):
        """
        Return the accumulated CPU counter.
        """
        return self.cpu

    def updateMemory(self, pid, value):
        """
        Record the latest memory value for a pid.
        """
        self._pids.setdefault(pid, Pid()).memory = value

    def getMemory(self):
        """
        Return the sum of the memory values of all tracked pids, skipping
        pids that have no value.
        """
        return sum(x.memory for x in self._pids.values() if x.memory is not None)

    def discardPid(self, pid):
        """
        Stop tracking a pid; unknown pids are ignored.
        """
        if pid in self._pids:
            del self._pids[pid]
279
class Pid:
    """
    Helper class to track process id information
    """
    # Last raw CPU counter value seen (None until a first valid sample).
    cpu = None
    # Last memory value seen.
    memory = None

    def updateCpu(self, n):
        """
        Record a new raw CPU counter value.

        @parameter n: new counter value (anything int() accepts) or None
        @return: increase since the previous sample, or None when there is
                 no previous sample, the value is missing/invalid, or the
                 counter went backwards
        """
        if n is not None:
            try:
                n = int(n)
            except (TypeError, ValueError):
                log.warning("Bad value for CPU: '%s'", n)
                # Treat an unparsable value as a missing sample; storing it
                # would make the subtraction below (now or on the next
                # call) raise TypeError.
                n = None

        if self.cpu is None or n is None:
            self.cpu = n
            return None
        diff = n - self.cpu
        if diff < 0:
            # don't provide a value when the counter falls backwards
            n = None
            diff = None
        self.cpu = n
        return diff

    def updateMemory(self, n):
        """
        Record the latest memory value.
        """
        self.memory = n

    def __str__(self):
        """
        Override the Python default to represent ourselves as a string
        """
        return '<Pid> memory: %s cpu: %s' % (self.memory, self.cpu)

    __repr__ = __str__
320
class ConfigListener(object):
    """
    Receives configuration add/update/delete notifications from the
    collector framework.  Only deletion has an effect beyond logging: it
    drops the cached per-device statistics.
    """
    zope.interface.implements(IConfigurationListener)

    def deleted(self, configurationId):
        """
        Called when a configuration is deleted from the collector
        """
        note = 'ConfigListener: configuration %s deleted' % configurationId
        log.debug(note)
        # Forget the state kept for this configuration, if there is any.
        ZenProcessTask.DEVICE_STATS.pop(configurationId, None)

    def added(self, configuration):
        """
        Called when a configuration is added to the collector
        """
        note = 'ConfigListener: configuration %s added' % configuration
        log.debug(note)

    def updated(self, newConfiguration):
        """
        Called when a configuration is updated in collector
        """
        note = 'ConfigListener: configuration %s updated' % newConfiguration
        log.debug(note)
344
# Create an implementation of the IScheduledTask interface that will perform
# the actual collection work needed by this collector.
class ZenProcessTask(ObservableMixin):
    """
    A scheduled task that finds instances of configured processes and collects
    metrics on the processes
    """
    zope.interface.implements(IScheduledTask)

    #Keep state about process stats across task updates
    DEVICE_STATS = {}

    #counter to keep track of total restarted and missing processes
    RESTARTED = 0
    MISSING = 0

    STATE_CONNECTING = 'CONNECTING'
    STATE_SCANNING_PROCS = 'SCANNING_PROCESSES'
    STATE_FETCH_PERF = 'FETCH_PERF_DATA'
    STATE_STORE_PERF = 'STORE_PERF_DATA'
    STATE_PARSING_TABLE_DATA = 'PARSING_TABLE_DATA'

    # Base fields of every process status event sent by this task.
    statusEvent = {'eventClass': Status_OSProcess,
                   'eventGroup': 'Process'}

    def __init__(self,
                 deviceId,
                 taskName,
                 scheduleIntervalSeconds,
                 taskConfig):
        """
        @parameter deviceId: stored as the task's configId
        @parameter taskName: stored as the task's name
        @parameter scheduleIntervalSeconds: stored as the task's interval
        @parameter taskConfig: device configuration (a DeviceProxy)
        """
        super(ZenProcessTask, self).__init__()

        #needed for interface
        self.name = taskName
        self.configId = deviceId
        self.interval = scheduleIntervalSeconds
        self.state = TaskStates.STATE_IDLE

        #the task config corresponds to a DeviceProxy
        self._device = taskConfig
        self._devId = self._device.name
        self._manageIp = self._device.manageIp
        self._maxOidsPerRequest = self._device.zMaxOIDPerRequest

        self._dataService = zope.component.queryUtility(IDataService)
        self._eventService = zope.component.queryUtility(IEventService)
        self._preferences = zope.component.queryUtility(ICollectorPreferences,
                                                        "zenprocess")
        self.snmpProxy = None
        self.snmpConnInfo = self._device.snmpConnInfo

        # Reuse the statistics of an earlier task for the same device so
        # known pids and CPU counters survive task re-creation.
        self._deviceStats = ZenProcessTask.DEVICE_STATS.get(self._devId)
        if self._deviceStats:
            self._deviceStats.update(self._device)
        else:
            self._deviceStats = DeviceStats(self._device)
            ZenProcessTask.DEVICE_STATS[self._devId] = self._deviceStats

    @defer.inlineCallbacks
    def doTask(self):
        """
        Contact to one device and return a deferred which gathers data from
        the device.

        @return: job to scan a device
        @rtype: Twisted deferred object
        """
        try:
            # see if we need to connect first before doing any collection
            self.openProxy()
            log.debug("Opened proxy to %s [%s]", self._devId, self._manageIp)
            yield self._collectCallback()
        finally:
            self._finished()

    @defer.inlineCallbacks
    def _collectCallback(self):
        """
        Scan the process tables, send process status events and collect
        performance data.  Known failure modes are turned into SNMP error
        events instead of being raised.
        """
        log.debug("Scanning for processes from %s [%s]",
                  self._devId, self._manageIp)

        self.state = ZenProcessTask.STATE_SCANNING_PROCS
        tables = [NAMETABLE, PATHTABLE, ARGSTABLE]
        try:
            tableResult = yield self._getTables(tables)
            summary = 'Process table up for device %s' % self._devId
            self._clearSnmpError("%s - timeout cleared" % summary, 'table_scan_timeout')
            if self.snmpConnInfo.zSnmpVer == 'v3':
                self._clearSnmpError("%s - v3 error cleared" % summary, 'table_scan_v3_error')
            processes = self._parseProcessNames(tableResult)
            self._clearSnmpError(summary, 'resource_mib')
            self._deviceStats.update(self._device)
            processStatuses = self._determineProcessStatus(processes)
            self._sendProcessEvents(processStatuses)
            self._clearSnmpError(summary)
            yield self._fetchPerf()
            log.debug("Device %s [%s] scanned successfully",
                      self._devId, self._manageIp)
        except HostResourceMIBExecption:
            summary = 'Device %s does not publish HOST-RESOURCES-MIB' %\
                      self._devId
            resolution = "Verify with snmpwalk %s %s" %\
                         (self._devId, NAMETABLE)
            log.warn(summary)
            self._sendSnmpError(summary, "resource_mib", resolution=resolution)

        except error.TimeoutError:
            log.debug('Timeout fetching tables on device %s' % self._devId)
            self._sendSnmpError('%s; Timeout on device' % PROC_SCAN_ERROR % self._devId, 'table_scan_timeout')
        except Snmpv3Error as e:
            # Format from the exception object itself: the previous code
            # applied '{1.value}' to str(e), and a str has no 'value'
            # attribute, so the handler raised AttributeError.
            msg = "Cannot connect to SNMP agent on %s: %s" % (
                self._devId, getattr(e, 'value', e))
            log.debug(msg)
            self._sendSnmpError('%s; %s' % (PROC_SCAN_ERROR % self._devId, msg), 'table_scan_v3_error')
        except Exception as e:
            log.exception('Unexpected Error on device %s' % self._devId)
            msg = '%s; error: %s' % (PROC_SCAN_ERROR % self._devId, e)
            self._sendSnmpError(msg)

    def _finished(self):
        """
        Callback activated when the task is complete
        """
        try:
            self._close()
        except Exception as e:
            log.warn("Failed to close device %s: error %s" %
                     (self._devId, str(e)))

    def cleanup(self):
        """
        Release the SNMP session held by this task.
        """
        return self._close()

    def capturePacket(self, hostname, data):
        """
        Store SNMP results into files for unit-testing.

        @parameter hostname: used as part of the capture file name
        @parameter data: object written to the file in pformat() form
        """
        # Prep for using capture replay module later
        if not hasattr(self, 'captureSerialNum'):
            self.captureSerialNum = 0

        log.debug("Capturing packet from %s", hostname)
        name = "%s-%s-%d" % (self._preferences.options.captureFilePrefix,
                             hostname, self.captureSerialNum)

        # Don't overwrite previous captures, which will happen if we remodel
        # and the serial number gets reset to zero
        while os.path.exists(name):
            self.captureSerialNum += 1
            name = "%s-%s-%d" % (self._preferences.options.captureFilePrefix,
                                 hostname, self.captureSerialNum)

        try:
            # 'with' closes the file even when the write fails.
            with open(name, "w") as capFile:
                capFile.write(pformat(data))
            self.captureSerialNum += 1
        except Exception as ex:
            log.warn("Couldn't write capture data to '%s' because %s",
                     name, str(ex))

    def sendRestartEvents(self, afterByConfig, beforeByConfig, restarted):
        """
        Send one event per restarted process, listing dropped and new pids.

        @parameter afterByConfig: ProcessStats -> pids found in this scan
        @parameter beforeByConfig: ProcessStats -> pids known before this scan
        @parameter restarted: ProcessStats -> process config for processes
                              considered restarted
        """
        for procStats, pConfig in restarted.iteritems():
            droppedPids = []
            for pid in beforeByConfig[procStats]:
                if pid not in afterByConfig[procStats]:
                    droppedPids.append(pid)
            summary = 'Process restarted: %s' % pConfig.originalName
            message = '%s\n Using regex \'%s\' Discarded dead pid(s) %s Using new pid(s) %s'\
                      % (summary, pConfig.regex, droppedPids, afterByConfig[procStats])
            self._eventService.sendEvent(self.statusEvent,
                                         device=self._devId,
                                         summary=summary,
                                         message=message,
                                         component=pConfig.originalName,
                                         eventKey=pConfig.processClass,
                                         severity=pConfig.severity)
            log.info("(%s) %s" % (self._devId, message))

    def sendFoundProcsEvents(self, afterByConfig, restarted):
        """
        Send a clear event for every process found running, except the ones
        already reported as restarted.
        """
        # report alive processes
        for processStat in afterByConfig.keys():
            if processStat in restarted:
                continue
            summary = "Process up: %s" % processStat._config.originalName
            message = '%s\n Using regex \'%s\' with pid\'s %s '\
                      % (summary, processStat._config.regex, afterByConfig[processStat])
            self._eventService.sendEvent(self.statusEvent,
                                         device=self._devId,
                                         summary=summary,
                                         message=message,
                                         component=processStat._config.originalName,
                                         eventKey=processStat._config.processClass,
                                         severity=Event.Clear)
            log.debug("(%s) %s" % (self._devId, message))

    def _parseProcessNames(self, results):
        """
        Parse the process tables and reconstruct the list of processes
        that are on the device.

        @parameter results: results of SNMP table gets
        @type results: dictionary of dictionaries
        @return: list of (pid, (name, args)) tuples
        @raise HostResourceMIBExecption: no process names were returned
        """
        self.state = ZenProcessTask.STATE_PARSING_TABLE_DATA
        # .get(): a result without the name table at all is the same
        # condition as an empty name table, not an unexpected KeyError.
        if not results or not results.get(NAMETABLE):
            raise HostResourceMIBExecption()

        if self._preferences.options.captureFilePrefix:
            self.capturePacket(self._devId, results)

        showrawtables = self._preferences.options.showrawtables
        _args, procs = mapResultsToDicts(showrawtables, results)
        if self._preferences.options.showprocs:
            self._showProcessList(procs)
        return procs

    def sendMissingProcsEvents(self, missing):
        """
        Send an event for every configured process that has no running
        instance and count it in ZenProcessTask.MISSING.

        @parameter missing: process configs without a matching pid
        """
        # Look for missing processes
        for procConfig in missing:
            ZenProcessTask.MISSING += 1
            summary = 'Process not running: %s' % procConfig.originalName
            message = "%s\n Using regex \'%s\' \n All Processes have stopped since the last model occurred. Last Modification time (%s)" \
                      % (summary, procConfig.regex, self._device.lastmodeltime)
            self._eventService.sendEvent(self.statusEvent,
                                         device=self._devId,
                                         summary=summary,
                                         message=message,
                                         component=procConfig.originalName,
                                         eventKey=procConfig.processClass,
                                         severity=procConfig.severity)
            log.warning("(%s) %s" % (self._devId, message))

    def _sendProcessEvents(self, results):
        """
        Send restart/up/missing events, adopt the new pid map and store the
        per-process instance counts.

        @parameter results: tuple returned by _determineProcessStatus
        @return: the string "Sent events"
        """
        (afterByConfig, afterPidToProcessStats,
         beforeByConfig, newPids, restarted, deadPids, missing) = results

        self.sendRestartEvents(afterByConfig, beforeByConfig, restarted)
        self.sendFoundProcsEvents(afterByConfig, restarted)

        for pid in newPids:
            log.debug("Found new %s %s pid %d on %s " % (
                afterPidToProcessStats[pid]._config.originalName, afterPidToProcessStats[pid]._config.name, pid,
                self._devId))
        self._deviceStats._pidToProcess = afterPidToProcessStats
        self.sendMissingProcsEvents(missing)

        # Store the total number of each process into an RRD
        pidCounts = dict((p, 0) for p in self._deviceStats.processStats)

        for procStat in self._deviceStats.monitoredProcs:
            # monitoredProcs is determined from the current pids in _pidToProcess
            # pidCounts is from _deviceStats.processStats which is modeled data
            if procStat in pidCounts:
                pidCounts[procStat] += 1
            else:
                log.warn('%s monitored proc %s %s not in process stats', self._devId, procStat._config.name,
                         procStat._config.originalName)
        log.debug("%s pidcounts is %s", self._devId, pidCounts)
        for procName, count in pidCounts.items():
            # procName is a ProcessStats; _save renders it with '%s',
            # which yields its configured name.
            self._save(procName, 'count_count', count, 'GAUGE')
        return "Sent events"

    def _determineProcessStatus(self, procs):
        """
        Determine the up/down/restarted status of processes.

        @parameter procs: array of pid, (name, args) info
        @type procs: list
        @return: (afterByConfig, afterPidToProcessStats, beforeByConfig,
                  newPids, restarted, deadPids, missing)
        """
        beforePids = set(self._deviceStats.pids)
        afterPidToProcessStats = {}
        pStatsWArgsAndSums, pStatsWoArgs = self._splitPStatMatchers()
        for pid, (name, psargs) in procs:
            pStats = self._deviceStats._pidToProcess.get(pid)
            if pStats:
                # We saw the process before, so there's a good
                # chance that it's the same.
                if pStats.match(name, psargs):
                    # Yep, it's the same process
                    log.debug("Found process %d on %s, matching %s %s with MD5",
                              pid, pStats._config.name, name, psargs)
                    log.debug("%s found existing stat %s %s for pid %s - using MD5", self._devId, pStats._config.name,
                              pStats._config.originalName, pid)
                    afterPidToProcessStats[pid] = pStats
                    continue

                elif pStats.match(name, psargs, useMd5Digest=False):
                    # In this case, our raw SNMP data from the
                    # remote agent got futzed
                    # It's the same process. Yay!
                    log.debug("%s - Found process %d on %s, matching %s %s without MD5",
                              self._devId, pid, pStats._config.name, name, psargs)
                    afterPidToProcessStats[pid] = pStats
                    continue

            # Search for the first match in our list of regexes
            # that have arguments AND an MD5-sum argument matching.
            # Explicitly *IGNORE* any matchers not modeled by zenmodeler
            for pStats in pStatsWArgsAndSums:
                if pStats.match(name, psargs):
                    log.debug("%s Found process %d on %s %s",
                              self._devId, pid, pStats._config.originalName, pStats._config.name)
                    afterPidToProcessStats[pid] = pStats
                    break
            else:
                # Now look for the first match in our list of regexes
                # that don't have arguments.
                for pStats in pStatsWoArgs:
                    if pStats.match(name, psargs, useMd5Digest=False):
                        log.debug("Found process %d on %s",
                                  pid, pStats._config.name)
                        afterPidToProcessStats[pid] = pStats
                        break

        afterPids = set(afterPidToProcessStats.keys())
        afterByConfig = reverseDict(afterPidToProcessStats)
        newPids = afterPids - beforePids
        deadPids = beforePids - afterPids

        restarted = {}
        for pid in deadPids:
            procStats = self._deviceStats._pidToProcess[pid]
            procStats.discardPid(pid)
            # A dead pid whose process still has other pids counts as a
            # restart.
            if procStats in afterByConfig:
                ZenProcessTask.RESTARTED += 1
                pConfig = procStats._config
                if pConfig.restart:
                    restarted[procStats] = pConfig

        # Now that we've found all of the stragglers, check to see
        # what really is missing or not.
        missing = []
        for procStat in self._deviceStats.processStats:
            if procStat not in afterByConfig:
                missing.append(procStat._config)

        # For historical reasons, return the beforeByConfig
        beforeByConfig = reverseDict(self._deviceStats._pidToProcess)

        return (afterByConfig, afterPidToProcessStats,
                beforeByConfig, newPids, restarted, deadPids,
                missing)

    def _splitPStatMatchers(self):
        """
        Split the configured processes into those matched with arguments
        (name ends in an MD5 digest) and those matched without arguments.

        @return: (pStatsWArgsAndSums, pStatsWoArgs)
        @rtype: tuple of two lists of ProcessStats
        """
        pStatsWArgsAndSums = []
        pStatsWoArgs = []
        for pStat in self._deviceStats.processStats:
            if pStat._config.ignoreParameters:
                pStatsWoArgs.append(pStat)
            else:
                nameList = pStat._config.name.rsplit(' ', 1)
                if len(nameList) < 2:  # (name, md5sum)
                    nameList = (nameList[0], md5('').hexdigest())

                possibleMd5Sum = nameList[-1].lower()
                # Entries whose last word is not an MD5 digest end up in
                # neither list.
                if IS_MD5.match(possibleMd5Sum):
                    pStatsWArgsAndSums.append(pStat)
        return pStatsWArgsAndSums, pStatsWoArgs

    @defer.inlineCallbacks
    def _fetchPerf(self):
        """
        Get performance data for all the monitored processes on a device
        and hand it to _storePerfStats.

        OIDs are requested in chunks of at most _maxOidsPerRequest.  OIDs
        from chunks that failed are retried once, one OID per request.
        """
        self.state = ZenProcessTask.STATE_FETCH_PERF

        oids = []
        for pid in self._deviceStats.pids:
            oids.extend([CPU + str(pid), MEM + str(pid)])
        if oids:
            singleOids = set()
            results = {}
            oidsToTest = oids
            chunkSize = self._maxOidsPerRequest
            while oidsToTest:
                # Iterate over oidsToTest, not oids: the single-OID retry
                # must only re-request the OIDs that failed.
                for oidChunk in chunk(oidsToTest, chunkSize):
                    try:
                        log.debug("%s fetching oid(s) %s" % (self._devId, oidChunk))
                        result = yield self._get(oidChunk)
                        results.update(result)
                    except (error.TimeoutError, Snmpv3Error) as e:
                        log.debug("error reading oid(s) %s - %s", oidChunk, e)
                        singleOids.update(oidChunk)
                oidsToTest = []
                # Only one retry pass: once chunkSize is 1 the loop ends.
                if singleOids and chunkSize > 1:
                    chunkSize = 1
                    log.debug("running oids for %s in single mode %s" % (self._devId, singleOids))
                    oidsToTest = list(singleOids)
            self._storePerfStats(results)

    def _storePerfStats(self, results):
        """
        Save the process performance data in RRD files

        @parameter results: values fetched by _fetchPerf
        @type results: {oid:value} dictionary
        @return: the results argument, unchanged
        """
        self.state = ZenProcessTask.STATE_STORE_PERF
        byConf = reverseDict(self._deviceStats._pidToProcess)
        for procStat, pids in byConf.items():
            if len(pids) != 1:
                log.debug("There are %d pids by the name %s - %s",
                          len(pids), procStat._config.name, procStat._config.originalName)
            procName = procStat._config.name
            for pid in pids:
                cpu = results.get(CPU + str(pid), None)
                mem = results.get(MEM + str(pid), None)
                procStat.updateCpu(pid, cpu)
                procStat.updateMemory(pid, mem)
            self._save(procName, 'cpu_cpu', procStat.getCpu(),
                       'DERIVE', min=0)
            self._save(procName, 'mem_mem',
                       procStat.getMemory() * 1024, 'GAUGE')
        return results

    def _getTables(self, oids):
        """
        Perform SNMP getTable for specified OIDs

        @parameter oids: OIDs to gather
        @type oids: list of strings
        @return: Twisted deferred
        @rtype: Twisted deferred
        """
        # Explicit floor division: same result as before, and it stays an
        # integer even if true division is ever enabled for this module.
        repetitions = self._maxOidsPerRequest // len(oids)
        t = self.snmpProxy.getTable(oids,
                                    timeout=self.snmpConnInfo.zSnmpTimeout,
                                    retryCount=self.snmpConnInfo.zSnmpTries,
                                    maxRepetitions=repetitions,
                                    limit=sys.maxint)
        return t

    def _get(self, oids):
        """
        Perform SNMP get for specified OIDs

        @parameter oids: OIDs to gather
        @type oids: list of strings
        @return: Twisted deferred
        @rtype: Twisted deferred
        """
        return self.snmpProxy.get(oids,
                                  self.snmpConnInfo.zSnmpTimeout,
                                  self.snmpConnInfo.zSnmpTries)

    def openProxy(self):
        """
        Create a connection to the remote device
        """
        self.state = ZenProcessTask.STATE_CONNECTING
        # A new session is only created when there is none yet or the
        # connection settings changed; open() is called either way.
        if (self.snmpProxy is None or
            self.snmpProxy.snmpConnInfo != self.snmpConnInfo):
            self.snmpProxy = self.snmpConnInfo.createSession()
        self.snmpProxy.open()

    def _close(self):
        """
        Close down the connection to the remote device
        """
        if self.snmpProxy:
            self.snmpProxy.close()
        self.snmpProxy = None

    def _showProcessList(self, procs):
        """
        Display the processes in a sane manner.

        @parameter procs: list of (pid, (name, args))
        @type procs: list of tuples
        """
        device_name = self._devId
        proc_list = ['%s %s %s' % (pid, name, args) for pid, (name, args) in sorted(procs)]
        proc_list.append('')
        log.info("#===== Processes on %s:\n%s", device_name, '\n'.join(proc_list))

    def _sendSnmpError(self, message, eventKey=None, **kwargs):
        """
        Send an error-severity SNMP status event for this device.

        @parameter message: event summary
        @type message: string
        @parameter eventKey: event key, so a matching clear can be sent later
        @parameter kwargs: extra fields added to the event
        """
        # Work on a copy so the shared class-level statusEvent is untouched.
        event = self.statusEvent.copy()
        event.update(kwargs)
        self._eventService.sendEvent(event,
                                     eventClass=Status_Snmp,
                                     device=self._devId,
                                     severity=Event.Error,
                                     eventKey=eventKey,
                                     summary=message)

    def _clearSnmpError(self, message, eventKey=None):
        """
        Send an event to clear other events.

        @parameter message: clear text
        @type message: string
        @parameter eventKey: event key of the events to clear
        """
        self._eventService.sendEvent(self.statusEvent,
                                     eventClass=Status_Snmp,
                                     device=self._devId,
                                     summary=message,
                                     agent='zenprocess',
                                     eventKey=eventKey,
                                     severity=Event.Clear)

    def _save(self, pidName, statName, value, rrdType, min='U'):
        """
        Save a value into an RRD file

        @param pidName: monitored process; rendered with '%s' into the RRD
                        path (callers pass a process name or a ProcessStats)
        @type pidName: string
        @param statName: metric name
        @type statName: string
        @param value: data to be stored
        @type value: number
        @param rrdType: RRD data type (eg ABSOLUTE, DERIVE, COUNTER)
        @type rrdType: string
        @param min: minimum value handed to the data service
        """
        deviceName = self._devId
        path = 'Devices/%s/os/processes/%s/%s' % (deviceName, pidName, statName)
        try:
            self._dataService.writeRRD(path, value, rrdType, min=min)
        except Exception as ex:
            # A failed write is logged and reported as an event; it never
            # interrupts the collection cycle.
            summary = "Unable to save data for process-monitor RRD %s" %\
                      path
            log.critical(summary)

            message = "Data was value= %s, type=%s" %\
                      (value, rrdType)
            log.critical(message)
            log.exception(ex)

            import traceback

            trace_info = traceback.format_exc()

            self._eventService.sendEvent(dict(
                dedupid="%s|%s" % (self._preferences.options.monitor,
                                   'RRD write failure'),
                severity=Event.Critical,
                device=self._preferences.options.monitor,
                eventClass=Status_Perf,
                component="RRD",
                pidName=pidName,
                statName=statName,
                path=path,
                message=message,
                traceback=trace_info,
                summary=summary))
898
def mapResultsToDicts(showrawtables, results):
    """
    Turn the raw SNMP process tables into per-pid information.

    The pid of a row is the last dotted component of its OID; every value
    is stripped of surrounding whitespace.  A process is reported under its
    path instead of its name when a path exists and contains no backslash.

    @parameter showrawtables: log the raw table info?
    @type showrawtables: boolean
    @parameter results: results of SNMP table gets ie (OID + pid, value)
    @type results: dictionary of dictionaries
    @return: pid -> args mapping, and a list of (pid, (name, args)) tuples
    @rtype: dictionary, list of tuples
    """
    names, paths, args = {}, {}, {}

    # One pass per table, in the original order: names, paths, arguments.
    tableSpecs = (
        ("NAMETABLE = %r", NAMETABLE, names),
        ("PATHTABLE = %r", PATHTABLE, paths),
        ("ARGSTABLE = %r", ARGSTABLE, args),
    )
    for rawFormat, tableOid, byPid in tableSpecs:
        if showrawtables:
            log.info(rawFormat, results[tableOid])
        for oid, value in results[tableOid].items():
            byPid[int(oid.split('.')[-1])] = value.strip()

    procs = []
    for pid, name in names.items():
        path = paths.get(pid, '')
        shownName = name
        if path and path.find('\\') == -1:
            shownName = path
        procs.append((pid, (shownName, args.get(pid, ''))))

    return args, procs
def reverseDict(d):
    """
    Invert a dictionary: each value of d becomes a key that maps to the
    list of all keys of d that had that value.
    """
    inverted = {}
    for key, value in d.items():
        if value in inverted:
            inverted[value].append(key)
        else:
            inverted[value] = [key]
    return inverted
956
def chunk(lst, n):
    """
    Split lst into consecutive slices of length n; the last slice holds
    whatever is left and may be shorter.

    @return: list of slices of lst
    """
    pieces = []
    for start in range(0, len(lst), n):
        pieces.append(lst[start:start + n])
    return pieces
# Collector Daemon Main entry point
#
# Runs only when the module is executed as a script: wires the preferences,
# a task factory/splitter producing ZenProcessTask instances and the
# configuration listener into a CollectorDaemon, then starts it.
if __name__ == '__main__':
    myPreferences = ZenProcessPreferences()

    myTaskFactory = SimpleTaskFactory(ZenProcessTask)
    myTaskSplitter = SimpleTaskSplitter(myTaskFactory)
    daemon = CollectorDaemon(myPreferences, myTaskSplitter, ConfigListener())
    daemon.run()