| Trees | Indices | Help |
|
|---|
|
|
1 #! /usr/bin/env python
2 ###########################################################################
3 #
4 # This program is part of Zenoss Core, an open source monitoring platform.
5 # Copyright (C) 2007, Zenoss Inc.
6 #
7 # This program is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License version 2 as published by
9 # the Free Software Foundation.
10 #
11 # For complete information please visit: http://www.zenoss.com/oss/
12 #
13 ###########################################################################
14
15 __doc__='''zenprocess
16
17 Gets snmp process performance data and stores it in RRD files.
18
19 $Id$
20 '''
21
22 __version__ = "$Revision$"[11:-2]
23
24 import logging
25 import time
26 from sets import Set
27
28 log = logging.getLogger("zen.zenprocess")
29
30 from twisted.internet import reactor, defer
31 from twisted.python import failure
32
try:
    from pynetsnmp.twistedsnmp import AgentProxy
    from pynetsnmp.tableretriever import TableRetriever
except ImportError:
    # pynetsnmp is not importable: fall back to the pure-Python twistedsnmp
    # implementations of the same two classes, and say so.
    import warnings
    warnings.warn("Using python-based snmp engine")
    from twistedsnmp.agentproxy import AgentProxy
    from twistedsnmp.tableretriever import TableRetriever
41
42 import Globals
43 from Products.ZenUtils.Driver import drive, driveLater
44 from Products.ZenUtils.NJobs import NJobs
45 from Products.ZenUtils.Chain import Chain
46 from Products.ZenModel.PerformanceConf import performancePath
47 from Products.ZenEvents import Event
48 from Products.ZenEvents.ZenEventClasses import Status_Snmp, Status_OSProcess
49
50 from Products.ZenRRD.RRDUtil import RRDUtil
51 from SnmpDaemon import SnmpDaemon
52
# HOST-RESOURCES-MIB subtree; a device whose process tables come back empty
# is reported as not publishing HOST-RESOURCES-MIB.
53 HOSTROOT ='.1.3.6.1.2.1.25'
# NOTE(review): per RFC 2790, HOSTROOT.4 is hrSWRun and the .2.1.{2,4,5}
# columns below are hrSWRunName, hrSWRunPath and hrSWRunParameters (the
# NAME/PATH/ARGS constant names agree) -- confirm against the MIB.
54 RUNROOT = HOSTROOT + '.4'
55 NAMETABLE = RUNROOT + '.2.1.2'
56 PATHTABLE = RUNROOT + '.2.1.4'
57 ARGSTABLE = RUNROOT + '.2.1.5'
# NOTE(review): per RFC 2790, HOSTROOT.5 is hrSWRunPerf; .1.1.1 is
# hrSWRunPerfCPU and .1.1.2 is hrSWRunPerfMem (KBytes, which would explain
# the "* 1024" applied to memory before it is saved) -- confirm.
58 PERFROOT = HOSTROOT + '.5'
# CPU and MEM are OID prefixes, not complete OIDs: the pid is appended
# (CPU + str(pid)) to address a single process's row.
59 CPU = PERFROOT + '.1.1.1.' # note trailing dot
60 MEM = PERFROOT + '.1.1.2.' # note trailing dot
61
# First argument to NJobs when a scan cycle starts; presumably the number
# of devices scanned concurrently -- TODO confirm against NJobs.
62 PARALLEL_JOBS = 10
63
# Modulus applied to each process's accumulated CPU total (self.cpu %= WRAP).
64 WRAP=0xffffffffL
65
66 try:
67 sorted = sorted # added in python 2.4
68 except NameError:
72
74 """return a dictionary with keys and values swapped:
75 all values are lists to handle the different keys mapping to the same value
76 """
77 result = {}
78 for a, v in d.items():
79 result.setdefault(v, []).append(a)
80 return result
81
85
89
91
93 cpu = None
94 memory = None
95
97 if n is not None:
98 try:
99 n = int(n)
100 except ValueError, er:
101 log.warning("Bad value for CPU: '%s'", n)
102
103 if self.cpu is None or n is None:
104 self.cpu = n
105 return None
106 diff = n - self.cpu
107 if diff < 0:
108 # don't provide a value when the counter falls backwards
109 n = None
110 diff = None
111 self.cpu = n
112 return diff
113
115 self.memory = n
116
119 __repr__ = __str__
120
121
123 'track process-specific configuration data'
124 name = None
125 originalName = None
126 ignoreParameters = False
127 restart = None
128 severity = Event.Warning
129 status = 0
130 cpu = 0
131
134
136 if self.name is None:
137 return False
138 if self.ignoreParameters or not args:
139 return self.originalName == name
140 return self.originalName == '%s %s' % (name, args)
141
144 __repr__ = __str__
145
147 p = self.pids.setdefault(pid, Pid())
148 cpu = p.updateCpu(value)
149 if cpu is not None:
150 self.cpu += cpu
151 self.cpu %= WRAP
152
154 return self.cpu
155
158
162
166
168 'track device data'
169 name = ''
170 address = ('', 0)
171 community = 'public'
172 version = '1'
173 port = 161
174 proxy = None
175 timeout = 2.5
176 tries = 2
177 protocol = None
178 lastScan = 0.
179 snmpStatus = 0
180 lastChange = 0
181 maxOidsPerRequest = 40
182
184 # map process name to Process object above
185 self.processes = {}
186 # map pid number to Process object
187 self.pids = {}
188
193
197
# Ensure self.proxy is an AgentProxy matching the device's current SNMP
# settings, reusing the existing proxy when nothing baked into it changed.
p = self.proxy
# The proxy is built with port=self.address[1], so comparing against
# self.address already covers the port. The old extra "p.port != self.port"
# test compared against a class default that the config update never
# assigns, forcing a rebuild every cycle for devices on a non-default
# port. Community changes, on the other hand, never triggered a rebuild.
# getattr() keeps this safe if a proxy implementation lacks .community
# (it then simply rebuilds).
if (p is None or
    (p.ip, p.port) != self.address or
    p.snmpVersion != self.version or
    getattr(p, 'community', None) != self.community):
    self.proxy = AgentProxy(ip=self.address[0],
                            port=self.address[1],
                            community=self.community,
                            snmpVersion=self.version,
                            protocol=self.protocol,
                            allowCache=True)
# Refresh retry settings on every open so configuration changes to them
# also reach a reused proxy.
self.proxy.tries = self.tries
self.proxy.timeout = self.timeout
212
213
215 unused = Set(self.processes.keys())
216 for name, originalName, ignoreParameters, restart, severity \
217 in processes:
218 unused.discard(name)
219 p = self.processes.setdefault(name, Process())
220 p.name = name
221 p.originalName = originalName
222 p.ignoreParameters = ignoreParameters
223 p.restart = restart
224 p.severity = severity
225 for name in unused:
226 del self.processes[name]
227
228
231
232
234 t = TableRetriever(self.proxy, oids,
235 timeout=self.timeout,
236 retryCount=self.tries,
237 maxRepetitions=self.maxOidsPerRequest / len(oids))
238 return t()
239
240
242 statusEvent = { 'eventClass' : Status_OSProcess,
243 'eventGroup' : 'Process' }
244 initialServices = SnmpDaemon.initialServices + ['ProcessConfig']
245 processConfigInterval = 20*60
246 processCycleInterval = 5*60
247 properties = SnmpDaemon.properties + ('processCycleInterval',)
248
250 SnmpDaemon.__init__(self, 'zenprocess')
251 self._devices = {}
252 self.scanning = None
253 self.downDevices = Set()
254
256 "Return a filtered list of devices"
257 return dict([(k, v) for k, v in self._devices.items()
258 if k not in self.downDevices])
259
261 'Get configuration values from the Zope server'
262 def doFetchConfig(driver):
263 yield self.model().callRemote('getDefaultRRDCreateCommand')
264 createCommand = driver.next()
265
266 yield self.model().callRemote('propertyItems')
267 self.setPropertyItems(driver.next())
268
269 self.rrd = RRDUtil(createCommand, self.processCycleInterval)
270
271 yield self.model().callRemote('getThresholdClasses')
272 self.remote_updateThresholdClasses(driver.next())
273
274 devices = []
275 if self.options.device:
276 devices = [self.options.device]
277 yield self.model().callRemote('getOSProcessConf', devices)
278 driver.next()
279
280 return drive(doFetchConfig)
281
# Stop collecting from the device named `doomed`, clearing any outstanding
# SNMP error event for it.
self.log.debug("Async delete device %s" % doomed)
if doomed in self._devices:
    # Clear *before* forgetting the device: clearSnmpError only acts on
    # names still present in self._devices, so calling it after the delete
    # never sent the clear event. The summary also lacked its
    # "% doomed" substitution.
    self.clearSnmpError(doomed,
                        "Device %s removed from SNMP collection" % doomed)
    del self._devices[doomed]
287
# Reconcile the known devices with `devices`, a sequence of
# (device name, lastChange) pairs: re-fetch config for new or changed
# devices and forget devices that are no longer listed.
self.log.debug("Async update device list %s" % devices)
doomed = Set(self._devices.keys())
updated = []
for device, lastChange in devices:
    cfg = self._devices.get(device, None)
    if not cfg or cfg.lastChange < lastChange:
        updated.append(device)
    doomed.discard(device)
if updated:
    log.info("Fetching the config for %s", updated)
    # Request only the stale devices, by name. This matches the
    # device-name list getOSProcessConf receives during the initial config
    # fetch, and the `updated` list handed to updateDevices as the fetched
    # set. Previously the full list of (name, lastChange) pairs was sent.
    d = self.model().callRemote('getOSProcessConf', updated)
    d.addCallback(self.updateDevices, updated)
    d.addErrback(self.error)
if doomed:
    log.info("Removing %s", doomed)
    for device in doomed:
        # Clear before deleting: clearSnmpError ignores names that are no
        # longer in self._devices, so clearing afterwards was a no-op.
        self.clearSnmpError(device, "device %s removed" % device)
        del self._devices[device]
307
308
310 if name in self._devices:
311 if self._devices[name].snmpStatus > 0:
312 self._devices[name].snmpStatus = 0
313 self.sendEvent(self.statusEvent,
314 eventClass=Status_Snmp,
315 component="snmp",
316 device=name,
317 summary=message,
318 severity=Event.Clear)
319
320
324
325
327 names = Set()
328 for cfg in cfgs:
329 lastChange, snmpConf, procs, thresholds = cfg
330 name, addr, snmpConf, maxOidsPerRequest = snmpConf
331 community, version, timeout, tries = snmpConf
332 names.add(name)
333 d = self._devices.setdefault(name, Device())
334 d.lastChange = lastChange
335 d.name = name
336 d.address = addr
337 d.community = community
338 d.version = version
339 d.timeout = timeout
340 d.tries = tries
341 d.maxOidsPerRequest = maxOidsPerRequest
342 d.updateConfig(procs)
343 d.protocol = self.snmpPort.protocol
344 self.thresholds.updateList(thresholds)
345 for doomed in Set(fetched) - names:
346 if doomed in self._devices:
347 del self._devices[doomed]
348
350 'Read the basic config needed to do anything'
351 log.debug("fetching config")
352 devices = self._devices.keys()
353 yield self.fetchConfig()
354 self.updateDevices(driver.next(), devices)
355
356 yield self.model().callRemote('getSnmpStatus', self.options.device)
357 self.updateSnmpStatus(driver.next())
358
359 yield self.model().callRemote('getProcessStatus', self.options.device)
360 self.updateProcessStatus(driver.next())
361
362 driveLater(self.configCycleInterval * 60, self.start)
363
364
370
371
373 down = {}
374 for device, component, count in status:
375 down[ (device, component) ] = count
376 for name, device in self._devices.items():
377 for p in device.processes.values():
378 p.status = down.get( (name, p.originalName), 0)
379
380
382 device.open()
383 def go(driver):
384 yield self.scanDevice(device)
385 driver.next()
386 yield self.fetchPerf(device)
387 driver.next()
388 d = drive(go)
389 d.addBoth(closer, device)
390 return d
391
392
394 "Fetch all the process info"
395 device.lastScan = time.time()
396 tables = [NAMETABLE, PATHTABLE, ARGSTABLE]
397 d = device.getTables(tables)
398 d.addCallback(self.storeProcessNames, device)
399 d.addErrback(self.deviceFailure, device)
400 return d
401
402
404 "Log exception for a single device"
405 self.sendEvent(self.statusEvent,
406 eventClass=Status_Snmp,
407 component="snmp",
408 device=device.name,
409 summary='Unable to read processes on device %s' % device.name,
410 severity=Event.Error)
411 device.snmpStatus += 1
412 self.logError('Error on device %s' % device.name, error.value)
413
414
416 "Parse the process tables and figure what pids are on the device"
417 if not results:
418 summary = 'Device %s does not publish HOST-RESOURCES-MIB' % device.name
419 self.sendEvent(self.statusEvent,
420 device=device.name,
421 summary=summary,
422 severity=Event.Error)
423 log.info(summary)
424 return
425 if device.snmpStatus > 0:
426 device.snmpStatus = 0
427 summary = 'Good SNMP response from device %s' % device.name
428 self.clearSnmpError(device.name, summary)
429
430 procs = []
431 names, paths, args = {}, {}, {}
432 def extract(dictionary, oid, value):
433 pid = int(oid.split('.')[-1])
434 dictionary[pid] = value
435 for row in results[NAMETABLE].items():
436 extract(names, *row)
437 for row in results[PATHTABLE].items():
438 extract(paths, *row)
439 for row in results[ARGSTABLE].items():
440 extract(args, *row)
441 for i, path in paths.items():
442 if i in names and i in args:
443 name = names[i]
444 if path and path.find('\\') == -1:
445 name = path
446 procs.append( (i, (name, args[i]) ) )
447 # look for changes in pids
448 before = Set(device.pids.keys())
449 after = {}
450 for p in device.processes.values():
451 for pid, (name, args) in procs:
452 if p.match(name, args):
453 log.debug("Found process %d on %s" % (pid, p.name))
454 after[pid] = p
455 afterSet = Set(after.keys())
456 afterByConfig = reverseDict(after)
457 new = afterSet - before
458 dead = before - afterSet
459
460 # report pid restarts
461 for p in dead:
462 config = device.pids[p]
463 config.discardPid(p)
464 if afterByConfig.has_key(config):
465 if config.restart:
466 summary = 'Process restarted: %s' % config.originalName
467 self.sendEvent(self.statusEvent,
468 device=device.name,
469 summary=summary,
470 component=config.originalName,
471 severity=config.severity)
472 log.info(summary)
473
474 # report alive processes
475 for config, pids in afterByConfig.items():
476 if config.status > 0:
477 summary = "Process up: %s" % config.originalName
478 self.sendEvent(self.statusEvent,
479 device=device.name,
480 summary=summary,
481 component=config.originalName,
482 severity=Event.Clear)
483 config.status = 0
484 log.debug(summary)
485
486 for p in new:
487 log.debug("Found new %s pid %d on %s" % (
488 after[p].originalName, p, device.name))
489 device.pids = after
490
491 # no pids for a config
492 for config in device.processes.values():
493 if not afterByConfig.has_key(config):
494 config.status += 1
495 summary = 'Process not running: %s' % config.originalName
496 self.sendEvent(self.statusEvent,
497 device=device.name,
498 summary=summary,
499 component=config.originalName,
500 severity=config.severity)
501 log.warning(summary)
502
503 # store counts
504 pidCounts = dict([(p, 0) for p in device.processes])
505 for pids, pidConfig in device.pids.items():
506 pidCounts[pidConfig.name] += 1
507 for name, count in pidCounts.items():
508 self.save(device.name, name, 'count_count', count, 'GAUGE')
509
510
512 "Basic SNMP scan loop"
513 reactor.callLater(self.processCycleInterval, self.periodic)
514
515 if self.scanning:
516 running, unstarted, finished = self.scanning.status()
517 msg = "performance scan job not finishing: " \
518 "%d jobs running %d jobs waiting %d jobs finished" % \
519 (running, unstarted, finished)
520 log.error(msg)
521 return
522
523 def doPeriodic(driver):
524
525 yield self.getDevicePingIssues()
526 self.downDevices = Set([d[0] for d in driver.next()])
527
528 self.scanning = NJobs(PARALLEL_JOBS,
529 self.oneDevice,
530 self.devices().values())
531 yield self.scanning.start()
532 driver.next()
533
534 drive(doPeriodic).addCallback(self.heartbeat)
535
536
538 "Get performance data for all the monitored Processes on a device"
539 oids = []
540 for pid, pidConf in device.pids.items():
541 oids.extend([CPU + str(pid), MEM + str(pid)])
542 if not oids:
543 return defer.succeed(([], device))
544
545 d = Chain(device.get, iter(chunk(oids, device.maxOidsPerRequest))).run()
546 d.addBoth(self.storePerfStats, device)
547 return d
548
549
551 "Save the performance data in RRD files"
552 if isinstance(results, failure.Failure):
553 self.error(results)
554 return results
555 self.clearSnmpError(device.name,
556 'Performance data read for %s' % device.name)
557 parts = {}
558 for success, values in results:
559 if success:
560 parts.update(values)
561 results = parts
562 byConf = reverseDict(device.pids)
563 for pidConf, pids in byConf.items():
564 if len(pids) != 1:
565 log.info("There are %d pids by the name %s",
566 len(pids), pidConf.name)
567 pidName = pidConf.name
568 for pid in pids:
569 cpu = results.get(CPU + str(pid), None)
570 mem = results.get(MEM + str(pid), None)
571 pidConf.updateCpu(pid, cpu)
572 pidConf.updateMemory(pid, mem)
573 self.save(device.name, pidName, 'cpu_cpu', pidConf.getCpu(),
574 'DERIVE', min=0)
575 self.save(device.name, pidName, 'mem_mem', pidConf.getMemory() * 1024,
576 'GAUGE')
577
578
581 "Save an value in the right path in RRD files"
582 path = 'Devices/%s/os/processes/%s/%s' % (deviceName, pidName, statName)
583 value = self.rrd.save(path, value, rrdType, min=min, max=max)
584
585 for ev in self.thresholds.check('/' + path, time.time(), value):
586 self.sendThresholdEvent(**ev)
587
588
596
597
600
601
# Script entry point: build the zenprocess daemon and call its run() method.
if "__main__" == __name__:
    z = zenprocess()
    z.run()
605
| Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0beta1 on Thu Oct 25 16:28:48 2007 | http://epydoc.sourceforge.net |