1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 __doc__= """zenprocess
16
17 Gets SNMP process performance data and stores it in RRD files.
18 """
19
20 import logging
21 import time
22 from sets import Set
23
24 log = logging.getLogger("zen.zenprocess")
25
26 from twisted.internet import reactor, defer, error
27
28 import Globals
29 from Products.ZenUtils.Driver import drive, driveLater
30 from Products.ZenUtils.NJobs import NJobs
31 from Products.ZenUtils.Chain import Chain
32 from Products.ZenEvents import Event
33 from Products.ZenEvents.ZenEventClasses import Status_Snmp, \
34 Status_OSProcess, Critical, Status_Perf
35
36 from Products.ZenRRD.RRDUtil import RRDUtil
37 from SnmpDaemon import SnmpDaemon
38
39 from Products.ZenHub.services.PerformanceConfig import SnmpConnInfo
40
41 SnmpConnInfo = SnmpConnInfo
42
# SNMP OIDs from HOST-RESOURCES-MIB (RFC 2790) used to discover processes
# and read their resource usage.
HOSTROOT = '.1.3.6.1.2.1.25'        # host: HOST-RESOURCES-MIB root
RUNROOT = HOSTROOT + '.4'           # hrSWRun group
NAMETABLE = RUNROOT + '.2.1.2'      # hrSWRunTable column hrSWRunName
PATHTABLE = RUNROOT + '.2.1.4'      # hrSWRunTable column hrSWRunPath
ARGSTABLE = RUNROOT + '.2.1.5'      # hrSWRunTable column hrSWRunParameters
PERFROOT = HOSTROOT + '.5'          # hrSWRunPerf group
# hrSWRunPerfTable column prefixes.  The trailing '.' lets callers append a
# row index (the pid) directly, e.g. CPU + str(pid).
CPU = PERFROOT + '.1.1.1.'          # hrSWRunPerfCPU
MEM = PERFROOT + '.1.1.2.'          # hrSWRunPerfMem (reported in KBytes)

# NOTE(review): presumably the default number of concurrent SNMP jobs;
# its use is not visible in this excerpt.
DEFAULT_PARALLEL_JOBS = 10

# 2**32 - 1, presumably the wrap point of 32-bit SNMP counters (usage not
# visible here).  The old 'L' suffix was dropped: on Python 2.4+ (PEP 237)
# this literal already evaluates to the same value, and the suffix is a
# syntax error on Python 3.
WRAP = 0xffffffff
55
56 try:
57 sorted = sorted
58 except NameError:
60 x.sort(*args, **kw)
61 return x
62
64 """return a dictionary with keys and values swapped:
65 all values are lists to handle the different keys mapping to the same value
66 """
67 result = {}
68 for a, v in d.items():
69 result.setdefault(v, []).append(a)
70 return result
71
73 'break lst into n-sized chunks'
74 return [lst[i:i+n] for i in range(0, len(lst), n)]
75
77
79 cpu = None
80 memory = None
81
83 if n is not None:
84 try:
85 n = int(n)
86 except ValueError, er:
87 log.warning("Bad value for CPU: '%s'", n)
88
89 if self.cpu is None or n is None:
90 self.cpu = n
91 return None
92 diff = n - self.cpu
93 if diff < 0:
94
95 n = None
96 diff = None
97 self.cpu = n
98 return diff
99
102
104 return '<Pid> memory: %s cpu: %s' % (self.memory, self.cpu)
105 __repr__ = __str__
106
107
108 from twisted.spread import pb
109 -class Process(pb.Copyable, pb.RemoteCopy):
163
164 pb.setUnjellyableForClass(Process, Process)
165
166 -class Device(pb.Copyable, pb.RemoteCopy):
224 pb.setUnjellyableForClass(Device, Device)
225
226
286
288 self.log.debug("Async delete device %s" % doomed)
289 if doomed in self._devices:
290 del self._devices[doomed]
291 self.clearSnmpError(doomed, "Device %s removed from SNMP collection")
292
312
313
324
325
329
330
341
342 - def start(self, driver):
356
357
363
364
372
373
387
388 def close(res):
389 try:
390 device.close()
391 except:
392 log.debug("Failed to close device %s" % device.name)
393
394 d = drive(go)
395 d.addBoth(close)
396 return d
397
398
407
408
422
423
425 "Parse the process tables and figure what pids are on the device"
426 if not results:
427 summary = 'Device %s does not publish HOST-RESOURCES-MIB' % device.name
428 self.sendEvent(self.statusEvent,
429 device=device.name,
430 summary=summary,
431 severity=Event.Error)
432 log.info(summary)
433 return
434 if device.snmpStatus > 0:
435 summary = 'Process table up for device %s' % device.name
436 self.clearSnmpError(device.name, summary)
437
438 procs = []
439 names, paths, args = {}, {}, {}
440 def extract(dictionary, oid, value):
441 pid = int(oid.split('.')[-1])
442 dictionary[pid] = value
443 for row in results[NAMETABLE].items():
444 extract(names, *row)
445 for row in results[PATHTABLE].items():
446 extract(paths, *row)
447 for row in results[ARGSTABLE].items():
448 extract(args, *row)
449 for i, path in paths.items():
450 if i not in names: continue
451 name = names[i]
452 if path and path.find('\\') == -1:
453 name = path
454 arg = ''
455 if i in args: arg = args[i]
456 procs.append( (i, (name, arg) ) )
457
458 before = Set(device.pids.keys())
459 after = {}
460 for p in device.processes.values():
461 for pid, (name, args) in procs:
462 if p.match(name, args):
463 log.debug("Found process %d on %s" % (pid, p.name))
464 after[pid] = p
465 afterSet = Set(after.keys())
466 afterByConfig = reverseDict(after)
467 new = afterSet - before
468 dead = before - afterSet
469
470
471 for p in dead:
472 config = device.pids[p]
473 config.discardPid(p)
474 if afterByConfig.has_key(config):
475 self.restarted += 1
476 if config.restart:
477 summary = 'Process restarted: %s' % config.originalName
478 self.sendEvent(self.statusEvent,
479 device=device.name,
480 summary=summary,
481 component=config.originalName,
482 severity=config.severity)
483 log.info(summary)
484
485
486 for config, pids in afterByConfig.items():
487 summary = "Process up: %s" % config.originalName
488 self.sendEvent(self.statusEvent,
489 device=device.name,
490 summary=summary,
491 component=config.originalName,
492 severity=Event.Clear)
493 config.status = 0
494 log.debug(summary)
495
496 for p in new:
497 log.debug("Found new %s pid %d on %s" % (
498 after[p].originalName, p, device.name))
499 device.pids = after
500
501
502 for config in device.processes.values():
503 if not afterByConfig.has_key(config):
504 self.missing += 1
505 config.status += 1
506 summary = 'Process not running: %s' % config.originalName
507 self.sendEvent(self.statusEvent,
508 device=device.name,
509 summary=summary,
510 component=config.originalName,
511 severity=config.severity)
512 log.warning(summary)
513
514
515 pidCounts = dict([(p, 0) for p in device.processes])
516 for pids, pidConfig in device.pids.items():
517 pidCounts[pidConfig.name] += 1
518 for name, count in pidCounts.items():
519 self.save(device.name, name, 'count_count', count, 'GAUGE')
520
521
549
550 def checkResults(results):
551 for result in results:
552 if isinstance(result , Exception):
553 log.error("Error scanning device: %s", result)
554 break
555 self.cycleTime = time.time() - start
556 self.heartbeat()
557
558 drive(doPeriodic).addCallback(checkResults)
559
560
573
574
576 "Save the performance data in RRD files"
577 for success, result in results:
578 if not success:
579 self.deviceFailure(result, device)
580 return results
581 self.clearSnmpError(device.name,
582 'Process table up for device %s' % device.name)
583 parts = {}
584 for success, values in results:
585 if success:
586 parts.update(values)
587 results = parts
588 byConf = reverseDict(device.pids)
589 for pidConf, pids in byConf.items():
590 if len(pids) != 1:
591 log.info("There are %d pids by the name %s",
592 len(pids), pidConf.name)
593 pidName = pidConf.name
594 for pid in pids:
595 cpu = results.get(CPU + str(pid), None)
596 mem = results.get(MEM + str(pid), None)
597 pidConf.updateCpu(pid, cpu)
598 pidConf.updateMemory(pid, mem)
599 self.save(device.name, pidName, 'cpu_cpu', pidConf.getCpu(),
600 'DERIVE', min=0)
601 self.save(device.name, pidName, 'mem_mem', pidConf.getMemory() * 1024,
602 'GAUGE')
603
604
def save(self, deviceName, pidName, statName, value, rrdType,
         min='U', max='U'):
    """
    Save a value into an RRD file, then check it against thresholds.

    The value is written to Devices/<deviceName>/os/processes/<pidName>/
    <statName> via self.rrd.save().  Whatever that call returns is passed to
    self.thresholds.check(), and every resulting threshold event is sent
    with self.sendThresholdEvent().

    If the RRD write raises, the failure is logged at CRITICAL level along
    with the traceback.  A Critical Status_Perf event is then sent against
    this collector (self.options.monitor), and no threshold check is done.

    @param deviceName: name of the remote device (ie a hostname)
    @type deviceName: string
    @param pidName: name of the monitored process configuration, used as
        the process component of the RRD path (a process name, not a
        numeric process id)
    @type pidName: string
    @param statName: metric name, eg 'cpu_cpu', 'mem_mem' or 'count_count'
    @type statName: string
    @param value: data to be stored
    @type value: number
    @param rrdType: RRD data type (eg GAUGE, DERIVE, COUNTER)
    @type rrdType: string
    @param min: minimum value acceptable for this metric; the default 'U'
        is passed through unchanged to self.rrd.save()
    @type min: number or string
    @param max: maximum value acceptable for this metric; the default 'U'
        is passed through unchanged to self.rrd.save()
    @type max: number or string
    """
    path = 'Devices/%s/os/processes/%s/%s' % (deviceName, pidName, statName)
    try:
        # Thresholds are checked against what the RRD layer returns, not
        # against the raw input value.
        value = self.rrd.save(path, value, rrdType, min=min, max=max)
    except Exception:
        # sys.exc_info() keeps this handler valid on every Python version
        # (the "except X, e" form is Python 2 only).  These modules are
        # only needed on this rare error path.
        import sys
        import traceback
        error = sys.exc_info()[1]
        trace_info = traceback.format_exc()

        summary = "Unable to save data for process-monitor RRD %s" % path
        # 'value' is still the caller's input here, because the assignment
        # above never completed.
        message = "Data was value= %s, type=%s, min=%s, max=%s" % \
                  (value, rrdType, min, max)
        self.log.critical(summary)
        self.log.critical(message)
        self.log.exception(error)

        self.sendEvent(dict(
            dedupid="%s|%s" % (self.options.monitor, 'RRD write failure'),
            severity=Critical,
            device=self.options.monitor,
            eventClass=Status_Perf,
            component="RRD",
            pidName=pidName,
            statName=statName,
            path=path,
            message=message,
            traceback=trace_info,
            summary=summary))
        return

    for ev in self.thresholds.check(path, time.time(), value):
        self.sendThresholdEvent(**ev)
660
661
663 self.scanning = None
664 devices = self.devices()
665 pids = sum(map(lambda x: len(x.pids), devices.values()))
666 log.info("Pulled process status for %d devices and %d processes",
667 len(devices), pids)
668 SnmpDaemon.heartbeat(self)
669 cycle = self.processCycleInterval
670 self.sendEvents(
671 self.rrdStats.counter('dataPoints', cycle, self.rrd.dataPoints) +
672 self.rrdStats.gauge('cyclePoints', cycle, self.rrd.endCycle()) +
673 self.rrdStats.gauge('pids', cycle, pids) +
674 self.rrdStats.gauge('devices', cycle, len(devices)) +
675 self.rrdStats.gauge('missing', cycle, self.missing) +
676 self.rrdStats.gauge('restarted', cycle, self.restarted) +
677 self.rrdStats.gauge('cycleTime', cycle, self.cycleTime)
678 )
679
680
683
684
# Script entry point: build the zenprocess daemon and run it.
if __name__ == '__main__':
    # The daemon class is imported from its full package path rather than
    # taken from this __main__ module.
    # NOTE(review): presumably so classes resolve under
    # Products.ZenRRD.zenprocess (e.g. for pb serialization) -- confirm.
    from Products.ZenRRD.zenprocess import zenprocess
    z = zenprocess()
    z.run()
689