1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 __doc__= """zenprocess
16
17 Gets SNMP process data from a device's HOST-RESOURCES-MIB
18 and store process performance in RRD files.
19 """
20
21 import sys
22 import logging
23 import time
24 from sets import Set
25
26 log = logging.getLogger("zen.zenprocess")
27
28 from twisted.internet import reactor, defer, error
29 from twisted.spread import pb
30
31 import Globals
32 from Products.ZenUtils.Driver import drive, driveLater
33 from Products.ZenUtils.NJobs import NJobs
34 from Products.ZenUtils.Chain import Chain
35 from Products.ZenEvents import Event
36 from Products.ZenEvents.ZenEventClasses import Status_Snmp, \
37 Status_OSProcess, Critical, Status_Perf
38
39 from Products.ZenRRD.RRDUtil import RRDUtil
40 from SnmpDaemon import SnmpDaemon
41
42 from Products.ZenHub.services.PerformanceConfig import SnmpConnInfo
43
44 SnmpConnInfo = SnmpConnInfo
45
46
47 HOSTROOT ='.1.3.6.1.2.1.25'
48 RUNROOT = HOSTROOT + '.4'
49 NAMETABLE = RUNROOT + '.2.1.2'
50 PATHTABLE = RUNROOT + '.2.1.4'
51 ARGSTABLE = RUNROOT + '.2.1.5'
52 PERFROOT = HOSTROOT + '.5'
53 CPU = PERFROOT + '.1.1.1.'
54 MEM = PERFROOT + '.1.1.2.'
55
56 DEFAULT_PARALLEL_JOBS = 10
57
58
59 WRAP = 0xffffffffL
60
62 """
63 Return a dictionary with keys and values swapped:
64 all values are lists to handle the different keys mapping to the same value
65 """
66 result = {}
67 for a, v in d.items():
68 result.setdefault(v, []).append(a)
69 return result
70
72 """
73 Break lst into n-sized chunks
74 """
75 return [lst[i:i+n] for i in range(0, len(lst), n)]
76
78
80 """
81 Helper class to track process id information
82 """
83 cpu = None
84 memory = None
85
87 """
88 """
89 if n is not None:
90 try:
91 n = int(n)
92 except ValueError, er:
93 log.warning("Bad value for CPU: '%s'", n)
94
95 if self.cpu is None or n is None:
96 self.cpu = n
97 return None
98 diff = n - self.cpu
99 if diff < 0:
100
101 n = None
102 diff = None
103 self.cpu = n
104 return diff
105
107 """
108 """
109 self.memory = n
110
112 """
113 Override the Python default to represent ourselves as a string
114 """
115 return '<Pid> memory: %s cpu: %s' % (self.memory, self.cpu)
116 __repr__ = __str__
117
118
119 -class Process(pb.Copyable, pb.RemoteCopy):
120 """
121 Track process-specific configuration data
122 """
123 name = None
124 originalName = None
125 ignoreParameters = False
126 restart = None
127 severity = Event.Warning
128 status = 0
129 cpu = 0
130 cycleTime = None
131
134
135 - def match(self, name, args):
136 """
137 Perform exact comparisons on the process names.
138
139 @parameter name: name of a process to compare
140 @type name: string
141 @parameter args: argument list of the process
142 @type args: string
143 @return: does the name match this process's info?
144 @rtype: Boolean
145 """
146 if self.name is None:
147 return False
148 if self.ignoreParameters or not args:
149 return self.originalName == name
150 return self.originalName == '%s %s' % (name, args)
151
153 """
154 Override the Python default to represent ourselves as a string
155 """
156 return str(self.name)
157 __repr__ = __str__
158
167
169 """
170 """
171 return self.cpu
172
177
179 """
180 """
181 return sum([x.memory for x in self.pids.values()
182 if x.memory is not None])
183
185 """
186 """
187 if pid in self.pids:
188 del self.pids[pid]
189
200
201 pb.setUnjellyableForClass(Process, Process)
202
203
204 -class Device(pb.Copyable, pb.RemoteCopy):
205 """
206 Track device data
207 """
208 name = ''
209 snmpConnInfo = None
210 proxy = None
211 lastScan = 0.
212 snmpStatus = 0
213 lastChange = 0
214 maxOidsPerRequest = 40
215
217
218 self.processes = {}
219
220 self.pids = {}
221
230
231 - def close(self, unused=None):
232 """
233 Close down the connection to the remote device
234 """
235 if self.proxy:
236 self.proxy.close()
237 self.proxy = None
238 return unused
239
256
257
258 - def get(self, oids):
259 """
260 Perform SNMP get for specified OIDs
261
262 @parameter oids: OIDs to gather
263 @type oids: list of strings
264 @return: Twisted deferred
265 @rtype: Twisted deferred
266 """
267 return self.proxy.get(oids,
268 self.snmpConnInfo.zSnmpTimeout,
269 self.snmpConnInfo.zSnmpTries)
270
271
273 """
274 Perform SNMP getTable for specified OIDs
275
276 @parameter oids: OIDs to gather
277 @type oids: list of strings
278 @return: Twisted deferred
279 @rtype: Twisted deferred
280 """
281 repetitions = self.maxOidsPerRequest / len(oids)
282 t = self.proxy.getTable(oids,
283 timeout=self.snmpConnInfo.zSnmpTimeout,
284 retryCount=self.snmpConnInfo.zSnmpTries,
285 maxRepetitions=repetitions,
286 limit=sys.maxint)
287 return t
288 pb.setUnjellyableForClass(Device, Device)
289
290
364
366 """
367 Called from zenhub to remove a device from our configuration
368
369 @parameter doomed: device to delete
370 @type doomed: string
371 """
372 self.log.debug("zenhub asks us to delete device %s" % doomed)
373 if doomed in self._devices:
374 del self._devices[doomed]
375 self.clearSnmpError(doomed, "Device %s removed from SNMP collection")
376
378 """
379 Called from zenhub to update the devices to monitor
380
381 @parameter devices: devices to monitor
382 @type devices: list of (device, changetime) tuples
383 """
384 self.log.debug("Received updated device list from zenhub %s" % devices)
385 doomed = Set(self._devices.keys())
386 updated = []
387 for device, lastChange in devices:
388
389 if self.options.device and \
390 device != self.options.device:
391 self.log.debug("Ignoring update for %s as we only want %s",
392 device, self.options.device)
393 continue
394
395 cfg = self._devices.get(device, None)
396 if not cfg or self._devices[device].lastChange < lastChange:
397 updated.append(device)
398 doomed.discard(device)
399
400 if updated:
401 log.info("Fetching the config for %s", updated)
402 d = self.model().callRemote('getOSProcessConf', devices)
403 d.addCallback(self.updateDevices, updated)
404 d.addErrback(self.error)
405
406 if doomed:
407 log.info("Removing %s", doomed)
408 for device in doomed:
409 del self._devices[device]
410 self.clearSnmpError(device, "device %s removed" % device)
411
412
432
433
435 """
436 Twisted remote callback, to allow zenhub to remotely update
437 this daemon.
438
439 @parameter cfg: configuration information returned from zenhub
440 @type cfg: object
441 """
442 self.log.debug("Configuration update from zenhub for %s", cfg.name)
443 self.updateDevices([cfg],[])
444
445
447 """
448 Called when the zenhub service getSnmpStatus completes.
449
450 @parameter cfgs: configuration information returned from zenhub
451 @type cfgs: list of objects
452 @parameter fetched: names we want zenhub to return information about
453 @type fetched: list of strings
454 """
455 received = Set()
456 for cfg in cfgs:
457 received.add(cfg.name)
458 d = self._devices.setdefault(cfg.name, cfg)
459 d.updateConfig(cfg)
460 self.thresholds.updateForDevice(cfg.name, cfg.thresholds)
461
462 for doomed in Set(fetched) - received:
463 if doomed in self._devices:
464 del self._devices[doomed]
465
466 - def start(self, driver):
483
484
486 """
487 Called when the zenhub service getSnmpStatus completes.
488
489 @parameter updates: List of names and error counts
490 @type updates: list of (string, int)
491 """
492 for name, count in updates:
493 d = self._devices.get(name)
494 if d:
495 d.snmpStatus = count
496
497
511
512
514 """
515 Contact one device and return a deferred which gathers data from
516 the device.
517
518 @parameter device: proxy object to the remote computer
519 @type device: Device object
520 @return: job to scan a device
521 @rtype: Twisted deferred object
522 """
523 def go(driver):
524 """
525 Generator object to gather information from a device.
526 """
527 try:
528 device.open()
529 yield self.scanDevice(device)
530 driver.next()
531
532
533 if device.snmpStatus == 0:
534 yield self.fetchPerf(device)
535 driver.next()
536 else:
537 log.warn("Failed to find performance data for %s",
538 device.name)
539 except:
540 log.debug('Failed to scan device %s' % device.name)
541
542 def close(res):
543 """
544 Twisted closeBack and errBack function which closes any
545 open connections.
546 """
547 try:
548 device.close()
549 except:
550 log.debug("Failed to close device %s" % device.name)
551
552 d = drive(go)
553 d.addBoth(close)
554 return d
555
556
572
573
594
596 """
597 Parse the process tables and reconstruct the list of processes
598 that are on the device.
599
600 @parameter results: results of SNMP table gets ie (OID + pid, value)
601 @type results: dictionary of dictionaries
602 @return: maps relating names and pids to each other
603 @rtype: dictionary, dictionary, dictionary, list of tuples
604 """
605 def extract(dictionary, oid, value):
606 """
607 Helper function to extract SNMP table data.
608 """
609 pid = int(oid.split('.')[-1])
610 dictionary[pid] = value
611
612 names, paths, args = {}, {}, {}
613 if self.options.showrawtables:
614 log.info("NAMETABLE = %r", results[NAMETABLE])
615 for row in results[NAMETABLE].items():
616 extract(names, *row)
617
618 if self.options.showrawtables:
619 log.info("PATHTABLE = %r", results[PATHTABLE])
620 for row in results[PATHTABLE].items():
621 extract(paths, *row)
622
623 if self.options.showrawtables:
624 log.info("ARGSTABLE = %r", results[ARGSTABLE])
625 for row in results[ARGSTABLE].items():
626 extract(args, *row)
627
628 procs = []
629 for pid, name in names.items():
630 path = paths.get(pid, '')
631 if path and path.find('\\') == -1:
632 name = path
633 arg = args.get(pid, '')
634 procs.append( (pid, (name, arg) ) )
635
636 return names, paths, args, procs
637
639 """
640 Display the processes in a sane manner.
641
642 @parameter device_name: name of the device
643 @type device_name: string
644 @parameter procs: list of (pid, (name, args))
645 @type procs: list of tuples
646 """
647 proc_list = [ '%s %s %s' % (pid, name, args) for pid, (name, args) \
648 in sorted(procs)]
649 proc_list.append('')
650 log.info("#===== Processes on %s:\n%s", device_name, '\n'.join(proc_list))
651
653 """
654 Parse the process tables and reconstruct the list of processes
655 that are on the device.
656
657 @parameter results: results of SNMP table gets
658 @type results: dictionary of dictionaries
659 @parameter device: proxy connection object
660 @type device: Device object
661 """
662 if not results or not results[NAMETABLE]:
663 summary = 'Device %s does not publish HOST-RESOURCES-MIB' % device.name
664 resolution="Verify with snmpwalk -v1 -c community %s %s" % (
665 device.name, NAMETABLE )
666 self.sendEvent(self.statusEvent,
667 device=device.name,
668 summary=summary,
669 resolution=resolution,
670 severity=Event.Error)
671 log.info(summary)
672 return
673 if device.snmpStatus > 0:
674 summary = 'Process table up for device %s' % device.name
675 self.clearSnmpError(device.name, summary)
676
677 names, paths, args, procs = self.mapResultsToDicts(results)
678 if self.options.showprocs:
679 self.showProcessList(device.name, procs)
680
681
682 before = Set(device.pids.keys())
683 after = {}
684 for p in device.processes.values():
685 for pid, (name, args) in procs:
686 if p.match(name, args):
687 log.debug("Found process %d on %s" % (pid, p.name))
688 after[pid] = p
689 afterSet = Set(after.keys())
690 afterByConfig = reverseDict(after)
691 new = afterSet - before
692 dead = before - afterSet
693
694
695 restarted = {}
696 for p in dead:
697 config = device.pids[p]
698 config.discardPid(p)
699 if config in afterByConfig:
700 self.restarted += 1
701 if config.restart:
702 restarted[config] = True
703 summary = 'Process restarted: %s' % config.originalName
704 self.sendEvent(self.statusEvent,
705 device=device.name,
706 summary=summary,
707 component=config.originalName,
708 severity=config.severity)
709 log.info(summary)
710
711
712 for config, pids in afterByConfig.items():
713 if config in restarted: continue
714 summary = "Process up: %s" % config.originalName
715 self.sendEvent(self.statusEvent,
716 device=device.name,
717 summary=summary,
718 component=config.originalName,
719 severity=Event.Clear)
720 config.status = 0
721 log.debug(summary)
722
723 for p in new:
724 log.debug("Found new %s pid %d on %s" % (
725 after[p].originalName, p, device.name))
726 device.pids = after
727
728
729 for config in device.processes.values():
730 if config not in afterByConfig:
731 self.missing += 1
732 config.status += 1
733 summary = 'Process not running: %s' % config.originalName
734 self.sendEvent(self.statusEvent,
735 device=device.name,
736 summary=summary,
737 component=config.originalName,
738 severity=config.severity)
739 log.warning(summary)
740
741
742 pidCounts = dict([(p, 0) for p in device.processes])
743 for pids, pidConfig in device.pids.items():
744 pidCounts[pidConfig.name] += 1
745 for name, count in pidCounts.items():
746 self.save(device.name, name, 'count_count', count, 'GAUGE')
747
748
750 """
751 Main loop that drives all other processing.
752 """
753 reactor.callLater(self.processCycleInterval, self.periodic)
754
755 if self.scanning:
756 running, unstarted, finished = self.scanning.status()
757 runningDevices = [ d.name for d in self.devices().values() \
758 if d.proxy is not None]
759
760 if runningDevices or unstarted > 0:
761 log.warning("Process scan not finishing: "
762 "%d running, %d waiting, %d finished" % (
763 running, unstarted, finished))
764 log.warning("Problem devices: %r", runningDevices)
765 return
766
767 start = time.time()
768
769 def doPeriodic(driver):
770 """
771 Generator function to create deferred jobs.
772 """
773 yield self.getDevicePingIssues()
774 self.downDevices = Set([d[0] for d in driver.next()])
775
776 self.scanning = NJobs(self.parallelJobs,
777 self.oneDevice,
778 self.devices().values())
779 yield self.scanning.start()
780 driver.next()
781
782 def checkResults(results):
783 """
784 Process the results from all deferred objects.
785 """
786 for result in results:
787 if isinstance(result , Exception):
788 log.error("Error scanning device: %s", result)
789 break
790 self.cycleTime = time.time() - start
791 self.heartbeat()
792
793 drive(doPeriodic).addCallback(checkResults)
794
795
813
814
816 """
817 Save the process performance data in RRD files
818
819 @parameter results: results of SNMP table gets
820 @type results: list of (success, result) tuples
821 @parameter device: proxy object to the remote computer
822 @type device: Device object
823 """
824 for success, result in results:
825 if not success:
826 self.deviceFailure(result, device)
827 return results
828 self.clearSnmpError(device.name,
829 'Process table up for device %s' % device.name)
830 parts = {}
831 for success, values in results:
832 if success:
833 parts.update(values)
834 results = parts
835 byConf = reverseDict(device.pids)
836 for pidConf, pids in byConf.items():
837 if len(pids) != 1:
838 log.info("There are %d pids by the name %s",
839 len(pids), pidConf.name)
840 pidName = pidConf.name
841 for pid in pids:
842 cpu = results.get(CPU + str(pid), None)
843 mem = results.get(MEM + str(pid), None)
844 pidConf.updateCpu(pid, cpu)
845 pidConf.updateMemory(pid, mem)
846 self.save(device.name, pidName, 'cpu_cpu', pidConf.getCpu(),
847 'DERIVE', min=0)
848 self.save(device.name, pidName, 'mem_mem', pidConf.getMemory() * 1024,
849 'GAUGE')
850
851
852 - def save(self, deviceName, pidName, statName, value, rrdType,
853 min='U', max='U'):
854 """
855 Save a value into an RRD file
856
857 @param deviceName: name of the remote device (ie a hostname)
858 @type deviceName: string
859 @param pidName: process id of the monitored process
860 @type pidName: string
861 @param statName: metric name
862 @type statName: string
863 @param value: data to be stored
864 @type value: number
865 @param rrdType: RRD data type (eg ABSOLUTE, DERIVE, COUNTER)
866 @type rrdType: string
867 @param min: minimum value acceptable for this metric
868 @type min: number
869 @param max: maximum value acceptable for this metric
870 @type max: number
871 """
872 path = 'Devices/%s/os/processes/%s/%s' % (deviceName, pidName, statName)
873 try:
874 value = self.rrd.save(path, value, rrdType, min=min, max=max)
875
876 except Exception, ex:
877 summary= "Unable to save data for process-monitor RRD %s" % \
878 path
879 self.log.critical( summary )
880
881 message= "Data was value= %s, type=%s, min=%s, max=%s" % \
882 ( value, rrdType, min, max, )
883 self.log.critical( message )
884 self.log.exception( ex )
885
886 import traceback
887 trace_info= traceback.format_exc()
888
889 evid= self.sendEvent(dict(
890 dedupid="%s|%s" % (self.options.monitor, 'RRD write failure'),
891 severity=Critical,
892 device=self.options.monitor,
893 eventClass=Status_Perf,
894 component="RRD",
895 pidName=pidName,
896 statName=statName,
897 path=path,
898 message=message,
899 traceback=trace_info,
900 summary=summary))
901
902
903 return
904
905 for ev in self.thresholds.check(path, time.time(), value):
906 self.sendThresholdEvent(**ev)
907
908
910 """
911 Twisted keep-alive mechanism to ensure that
912 we're still connected to zenhub.
913 """
914 self.scanning = None
915 devices = self.devices()
916 pids = sum(map(lambda x: len(x.pids), devices.values()))
917 log.info("Pulled process status for %d devices and %d processes",
918 len(devices), pids)
919 SnmpDaemon.heartbeat(self)
920 cycle = self.processCycleInterval
921 self.sendEvents(
922 self.rrdStats.counter('dataPoints', cycle, self.rrd.dataPoints) +
923 self.rrdStats.gauge('cyclePoints', cycle, self.rrd.endCycle()) +
924 self.rrdStats.gauge('pids', cycle, pids) +
925 self.rrdStats.gauge('devices', cycle, len(devices)) +
926 self.rrdStats.gauge('missing', cycle, self.missing) +
927 self.rrdStats.gauge('restarted', cycle, self.restarted) +
928 self.rrdStats.gauge('cycleTime', cycle, self.cycleTime)
929 )
930
931
933 """
934 Gather our configuration and start collecting status information.
935 Called after connected to the zenhub service.
936 """
937 drive(self.start).addCallbacks(self.periodic, self.errorStop)
938
939
941 """
942 Build a list of command-line options
943 """
944 SnmpDaemon.buildOptions(self)
945 self.parser.add_option('--showprocs',
946 dest='showprocs',
947 action="store_true",
948 default=False,
949 help="Show the list of processes found." \
950 "For debugging purposes only.")
951 self.parser.add_option('--showrawtables',
952 dest='showrawtables',
953 action="store_true",
954 default=False,
955 help="Show the raw SNMP processes data returned " \
956 "from the device. For debugging purposes only.")
957
958
959 if __name__ == '__main__':
960
961 from Products.ZenRRD.zenprocess import zenprocess
962 z = zenprocess()
963 z.run()
964