Package ZenRRD :: Module zenprocess
[hide private]
[frames] | [no frames]

Source Code for Module ZenRRD.zenprocess

  1  #! /usr/bin/env python  
  2  ########################################################################### 
  3  # 
  4  # This program is part of Zenoss Core, an open source monitoring platform. 
  5  # Copyright (C) 2007, Zenoss Inc. 
  6  # 
  7  # This program is free software; you can redistribute it and/or modify it 
  8  # under the terms of the GNU General Public License version 2 as published by 
  9  # the Free Software Foundation. 
 10  # 
 11  # For complete information please visit: http://www.zenoss.com/oss/ 
 12  # 
 13  ########################################################################### 
 14   
__doc__='''zenprocess

Gets snmp process performance data and stores it in RRD files.

$Id$
'''

# Slices the number out of an expanded VCS keyword, e.g.
# "$Revision: 1234 $"[11:-2] == "1234". With the keyword unexpanded (as
# written here) the slice yields ''.
__version__ = "$Revision$"[11:-2]
 23   
 24  import logging 
 25  import time 
 26  from sets import Set 
 27   
# Module-wide logger used by the helper classes and the daemon below.
log = logging.getLogger("zen.zenprocess")
 29   
 30  from twisted.internet import reactor, defer 
 31  from twisted.python import failure 
 32   
 33  try: 
 34      from pynetsnmp.twistedsnmp import AgentProxy 
 35      from pynetsnmp.tableretriever import TableRetriever 
 36  except ImportError: 
 37      import warnings 
 38      warnings.warn("Using python-based snmp enging") 
 39      from twistedsnmp.agentproxy import AgentProxy 
 40      from twistedsnmp.tableretriever import TableRetriever 
 41   
 42  import Globals 
 43  from Products.ZenUtils.Driver import drive, driveLater 
 44  from Products.ZenUtils.NJobs import NJobs 
 45  from Products.ZenUtils.Chain import Chain 
 46  from Products.ZenModel.PerformanceConf import performancePath 
 47  from Products.ZenEvents import Event 
 48  from Products.ZenEvents.ZenEventClasses import Status_Snmp, Status_OSProcess 
 49   
 50  from Products.ZenRRD.RRDUtil import RRDUtil 
 51  from SnmpDaemon import SnmpDaemon 
 52   
 53  HOSTROOT  ='.1.3.6.1.2.1.25' 
 54  RUNROOT   = HOSTROOT + '.4' 
 55  NAMETABLE = RUNROOT + '.2.1.2' 
 56  PATHTABLE = RUNROOT + '.2.1.4' 
 57  ARGSTABLE = RUNROOT + '.2.1.5' 
 58  PERFROOT  = HOSTROOT + '.5' 
 59  CPU       = PERFROOT + '.1.1.1.'        # note trailing dot 
 60  MEM       = PERFROOT + '.1.1.2.'        # note trailing dot 
 61   
 62  PARALLEL_JOBS = 10 
 63   
 64  WRAP=0xffffffffL 
 65   
 66  try: 
 67      sorted = sorted                     # added in python 2.4 
 68  except NameError: 
69 - def sorted(x, *args, **kw):
70 x.sort(*args, **kw) 71 return x
72
def reverseDict(d):
    """Invert a mapping, grouping the keys that share a value.

    Every value of *d* becomes a key of the result, mapped to the list of
    keys of *d* that held it (in *d*'s iteration order).  Values must be
    hashable.
    """
    inverted = {}
    for key in d:
        bucket = inverted.setdefault(d[key], [])
        bucket.append(key)
    return inverted
81
def chunk(lst, n):
    """Split *lst* into consecutive slices of at most *n* items.

    The last slice holds whatever is left over, so it may be shorter.
    """
    offsets = range(0, len(lst), n)
    pieces = [lst[offset:offset + n] for offset in offsets]
    return pieces
85
def closer(value, device):
    """Close *device*, then hand *value* through unchanged.

    Used with Deferred.addBoth in oneDevice, so it runs on success and
    failure alike and returns whatever result it was given.
    """
    passthrough = value
    device.close()
    return passthrough
89
class ScanFailure(Exception):
    """Exception type for process-scan failures.

    NOTE(review): nothing in this listing raises it; kept for API
    compatibility.
    """
91
class Pid:
    """Most recent CPU and memory readings for a single pid."""
    cpu = None      # last raw CPU counter reading (int), or None if unknown
    memory = None   # last memory reading, as stored by updateMemory

    def updateCpu(self, n):
        """Record a raw CPU counter reading and return the increase.

        Returns None instead of a delta on the first reading, when the
        reading is missing or cannot be parsed as an int, and when the
        counter went backwards; in those cases the stored baseline is
        replaced by the new reading (or None).
        """
        if n is not None:
            try:
                n = int(n)
            except (TypeError, ValueError):
                log.warning("Bad value for CPU: '%s'", n)
                # Treat an unparseable reading as missing: keeping the raw
                # value as the baseline would make the next subtraction
                # raise TypeError.
                n = None

        if self.cpu is None or n is None:
            self.cpu = n
            return None
        diff = n - self.cpu
        if diff < 0:
            # don't provide a value when the counter falls backwards
            n = None
            diff = None
        self.cpu = n
        return diff

    def updateMemory(self, n):
        """Store the latest memory reading as-is."""
        self.memory = n

    def __str__(self):
        return '<Pid> memory: %s cpu: %s' % (self.memory, self.cpu)
    __repr__ = __str__
120 121
class Process:
    'track process-specific configuration data'
    name = None
    originalName = None
    ignoreParameters = False
    restart = None
    severity = Event.Warning
    status = 0      # consecutive scans with no matching pid (0 == up)
    cpu = 0         # accumulated CPU delta over all pids, modulo WRAP

    def __init__(self):
        # pid number -> Pid holding that pid's latest readings
        self.pids = {}

    def match(self, name, args):
        """Return True if a process table row (*name*, *args*) is this process.

        Arguments are ignored when ignoreParameters is set or the row has
        none; otherwise originalName must equal "<name> <args>".
        """
        if self.name is None:
            return False
        if self.ignoreParameters or not args:
            return self.originalName == name
        return self.originalName == '%s %s' % (name, args)

    def __str__(self):
        return str(self.name)
    __repr__ = __str__

    def updateCpu(self, pid, value):
        """Feed a raw CPU reading for *pid* and add any delta to the total."""
        p = self.pids.setdefault(pid, Pid())
        cpu = p.updateCpu(value)
        if cpu is not None:
            self.cpu += cpu
            self.cpu %= WRAP

    def getCpu(self):
        return self.cpu

    def updateMemory(self, pid, value):
        """Record the latest memory reading for *pid*."""
        self.pids.setdefault(pid, Pid()).updateMemory(value)

    def getMemory(self):
        """Sum of the known memory readings across all pids."""
        return sum([x.memory for x in self.pids.values()
                    if x.memory is not None])

    def discardPid(self, pid):
        """Forget *pid*; unknown pids are ignored."""
        # single lookup; dict.has_key no longer exists in Python 3
        self.pids.pop(pid, None)
166
class Device:
    'track device data'
    name = ''
    address = ('', 0)
    community = 'public'
    version = '1'
    port = 161
    proxy = None
    timeout = 2.5
    tries = 2
    protocol = None
    lastScan = 0.
    snmpStatus = 0
    lastChange = 0
    maxOidsPerRequest = 40

    def __init__(self):
        # map process name to Process object above
        self.processes = {}
        # map pid number to Process object
        self.pids = {}

    def open(self):
        """Ensure an up-to-date SNMP proxy exists and open it if it can be."""
        self._makeProxy()
        if hasattr(self.proxy, 'open'):
            self.proxy.open()

    def close(self):
        """Close the SNMP proxy if one exists and supports closing."""
        if hasattr(self.proxy, 'close'):
            self.proxy.close()

    def _makeProxy(self):
        """(Re)build the AgentProxy when the SNMP settings it was built with
        no longer match this device's configuration."""
        p = self.proxy
        # NOTE(review): self.port is never assigned in this listing, while
        # the proxy is built from address[1]; for a non-161 port the
        # 'p.port != self.port' test forces a rebuild on every call — confirm.
        if (p is None or
            (p.ip, p.port) != self.address or
            p.snmpVersion != self.version or
            p.port != self.port or
            getattr(p, 'community', self.community) != self.community):
            self.proxy = AgentProxy(ip=self.address[0],
                                    port=self.address[1],
                                    community=self.community,
                                    snmpVersion=self.version,
                                    protocol=self.protocol,
                                    allowCache=True)
        # Apply the current retry/timeout settings even when the proxy is
        # reused, so configuration changes take effect without a rebuild.
        self.proxy.tries = self.tries
        self.proxy.timeout = self.timeout

    def updateConfig(self, processes):
        """Sync self.processes with (name, originalName, ignoreParameters,
        restart, severity) tuples, dropping names no longer configured."""
        unused = Set(self.processes.keys())
        for name, originalName, ignoreParameters, restart, severity \
                in processes:
            unused.discard(name)
            p = self.processes.setdefault(name, Process())
            p.name = name
            p.originalName = originalName
            p.ignoreParameters = ignoreParameters
            p.restart = restart
            p.severity = severity
        for name in unused:
            del self.processes[name]

    def get(self, oids):
        """Issue an SNMP get for *oids* through the proxy."""
        return self.proxy.get(oids, self.timeout, self.tries)

    def getTables(self, oids):
        """Walk the SNMP tables rooted at *oids*; returns TableRetriever()'s result."""
        # Share the per-request OID budget across the tables, but never ask
        # for zero repetitions, and keep the value an int (floor division).
        repetitions = max(1, self.maxOidsPerRequest // len(oids))
        t = TableRetriever(self.proxy, oids,
                           timeout=self.timeout,
                           retryCount=self.tries,
                           maxRepetitions=repetitions)
        return t()
239 240
class zenprocess(SnmpDaemon):
    """SNMP daemon that monitors OS processes via HOST-RESOURCES-MIB.

    Each cycle it walks the process name/path/argument tables of every
    device that is not reported down and matches the rows against the
    configured processes. It then sends process up/down/restart events and
    saves per-process count, CPU and memory values through RRDUtil.
    """
    statusEvent = {'eventClass': Status_OSProcess,
                   'eventGroup': 'Process'}
    initialServices = SnmpDaemon.initialServices + ['ProcessConfig']
    processConfigInterval = 20 * 60
    processCycleInterval = 5 * 60
    properties = SnmpDaemon.properties + ('processCycleInterval',)

    def __init__(self):
        SnmpDaemon.__init__(self, 'zenprocess')
        # device name -> Device
        self._devices = {}
        # NJobs for the scan in progress, or None when idle
        self.scanning = None
        # names of devices with ping issues; excluded by devices()
        self.downDevices = Set()

    def devices(self):
        "Return a {name: Device} dict of the devices not currently down"
        return dict([(k, v) for k, v in self._devices.items()
                     if k not in self.downDevices])

    def fetchConfig(self):
        """Get configuration values from the Zope server.

        Loads the RRD create command, daemon properties, threshold classes
        and the OS-process configuration (for --device only, if given).
        start() treats the returned Deferred's result as that
        process configuration.
        """
        def doFetchConfig(driver):
            yield self.model().callRemote('getDefaultRRDCreateCommand')
            createCommand = driver.next()

            yield self.model().callRemote('propertyItems')
            self.setPropertyItems(driver.next())

            self.rrd = RRDUtil(createCommand, self.processCycleInterval)

            yield self.model().callRemote('getThresholdClasses')
            self.remote_updateThresholdClasses(driver.next())

            devices = []
            if self.options.device:
                devices = [self.options.device]
            yield self.model().callRemote('getOSProcessConf', devices)
            driver.next()

        return drive(doFetchConfig)

    def remote_deleteDevice(self, doomed):
        """Stop monitoring *doomed*, clearing any outstanding SNMP error."""
        self.log.debug("Async delete device %s" % doomed)
        if doomed in self._devices:
            # clearSnmpError only acts on devices still in self._devices,
            # so it must run before the device is removed.
            self.clearSnmpError(doomed,
                                "Device %s removed from SNMP collection" % doomed)
            del self._devices[doomed]

    def remote_updateDeviceList(self, devices):
        """Reconcile with the server's (name, lastChange) device list.

        Re-fetches config for new or changed devices and drops devices no
        longer listed.
        """
        self.log.debug("Async update device list %s" % devices)
        doomed = Set(self._devices.keys())
        updated = []
        for device, lastChange in devices:
            cfg = self._devices.get(device, None)
            if not cfg or cfg.lastChange < lastChange:
                updated.append(device)
            doomed.discard(device)
        if updated:
            log.info("Fetching the config for %s", updated)
            # request only the devices that need it, by name
            d = self.model().callRemote('getOSProcessConf', updated)
            d.addCallback(self.updateDevices, updated)
            d.addErrback(self.error)
        if doomed:
            log.info("Removing %s", doomed)
            for device in doomed:
                # clear while the device is still tracked, then forget it
                self.clearSnmpError(device, "device %s removed" % device)
                del self._devices[device]

    def clearSnmpError(self, name, message):
        """Send a Clear SNMP event for *name* if its error count is positive,
        and reset the count."""
        if name in self._devices:
            if self._devices[name].snmpStatus > 0:
                self._devices[name].snmpStatus = 0
                self.sendEvent(self.statusEvent,
                               eventClass=Status_Snmp,
                               component="snmp",
                               device=name,
                               summary=message,
                               severity=Event.Clear)

    def remote_updateDevice(self, cfg):
        """Apply a single pushed device configuration."""
        self.log.debug("Async config update for %s", cfg[1][0])
        self.updateDevices([cfg], [])

    def updateDevices(self, cfgs, fetched):
        """Apply device configurations.

        Each cfg is (lastChange, (name, address, (community, version,
        timeout, tries), maxOidsPerRequest), procs, thresholds). Names in
        *fetched* that are absent from *cfgs* are dropped.
        """
        names = Set()
        for cfg in cfgs:
            lastChange, deviceConf, procs, thresholds = cfg
            name, addr, snmpConf, maxOidsPerRequest = deviceConf
            community, version, timeout, tries = snmpConf
            names.add(name)
            d = self._devices.setdefault(name, Device())
            d.lastChange = lastChange
            d.name = name
            d.address = addr
            d.community = community
            d.version = version
            d.timeout = timeout
            d.tries = tries
            d.maxOidsPerRequest = maxOidsPerRequest
            d.updateConfig(procs)
            d.protocol = self.snmpPort.protocol
            self.thresholds.updateList(thresholds)
        for doomed in Set(fetched) - names:
            if doomed in self._devices:
                del self._devices[doomed]

    def start(self, driver):
        'Read the basic config needed to do anything'
        log.debug("fetching config")
        devices = self._devices.keys()
        yield self.fetchConfig()
        self.updateDevices(driver.next(), devices)

        yield self.model().callRemote('getSnmpStatus', self.options.device)
        self.updateSnmpStatus(driver.next())

        yield self.model().callRemote('getProcessStatus', self.options.device)
        self.updateProcessStatus(driver.next())

        # re-read the configuration periodically
        driveLater(self.configCycleInterval * 60, self.start)

    def updateSnmpStatus(self, updates):
        """Seed per-device SNMP error counts from (name, count) pairs."""
        for name, count in updates:
            d = self._devices.get(name)
            if d:
                d.snmpStatus = count

    def updateProcessStatus(self, status):
        """Seed per-process down counts from (device, component, count) rows."""
        down = {}
        for device, component, count in status:
            down[(device, component)] = count
        for name, device in self._devices.items():
            for p in device.processes.values():
                p.status = down.get((name, p.originalName), 0)

    def oneDevice(self, device):
        """Scan one device's process tables, then fetch its perf data.

        The device's proxy is closed afterwards whatever the outcome.
        """
        device.open()
        def go(driver):
            yield self.scanDevice(device)
            driver.next()
            yield self.fetchPerf(device)
            driver.next()
        d = drive(go)
        d.addBoth(closer, device)
        return d

    def scanDevice(self, device):
        "Fetch all the process info"
        device.lastScan = time.time()
        tables = [NAMETABLE, PATHTABLE, ARGSTABLE]
        d = device.getTables(tables)
        d.addCallback(self.storeProcessNames, device)
        d.addErrback(self.deviceFailure, device)
        return d

    def deviceFailure(self, error, device):
        "Log exception for a single device"
        self.sendEvent(self.statusEvent,
                       eventClass=Status_Snmp,
                       component="snmp",
                       device=device.name,
                       summary='Unable to read processes on device %s' % device.name,
                       severity=Event.Error)
        device.snmpStatus += 1
        self.logError('Error on device %s' % device.name, error.value)

    def storeProcessNames(self, results, device):
        "Parse the process tables and figure what pids are on the device"
        if not results:
            summary = 'Device %s does not publish HOST-RESOURCES-MIB' % device.name
            self.sendEvent(self.statusEvent,
                           device=device.name,
                           summary=summary,
                           severity=Event.Error)
            log.info(summary)
            return
        if device.snmpStatus > 0:
            summary = 'Good SNMP response from device %s' % device.name
            # clearSnmpError only sends the Clear while snmpStatus > 0, so the
            # counter must be reset after it runs, not before.
            self.clearSnmpError(device.name, summary)
            device.snmpStatus = 0

        def extract(table):
            # pid (last OID component) -> value, for one table column
            column = {}
            for oid, value in results[table].items():
                column[int(oid.split('.')[-1])] = value
            return column
        names = extract(NAMETABLE)
        paths = extract(PATHTABLE)
        args = extract(ARGSTABLE)

        procs = []
        for i, path in paths.items():
            if i in names and i in args:
                name = names[i]
                # prefer the path unless it contains a backslash
                # (presumably a Windows path)
                if path and path.find('\\') == -1:
                    name = path
                procs.append((i, (name, args[i])))

        # look for changes in pids
        before = Set(device.pids.keys())
        after = {}
        for p in device.processes.values():
            for pid, (name, procArgs) in procs:
                if p.match(name, procArgs):
                    log.debug("Found process %d on %s" % (pid, p.name))
                    after[pid] = p
        afterSet = Set(after.keys())
        afterByConfig = reverseDict(after)
        new = afterSet - before
        dead = before - afterSet

        # report pid restarts: a pid vanished but its process still runs
        for p in dead:
            config = device.pids[p]
            config.discardPid(p)
            if config in afterByConfig and config.restart:
                summary = 'Process restarted: %s' % config.originalName
                self.sendEvent(self.statusEvent,
                               device=device.name,
                               summary=summary,
                               component=config.originalName,
                               severity=config.severity)
                log.info(summary)

        # report alive processes
        for config in afterByConfig.keys():
            if config.status > 0:
                summary = "Process up: %s" % config.originalName
                self.sendEvent(self.statusEvent,
                               device=device.name,
                               summary=summary,
                               component=config.originalName,
                               severity=Event.Clear)
                config.status = 0
                log.debug(summary)

        for p in new:
            log.debug("Found new %s pid %d on %s" % (
                after[p].originalName, p, device.name))
        device.pids = after

        # no pids for a config
        for config in device.processes.values():
            if config not in afterByConfig:
                config.status += 1
                summary = 'Process not running: %s' % config.originalName
                self.sendEvent(self.statusEvent,
                               device=device.name,
                               summary=summary,
                               component=config.originalName,
                               severity=config.severity)
                log.warning(summary)

        # store counts
        pidCounts = dict([(p, 0) for p in device.processes])
        for pidConfig in device.pids.values():
            pidCounts[pidConfig.name] += 1
        for name, count in pidCounts.items():
            self.save(device.name, name, 'count_count', count, 'GAUGE')

    def periodic(self, unused=None):
        "Basic SNMP scan loop"
        # re-arm first so the loop continues even if this pass bails out
        reactor.callLater(self.processCycleInterval, self.periodic)

        if self.scanning:
            running, unstarted, finished = self.scanning.status()
            msg = "performance scan job not finishing: " \
                  "%d jobs running %d jobs waiting %d jobs finished" % \
                  (running, unstarted, finished)
            log.error(msg)
            return

        def doPeriodic(driver):

            yield self.getDevicePingIssues()
            self.downDevices = Set([d[0] for d in driver.next()])

            self.scanning = NJobs(PARALLEL_JOBS,
                                  self.oneDevice,
                                  self.devices().values())
            yield self.scanning.start()
            driver.next()

        drive(doPeriodic).addCallback(self.heartbeat)

    def fetchPerf(self, device):
        "Get performance data for all the monitored Processes on a device"
        oids = []
        for pid in device.pids.keys():
            oids.extend([CPU + str(pid), MEM + str(pid)])
        if not oids:
            return defer.succeed(([], device))

        # one SNMP get per chunk of at most maxOidsPerRequest OIDs
        d = Chain(device.get, iter(chunk(oids, device.maxOidsPerRequest))).run()
        d.addBoth(self.storePerfStats, device)
        return d

    def storePerfStats(self, results, device):
        "Save the performance data in RRD files"
        if isinstance(results, failure.Failure):
            self.error(results)
            return results
        self.clearSnmpError(device.name,
                            'Performance data read for %s' % device.name)
        parts = {}
        for success, values in results:
            if success:
                parts.update(values)
        results = parts
        byConf = reverseDict(device.pids)
        for pidConf, pids in byConf.items():
            if len(pids) != 1:
                log.info("There are %d pids by the name %s",
                         len(pids), pidConf.name)
            pidName = pidConf.name
            for pid in pids:
                cpu = results.get(CPU + str(pid), None)
                mem = results.get(MEM + str(pid), None)
                pidConf.updateCpu(pid, cpu)
                pidConf.updateMemory(pid, mem)
            self.save(device.name, pidName, 'cpu_cpu', pidConf.getCpu(),
                      'DERIVE', min=0)
            # hrSWRunPerfMem is reported in KBytes (RFC 2790); store bytes
            self.save(device.name, pidName, 'mem_mem', pidConf.getMemory() * 1024,
                      'GAUGE')

    def save(self, deviceName, pidName, statName, value, rrdType,
             min='U', max='U'):
        "Save an value in the right path in RRD files"
        path = 'Devices/%s/os/processes/%s/%s' % (deviceName, pidName, statName)
        value = self.rrd.save(path, value, rrdType, min=min, max=max)

        for ev in self.thresholds.check('/' + path, time.time(), value):
            self.sendThresholdEvent(**ev)

    def heartbeat(self, *unused):
        """Mark the scan finished, log a summary and forward the heartbeat."""
        self.scanning = None
        devices = self.devices()
        pids = sum([len(d.pids) for d in devices.values()])
        log.info("Pulled process status for %d devices and %d processes",
                 len(devices), pids)
        SnmpDaemon.heartbeat(self)

    def connected(self):
        """Load configuration, then start the periodic scan loop."""
        drive(self.start).addCallbacks(self.periodic, self.errorStop)
if __name__ == '__main__':
    # Script entry point: build the daemon and hand control to its run loop.
    daemon = zenprocess()
    daemon.run()