| Trees | Indices | Help |
|
|---|
|
|
1 #! /usr/bin/env python
2 ###########################################################################
3 #
4 # This program is part of Zenoss Core, an open source monitoring platform.
5 # Copyright (C) 2007, Zenoss Inc.
6 #
7 # This program is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License version 2 as published by
9 # the Free Software Foundation.
10 #
11 # For complete information please visit: http://www.zenoss.com/oss/
12 #
13 ###########################################################################
14
15 __doc__= """zenprocess
16
17 Gets SNMP process performance data and stores it in RRD files.
18 """
19
20 import logging
21 import time
22 from sets import Set
23
24 log = logging.getLogger("zen.zenprocess")
25
26 from twisted.internet import reactor, defer, error
27
28 import Globals
29 from Products.ZenUtils.Driver import drive, driveLater
30 from Products.ZenUtils.NJobs import NJobs
31 from Products.ZenUtils.Chain import Chain
32 from Products.ZenEvents import Event
33 from Products.ZenEvents.ZenEventClasses import Status_Snmp, \
34 Status_OSProcess, Critical, Status_Perf
35
36 from Products.ZenRRD.RRDUtil import RRDUtil
37 from SnmpDaemon import SnmpDaemon
38
39 from Products.ZenHub.services.PerformanceConfig import SnmpConnInfo
40 # needed for pb comms
41 SnmpConnInfo = SnmpConnInfo
42
43 HOSTROOT ='.1.3.6.1.2.1.25'
44 RUNROOT = HOSTROOT + '.4'
45 NAMETABLE = RUNROOT + '.2.1.2'
46 PATHTABLE = RUNROOT + '.2.1.4'
47 ARGSTABLE = RUNROOT + '.2.1.5'
48 PERFROOT = HOSTROOT + '.5'
49 CPU = PERFROOT + '.1.1.1.' # note trailing dot
50 MEM = PERFROOT + '.1.1.2.' # note trailing dot
51
52 DEFAULT_PARALLEL_JOBS = 10
53
54 WRAP=0xffffffffL
55
56 try:
57 sorted = sorted # added in python 2.4
58 except NameError:
62
def reverseDict(d):
    """Return a dictionary with the keys and values of d swapped.

    Different keys may map to the same value, so every value in the result
    is a list holding all of the original keys that mapped to it.

    @param d: mapping whose values are hashable
    @type d: dictionary
    @return: mapping from each value of d to the list of its keys
    @rtype: dictionary
    """
    result = {}
    for key, value in d.items():
        result.setdefault(value, []).append(key)
    return result
71
75
77
79 cpu = None
80 memory = None
81
def updateCpu(self, n):
    """Record a new CPU counter reading and return the increase since the
    previous one.

    @param n: latest counter reading, or None when no reading is available
    @type n: number, numeric string or None
    @return: difference from the previous reading; None when there is no
        previous reading, the reading is missing or unparseable, or the
        counter went backwards
    @rtype: int or None
    """
    if n is not None:
        try:
            n = int(n)
        except (TypeError, ValueError):
            log.warning("Bad value for CPU: '%s'", n)
            # Treat an unparseable reading like a missing one.  Keeping the
            # raw value would make the subtraction below (or the one on the
            # next update) fail.
            n = None

    if self.cpu is None or n is None:
        # No baseline or no usable reading: remember what we have and wait
        # for the next sample.
        self.cpu = n
        return None
    diff = n - self.cpu
    if diff < 0:
        # don't provide a value when the counter falls backwards
        n = None
        diff = None
    self.cpu = n
    return diff
99
101 self.memory = n
102
105 __repr__ = __str__
106
107
108 from twisted.spread import pb
110 'track process-specific configuration data'
111 name = None
112 originalName = None
113 ignoreParameters = False
114 restart = None
115 severity = Event.Warning
116 status = 0
117 cpu = 0
118 cycleTime = None
119
122
def match(self, name, args):
    """Tell whether a process seen on the device matches this configuration.

    @param name: process name read from the device
    @type name: string
    @param args: process arguments read from the device; may be empty
    @type args: string
    @return: True when the process matches, False otherwise
    @rtype: boolean
    """
    if self.name is None:
        # an entry without a name never matches anything
        return False
    if self.ignoreParameters or not args:
        return self.originalName == name
    # otherwise compare against the full "name args" string
    return self.originalName == '%s %s' % (name, args)
129
131 return str(self.name)
132 __repr__ = __str__
133
135 p = self.pids.setdefault(pid, Pid())
136 cpu = p.updateCpu(value)
137 if cpu is not None:
138 self.cpu += cpu
139 self.cpu %= WRAP
140
142 return self.cpu
143
146
150
154
def updateConfig(self, update):
    """Copy the configuration fields of another Process onto this one.

    Only name, originalName, ignoreParameters, restart and severity are
    copied; every other attribute of this object is left untouched.

    @param update: object carrying the new configuration values
    @type update: Process
    """
    if self is update:
        return
    self.name = update.name
    self.originalName = update.originalName
    self.ignoreParameters = update.ignoreParameters
    self.restart = update.restart
    self.severity = update.severity
163
164 pb.setUnjellyableForClass(Process, Process)
165
167 'track device data'
168 name = ''
169 snmpConnInfo = None
170 proxy = None
171 lastScan = 0.
172 snmpStatus = 0
173 lastChange = 0
174 maxOidsPerRequest = 40
175
177 # map process name to Process object above
178 self.processes = {}
179 # map pid number to Process object
180 self.pids = {}
181
183 self._makeProxy()
184
190
192 p = self.proxy
193 c = self.snmpConnInfo
194 if (p is None or p.snmpConnInfo != c):
195 self.proxy = self.snmpConnInfo.createSession()
196 self.proxy.open()
197
199 if self is cfg:
200 return
201 self.snmpConnInfo = cfg.snmpConnInfo
202 unused = Set(self.processes.keys())
203 for update in cfg.processes.values():
204 unused.discard(update.name)
205 p = self.processes.setdefault(update.name, Process())
206 p.updateConfig(update)
207 for name in unused:
208 del self.processes[name]
209
210
212 return self.proxy.get(oids,
213 self.snmpConnInfo.zSnmpTimeout,
214 self.snmpConnInfo.zSnmpTries)
215
216
def getTables(self, oids):
    """Request the given SNMP tables through this device's proxy.

    @param oids: root OIDs of the tables to fetch
    @type oids: list of strings
    @return: the result of self.proxy.getTable()
    """
    # Share the per-request OID budget between the tables.  Always ask for
    # at least one repetition, and do not divide by zero when oids is empty.
    repetitions = max(1, self.maxOidsPerRequest // max(1, len(oids)))
    return self.proxy.getTable(oids,
                               timeout=self.snmpConnInfo.zSnmpTimeout,
                               retryCount=self.snmpConnInfo.zSnmpTries,
                               maxRepetitions=repetitions)
224 pb.setUnjellyableForClass(Device, Device)
225
226
228 statusEvent = { 'eventClass' : Status_OSProcess,
229 'eventGroup' : 'Process' }
230 initialServices = SnmpDaemon.initialServices + ['ProcessConfig']
231 processConfigInterval = 20*60
232 processCycleInterval = 5*60
233 properties = SnmpDaemon.properties + ('processCycleInterval',)
234 missing = 0
235 restarted = 0
236 parallelJobs = DEFAULT_PARALLEL_JOBS
237
239 SnmpDaemon.__init__(self, 'zenprocess', noopts)
240 self._devices = {}
241 self.scanning = None
242 self.downDevices = Set()
243
def devices(self):
    """Return the known devices that are not listed in self.downDevices.

    @return: mapping of device name to device object, a new dictionary on
        every call
    @rtype: dictionary
    """
    result = {}
    for name, device in self._devices.items():
        if name not in self.downDevices:
            result[name] = device
    return result
248
250 'Get configuration values from the Zope server'
251 def doFetchConfig(driver):
252 now = time.time()
253
254 yield self.model().callRemote('getDefaultRRDCreateCommand')
255 createCommand = driver.next()
256
257 yield self.model().callRemote('getZenProcessParallelJobs')
258 self.parallelJobs = int(driver.next())
259
260 yield self.model().callRemote('propertyItems')
261 self.setPropertyItems(driver.next())
262
263 self.rrd = RRDUtil(createCommand, self.processCycleInterval)
264
265 yield self.model().callRemote('getThresholdClasses')
266 self.remote_updateThresholdClasses(driver.next())
267
268 yield self.model().callRemote('getCollectorThresholds')
269 self.rrdStats.config(self.options.monitor,
270 self.name,
271 driver.next(),
272 createCommand)
273
274 devices = []
275 if self.options.device:
276 devices = [self.options.device]
277 yield self.model().callRemote('getOSProcessConf', devices)
278 driver.next()
279 self.sendEvents(
280 self.rrdStats.gauge('configTime',
281 self.processConfigInterval,
282 time.time() - now)
283 )
284
285 return drive(doFetchConfig)
286
# NOTE(review): the def line of this handler is missing from this listing;
# the statements below are its body and `doomed` is its parameter.
self.log.debug("Async delete device %s" % doomed)
if doomed in self._devices:
    # Clear the SNMP error before deleting: clearSnmpError() looks the
    # device up in self._devices, so it does nothing once the entry is gone.
    # The message is now formatted with the device name.
    self.clearSnmpError(doomed,
                        "Device %s removed from SNMP collection" % doomed)
    del self._devices[doomed]
292
# NOTE(review): the def line of this handler is missing from this listing;
# the statements below are its body and `devices` is its parameter, a
# sequence of (device name, lastChange) pairs.
self.log.debug("Async update device list %s" % devices)
# every known device is a removal candidate until it shows up in the list
doomed = Set(self._devices.keys())
updated = []
for device, lastChange in devices:
    cfg = self._devices.get(device, None)
    if not cfg or cfg.lastChange < lastChange:
        # unknown here, or changed since we last fetched it
        updated.append(device)
    doomed.discard(device)
if updated:
    log.info("Fetching the config for %s", updated)
    # Ask for the stale devices by name.  `devices` holds
    # (name, lastChange) pairs, not the plain names the other
    # getOSProcessConf call in this file passes.
    d = self.model().callRemote('getOSProcessConf', updated)
    d.addCallback(self.updateDevices, updated)
    d.addErrback(self.error)
if doomed:
    log.info("Removing %s", doomed)
    for device in doomed:
        # Clear before deleting: clearSnmpError() needs the device to
        # still be present in self._devices.
        self.clearSnmpError(device, "device %s removed" % device)
        del self._devices[device]
312
313
def clearSnmpError(self, name, message):
    """Reset a device's SNMP failure count and send a Clear event for it.

    Nothing happens when the device is unknown or has no recorded failures.

    @param name: name of the device
    @type name: string
    @param message: summary text for the Clear event
    @type message: string
    """
    # NOTE(review): indentation is lost in this listing; the event is
    # assumed to be sent only when a failure had been recorded. Confirm.
    if name not in self._devices:
        return
    device = self._devices[name]
    if device.snmpStatus > 0:
        device.snmpStatus = 0
        self.sendEvent(self.statusEvent,
                       eventClass=Status_Snmp,
                       component="process",
                       device=name,
                       summary=message,
                       severity=Event.Clear)
324
325
329
330
def updateDevices(self, cfgs, fetched):
    """Merge freshly fetched device configurations into self._devices.

    @param cfgs: configurations that were returned
    @type cfgs: sequence of device configuration objects
    @param fetched: names of the devices whose configuration was requested
    @type fetched: sequence of strings
    """
    received = {}
    for cfg in cfgs:
        received[cfg.name] = True
        # keep the existing device object if there is one and update it
        device = self._devices.setdefault(cfg.name, cfg)
        device.updateConfig(cfg)
        self.thresholds.updateForDevice(cfg.name, cfg.thresholds)
    # a device that was asked for but not returned is dropped
    for name in fetched:
        if name not in received and name in self._devices:
            del self._devices[name]
341
343 'Read the basic config needed to do anything'
344 log.debug("fetching config")
345 devices = self._devices.keys()
346 yield self.fetchConfig()
347 self.updateDevices(driver.next(), devices)
348
349 yield self.model().callRemote('getSnmpStatus', self.options.device)
350 self.updateSnmpStatus(driver.next())
351
352 yield self.model().callRemote('getProcessStatus', self.options.device)
353 self.updateProcessStatus(driver.next())
354
355 driveLater(self.configCycleInterval * 60, self.start)
356
357
363
364
def updateProcessStatus(self, status):
    """Set the status of every monitored process from the given rows.

    Processes that do not appear in status get a status of 0.

    @param status: (device name, component, count) triples
    @type status: sequence of tuples
    """
    down = {}
    for device, component, count in status:
        down[(device, component)] = count
    for name, device in self._devices.items():
        for process in device.processes.values():
            # components are matched against the process's originalName
            process.status = down.get((name, process.originalName), 0)
372
373
def oneDevice(self, device):
    """Scan one device, fetch its performance data, then close it.

    Failures are logged at debug level and not propagated, so one failing
    device does not abort the rest of the scan.

    @param device: the device to scan
    @return: the object returned by drive(), with close() attached to run
        after both success and failure
    """
    def go(driver):
        try:
            device.open()
            yield self.scanDevice(device)
            driver.next()

            # Only fetch performance data if status data was found.
            if device.snmpStatus == 0:
                yield self.fetchPerf(device)
                driver.next()
        except Exception:
            # best effort, but keep the traceback for troubleshooting
            log.debug('Failed to scan device %s' % device.name,
                      exc_info=True)

    def close(res):
        try:
            device.close()
        except Exception:
            log.debug("Failed to close device %s" % device.name,
                      exc_info=True)

    d = drive(go)
    d.addBoth(close)
    return d
397
398
400 "Fetch all the process info"
401 device.lastScan = time.time()
402 tables = [NAMETABLE, PATHTABLE, ARGSTABLE]
403 d = device.getTables(tables)
404 d.addCallback(self.storeProcessNames, device)
405 d.addErrback(self.deviceFailure, device)
406 return d
407
408
410 "Log exception for a single device"
411 self.sendEvent(self.statusEvent,
412 eventClass=Status_Snmp,
413 component="process",
414 device=device.name,
415 summary='Unable to read processes on device %s' % device.name,
416 severity=Event.Error)
417 device.snmpStatus += 1
418 if isinstance(reason.value, error.TimeoutError):
419 self.log.debug('Timeout on device %s' % device.name)
420 else:
421 self.logError('Error on device %s' % device.name, reason.value)
422
423
425 "Parse the process tables and figure what pids are on the device"
426 if not results:
427 summary = 'Device %s does not publish HOST-RESOURCES-MIB' % device.name
428 self.sendEvent(self.statusEvent,
429 device=device.name,
430 summary=summary,
431 severity=Event.Error)
432 log.info(summary)
433 return
434 if device.snmpStatus > 0:
435 summary = 'Process table up for device %s' % device.name
436 self.clearSnmpError(device.name, summary)
437
438 procs = []
439 names, paths, args = {}, {}, {}
440 def extract(dictionary, oid, value):
441 pid = int(oid.split('.')[-1])
442 dictionary[pid] = value
443 for row in results[NAMETABLE].items():
444 extract(names, *row)
445 for row in results[PATHTABLE].items():
446 extract(paths, *row)
447 for row in results[ARGSTABLE].items():
448 extract(args, *row)
449 for i, path in paths.items():
450 if i not in names: continue
451 name = names[i]
452 if path and path.find('\\') == -1:
453 name = path
454 arg = ''
455 if i in args: arg = args[i]
456 procs.append( (i, (name, arg) ) )
457 # look for changes in pids
458 before = Set(device.pids.keys())
459 after = {}
460 for p in device.processes.values():
461 for pid, (name, args) in procs:
462 if p.match(name, args):
463 log.debug("Found process %d on %s" % (pid, p.name))
464 after[pid] = p
465 afterSet = Set(after.keys())
466 afterByConfig = reverseDict(after)
467 new = afterSet - before
468 dead = before - afterSet
469
470 # report pid restarts
471 for p in dead:
472 config = device.pids[p]
473 config.discardPid(p)
474 if afterByConfig.has_key(config):
475 self.restarted += 1
476 if config.restart:
477 summary = 'Process restarted: %s' % config.originalName
478 self.sendEvent(self.statusEvent,
479 device=device.name,
480 summary=summary,
481 component=config.originalName,
482 severity=config.severity)
483 log.info(summary)
484
485 # report alive processes
486 for config, pids in afterByConfig.items():
487 summary = "Process up: %s" % config.originalName
488 self.sendEvent(self.statusEvent,
489 device=device.name,
490 summary=summary,
491 component=config.originalName,
492 severity=Event.Clear)
493 config.status = 0
494 log.debug(summary)
495
496 for p in new:
497 log.debug("Found new %s pid %d on %s" % (
498 after[p].originalName, p, device.name))
499 device.pids = after
500
501 # no pids for a config
502 for config in device.processes.values():
503 if not afterByConfig.has_key(config):
504 self.missing += 1
505 config.status += 1
506 summary = 'Process not running: %s' % config.originalName
507 self.sendEvent(self.statusEvent,
508 device=device.name,
509 summary=summary,
510 component=config.originalName,
511 severity=config.severity)
512 log.warning(summary)
513
514 # store counts
515 pidCounts = dict([(p, 0) for p in device.processes])
516 for pids, pidConfig in device.pids.items():
517 pidCounts[pidConfig.name] += 1
518 for name, count in pidCounts.items():
519 self.save(device.name, name, 'count_count', count, 'GAUGE')
520
521
523 "Basic SNMP scan loop"
524 reactor.callLater(self.processCycleInterval, self.periodic)
525
526 if self.scanning:
527 running, unstarted, finished = self.scanning.status()
528 msg = "performance scan job not finishing: " \
529 "%d jobs running %d jobs waiting %d jobs finished" % \
530 (running, unstarted, finished)
531 log.error(msg)
532 log.error("Problem devices: %r", [
533 d.name for d in self.devices().values() \
534 if d.proxy is not None])
535 return
536
537 start = time.time()
538
539 def doPeriodic(driver):
540
541 yield self.getDevicePingIssues()
542 self.downDevices = Set([d[0] for d in driver.next()])
543
544 self.scanning = NJobs(self.parallelJobs,
545 self.oneDevice,
546 self.devices().values())
547 yield self.scanning.start()
548 driver.next()
549
550 def checkResults(results):
551 for result in results:
552 if isinstance(result , Exception):
553 log.error("Error scanning device: %s", result)
554 break
555 self.cycleTime = time.time() - start
556 self.heartbeat()
557
558 drive(doPeriodic).addCallback(checkResults)
559
560
562 "Get performance data for all the monitored Processes on a device"
563 oids = []
564 for pid, pidConf in device.pids.items():
565 oids.extend([CPU + str(pid), MEM + str(pid)])
566 if not oids:
567 return defer.succeed(([], device))
568
569 d = Chain(device.get, iter(chunk(oids, device.maxOidsPerRequest))).run()
570 d.addCallback(self.storePerfStats, device)
571 d.addErrback(self.deviceFailure, device)
572 return d
573
574
576 "Save the performance data in RRD files"
577 for success, result in results:
578 if not success:
579 self.deviceFailure(result, device)
580 return results
581 self.clearSnmpError(device.name,
582 'Process table up for device %s' % device.name)
583 parts = {}
584 for success, values in results:
585 if success:
586 parts.update(values)
587 results = parts
588 byConf = reverseDict(device.pids)
589 for pidConf, pids in byConf.items():
590 if len(pids) != 1:
591 log.info("There are %d pids by the name %s",
592 len(pids), pidConf.name)
593 pidName = pidConf.name
594 for pid in pids:
595 cpu = results.get(CPU + str(pid), None)
596 mem = results.get(MEM + str(pid), None)
597 pidConf.updateCpu(pid, cpu)
598 pidConf.updateMemory(pid, mem)
599 self.save(device.name, pidName, 'cpu_cpu', pidConf.getCpu(),
600 'DERIVE', min=0)
601 self.save(device.name, pidName, 'mem_mem', pidConf.getMemory() * 1024,
602 'GAUGE')
603
604
607 """
608 Save a value into an RRD file
609
610 @param deviceName: name of the remote device (ie a hostname)
611 @type deviceName: string
612 @param pidName: process id of the monitored process
613 @type pidName: string
614 @param statName: metric name
615 @type statName: string
616 @param value: data to be stored
617 @type value: number
618 @param rrdType: RRD data type (eg ABSOLUTE, DERIVE, COUNTER)
619 @type rrdType: string
620 @param min: minimum value acceptable for this metric
621 @type min: number
622 @param max: maximum value acceptable for this metric
623 @type max: number
624 """
625 path = 'Devices/%s/os/processes/%s/%s' % (deviceName, pidName, statName)
626 try:
627 value = self.rrd.save(path, value, rrdType, min=min, max=max)
628
629 except Exception, ex:
630 summary= "Unable to save data for process-monitor RRD %s" % \
631 path
632 self.log.critical( summary )
633
634 message= "Data was value= %s, type=%s, min=%s, max=%s" % \
635 ( value, rrdType, min, max, )
636 self.log.critical( message )
637 self.log.exception( ex )
638
639 import traceback
640 trace_info= traceback.format_exc()
641
642 evid= self.sendEvent(dict(
643 dedupid="%s|%s" % (self.options.monitor, 'RRD write failure'),
644 severity=Critical,
645 device=self.options.monitor,
646 eventClass=Status_Perf,
647 component="RRD",
648 pidName=pidName,
649 statName=statName,
650 path=path,
651 message=message,
652 traceback=trace_info,
653 summary=summary))
654
655 # Skip thresholds
656 return
657
658 for ev in self.thresholds.check(path, time.time(), value):
659 self.sendThresholdEvent(**ev)
660
661
663 self.scanning = None
664 devices = self.devices()
665 pids = sum(map(lambda x: len(x.pids), devices.values()))
666 log.info("Pulled process status for %d devices and %d processes",
667 len(devices), pids)
668 SnmpDaemon.heartbeat(self)
669 cycle = self.processCycleInterval
670 self.sendEvents(
671 self.rrdStats.counter('dataPoints', cycle, self.rrd.dataPoints) +
672 self.rrdStats.gauge('cyclePoints', cycle, self.rrd.endCycle()) +
673 self.rrdStats.gauge('pids', cycle, pids) +
674 self.rrdStats.gauge('devices', cycle, len(devices)) +
675 self.rrdStats.gauge('missing', cycle, self.missing) +
676 self.rrdStats.gauge('restarted', cycle, self.restarted) +
677 self.rrdStats.gauge('cycleTime', cycle, self.cycleTime)
678 )
679
680
683
684
685 if __name__ == '__main__':
686 from Products.ZenRRD.zenprocess import zenprocess
687 z = zenprocess()
688 z.run()
689
| Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0beta1 on Thu May 7 11:46:39 2009 | http://epydoc.sourceforge.net |