1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__= """zenprocess
15
16 Gets SNMP process data from a device's HOST-RESOURCES-MIB
17 and store process performance in RRD files.
18 """
19
20 import Globals
21 import logging
22 import sys
23
24 import zope.component
25 import zope.interface
26
27 from Products.ZenCollector.daemon import CollectorDaemon
28 from Products.ZenCollector.interfaces import ICollectorPreferences,\
29 IScheduledTask, IEventService, IDataService, IConfigurationListener
30 from Products.ZenCollector.tasks import SimpleTaskFactory, SimpleTaskSplitter,\
31 TaskStates
32 from Products.ZenEvents import Event
33 from Products.ZenEvents.ZenEventClasses import Status_Snmp, Status_OSProcess, \
34 Status_Perf
35 from Products.ZenUtils.observable import ObservableMixin
36 from Products.ZenUtils.Chain import Chain
37
38
39
40
41 from Products.ZenUtils.Utils import unused
42 from Products.ZenCollector.services.config import DeviceProxy
43 unused(DeviceProxy)
44 from Products.ZenHub.services.ProcessConfig import ProcessProxy
45 unused(ProcessProxy)
46 from Products.ZenHub.services.PerformanceConfig import SnmpConnInfo
47 unused(SnmpConnInfo)
48
49 from twisted.internet import defer, error
50 from twisted.python.failure import Failure
51
log = logging.getLogger("zen.zenprocess")

# HOST-RESOURCES-MIB OIDs (RFC 2790)
HOSTROOT = '.1.3.6.1.2.1.25'           # host
RUNROOT = HOSTROOT + '.4'              # hrSWRun
NAMETABLE = RUNROOT + '.2.1.2'         # hrSWRunName
PATHTABLE = RUNROOT + '.2.1.4'         # hrSWRunPath
ARGSTABLE = RUNROOT + '.2.1.5'         # hrSWRunParameters
PERFROOT = HOSTROOT + '.5'             # hrSWRunPerf
# The two performance OIDs end with a dot so that a pid can be appended
# directly, e.g. CPU + str(pid).
CPU = PERFROOT + '.1.1.1.'             # hrSWRunPerfCPU
MEM = PERFROOT + '.1.1.2.'             # hrSWRunPerfMem

# Presumably the largest value of a 32-bit counter -- TODO confirm usage.
# Written without the Python-2-only 'L' suffix: the value is identical and
# the literal also parses on Python 3.
WRAP = 0xffffffff
66
67
68
70 zope.interface.implements(ICollectorPreferences)
71
def __init__(self):
    """
    Create the preferences object for the zenprocess collector and fill
    in a default value for every attribute set here.
    """
    # Identity of this collector and the hub service that configures it
    self.collectorName = "zenprocess"
    self.configurationService = 'Products.ZenHub.services.ProcessConfig'

    self.defaultRRDCreateCommand = None

    # Cycle settings; units are not stated here -- TODO confirm
    self.configCycleInterval = 20
    self.processCycleInterval = 3 * 60

    # No options object is available at construction time
    self.options = None
90
91 @property
93 """
94 defined as a property since it is needed by the interface
95 """
96 return self.processCycleInterval
97
99 """
100 Build a list of command-line options
101 """
102 parser.add_option('--showprocs',
103 dest='showprocs',
104 action="store_true",
105 default=False,
106 help="Show the list of processes found." \
107 "For debugging purposes only.")
108 parser.add_option('--showrawtables',
109 dest='showrawtables',
110 action="store_true",
111 default=False,
112 help="Show the raw SNMP processes data returned " \
113 "from the device. For debugging purposes only.")
114
115 - def postStartup(self):
117
def __init__(self, deviceProxy):
    """
    Keep the device configuration and wrap every configured process in
    a ProcessStats object.

    @parameter deviceProxy: configuration of one device; its 'processes'
                            mapping is read here
    @type deviceProxy: presumably DeviceProxy -- TODO confirm
    """
    self.config = deviceProxy

    # pid -> ProcessStats; starts out empty
    self._pidToProcess = {}

    # key from deviceProxy.processes -> ProcessStats
    self._processes = dict(
        (procId, ProcessStats(procConfig))
        for procId, procConfig in deviceProxy.processes.items())
127
128 - def update(self, deviceProxy):
140
@property
def processStats(self):
    """
    The ProcessStats objects of all processes configured to be monitored
    """
    configured = self._processes
    return configured.values()
147
@property
def pids(self):
    """
    The pids that are currently being tracked
    """
    tracked = self._pidToProcess
    return tracked.keys()
154
@property
def monitoredProcs(self):
    """
    The ProcessStats of every tracked pid (one entry per pid)
    """
    tracked = self._pidToProcess
    return tracked.values()
161
def __init__(self, processProxy):
    """
    Start tracking one configured process.

    @parameter processProxy: configuration of the process
    @type processProxy: presumably ProcessProxy -- TODO confirm
    """
    self._config = processProxy
    self.cpu = 0
    # pid -> per-pid tracking object
    self._pids = {}
167
def update(self, processProxy):
    """
    Replace the stored process configuration with a newer one.

    Only the configuration reference changes; the cpu value and the
    tracked pids are left untouched.

    @parameter processProxy: new configuration of the process
    @type processProxy: presumably ProcessProxy -- TODO confirm
    """
    self._config = processProxy
170
def __str__(self):
    """
    Use the configured process name as our string form
    """
    processName = self._config.name
    return str(processName)
# repr() gives the same text as str()
__repr__ = __str__
177
def match(self, name, args):
    """
    Decide whether a running process is the one this object is
    configured for, using an exact string comparison.

    @parameter name: name of a process to compare
    @type name: string
    @parameter args: argument list of the process
    @type args: string
    @return: True when the configured original name equals the name
             (arguments ignored or empty) or equals 'name args'
    @rtype: Boolean
    """
    config = self._config
    if config.name is None:
        # Nothing is configured to compare against
        return False

    if config.ignoreParameters or not args:
        candidate = name
    else:
        candidate = '%s %s' % (name, args)
    return config.originalName == candidate
194
203
def getCpu(self):
    """
    Return the value currently held in the cpu attribute.
    """
    current = self.cpu
    return current
208
213
def getMemory(self):
    """
    Add up the memory values of all tracked pids.

    Pids whose memory value is None are left out of the total.

    @return: sum of the known memory values, 0 when there are none
    @rtype: number
    """
    total = 0
    for pidInfo in self._pids.values():
        if pidInfo.memory is not None:
            total += pidInfo.memory
    return total
219
def discardPid(self, pid):
    """
    Stop tracking a pid; a pid that is not tracked is ignored.

    @parameter pid: process id to forget
    @type pid: integer
    """
    self._pids.pop(pid, None)
225
227 """
228 Helper class to track process id information
229 """
230 cpu = None
231 memory = None
232
234 """
235 """
236 if n is not None:
237 try:
238 n = int(n)
239 except ValueError:
240 log.warning("Bad value for CPU: '%s'", n)
241
242 if self.cpu is None or n is None:
243 self.cpu = n
244 return None
245 diff = n - self.cpu
246 if diff < 0:
247
248 n = None
249 diff = None
250 self.cpu = n
251 return diff
252
254 """
255 """
256 self.memory = n
257
def __str__(self):
    """
    Show the memory and cpu values held for this pid
    """
    fields = (self.memory, self.cpu)
    return '<Pid> memory: %s cpu: %s' % fields
# repr() gives the same text as str()
__repr__ = __str__
264
266 zope.interface.implements(IConfigurationListener)
267
def deleted(self, configurationId):
    """
    Called when a configuration is deleted from the collector

    Drops the entry stored under this id in ZenProcessTask.DEVICE_STATS;
    an id without an entry is ignored.

    @parameter configurationId: id of the deleted configuration, used as
                                the DEVICE_STATS key
    @type configurationId: presumably the device id string -- TODO confirm
    """
    # The value is passed as a logging argument so it is only formatted
    # when debug output is enabled, and a tuple-valued id cannot break
    # the '%' operator.
    log.debug('ConfigListener: configuration %s deleted', configurationId)
    ZenProcessTask.DEVICE_STATS.pop(configurationId, None)
274
def added(self, configuration):
    """
    Called when a configuration is added to the collector

    Only a debug message is logged.

    @parameter configuration: the configuration that was added
    @type configuration: presumably DeviceProxy -- TODO confirm
    """
    # The value is passed as a logging argument so it is only formatted
    # when debug output is enabled.
    log.debug('ConfigListener: configuration %s added', configuration)
280
281
def updated(self, newConfiguration):
    """
    Called when a configuration is updated in collector

    Only a debug message is logged.

    @parameter newConfiguration: the configuration after the update
    @type newConfiguration: presumably DeviceProxy -- TODO confirm
    """
    # The value is passed as a logging argument so it is only formatted
    # when debug output is enabled.
    log.debug('ConfigListener: configuration %s updated', newConfiguration)
287
288
289
291 """
292 A scheduled task that finds instances of configure processes and collects
293 metrics on the processes
294 """
295 zope.interface.implements(IScheduledTask)
296
297
298 DEVICE_STATS = {}
299
300
301 RESTARTED = 0
302 MISSING = 0
303
304 STATE_CONNECTING = 'CONNECTING'
305 STATE_SCANNING_PROCS = 'SCANNING_PROCESSES'
306 STATE_FETCH_PERF = 'FETCH_PERF_DATA'
307 STATE_STORE_PERF = 'STORE_PERF_DATA'
308
309 statusEvent = { 'eventClass' : Status_OSProcess,
310 'eventGroup' : 'Process' }
311
def __init__(self,
             deviceId,
             taskName,
             scheduleIntervalSeconds,
             taskConfig):
    """
    Set up a process-monitoring task for one device.

    @parameter deviceId: stored as the task's configId
    @type deviceId: string
    @parameter taskName: stored as the task's name
    @type taskName: string
    @parameter scheduleIntervalSeconds: stored as the task's interval
    @type scheduleIntervalSeconds: number
    @parameter taskConfig: device configuration; its name, manageIp,
                           zMaxOIDPerRequest and snmpConnInfo are read
    @type taskConfig: presumably DeviceProxy -- TODO confirm
    """
    super(ZenProcessTask, self).__init__()

    # Attributes describing the scheduled task itself
    self.name = taskName
    self.configId = deviceId
    self.interval = scheduleIntervalSeconds
    self.state = TaskStates.STATE_IDLE

    # The device configuration and the values copied out of it
    device = taskConfig
    self._device = device
    self._devId = device.name
    self._manageIp = device.manageIp
    self._maxOidsPerRequest = device.zMaxOIDPerRequest

    # Services looked up from the zope component registry
    lookup = zope.component.queryUtility
    self._dataService = lookup(IDataService)
    self._eventService = lookup(IEventService)
    self._preferences = lookup(ICollectorPreferences, "zenprocess")

    self.snmpProxy = None
    self.snmpConnInfo = device.snmpConnInfo

    # Device statistics are shared through the class-level DEVICE_STATS
    # mapping: reuse and refresh an existing entry, otherwise create one.
    stats = ZenProcessTask.DEVICE_STATS.get(self._devId)
    if not stats:
        stats = DeviceStats(device)
        ZenProcessTask.DEVICE_STATS[self._devId] = stats
    else:
        stats.update(device)
    self._deviceStats = stats
344
def _failure(self, reason):
    """
    Twisted errBack: log a failure for this device, send an SNMP status
    event about it and pass the failure on.

    @parameter reason: explanation of the failure
    @type reason: Twisted error instance
    @return: the same failure, so later errbacks still see it
    """
    devId = self._devId
    prefix = 'Unable to read processes on device %s' % devId
    if not isinstance(reason.value, error.TimeoutError):
        msg = '%s; error: %s' % (prefix, reason.getErrorMessage())
        log.error('Error on device %s; %s' % (devId,
                                              reason.getErrorMessage()))
    else:
        # Timeouts are only logged at debug level
        log.debug('Timeout on device %s' % devId)
        msg = '%s; Timeout on device' % prefix

    # The keyword arguments are passed next to the statusEvent template
    self._eventService.sendEvent(self.statusEvent,
                                 eventClass=Status_Snmp,
                                 component="process",
                                 device=devId,
                                 summary=msg,
                                 severity=Event.Error)
    return reason
368
369
def _connectCallback(self, result):
    """
    Callback run after the connection step succeeded: log it and hand
    the result on unchanged.

    @parameter result: result of the previous callback
    @return: the same result
    """
    target = (self._devId, self._manageIp)
    log.debug("Connected to %s [%s]", *target)
    return result
376
391
def _finished(self, result):
    """
    Callback run when the task is done, whether it worked or not: log
    the outcome, close the connection and hand the result on.

    @parameter result: result of the task, or a Failure
    @return: the same result
    """
    if isinstance(result, Failure):
        log.debug("Device %s [%s] scanned failed, %s",
                  self._devId, self._manageIp, result.getErrorMessage())
    else:
        log.debug("Device %s [%s] scanned successfully",
                  self._devId, self._manageIp)

    # A problem while closing is logged but never replaces the result
    try:
        self._close()
    except Exception as e:
        log.warn("Failed to close device %s: error %s" %
                 (self._devId, str(e)))

    return result
411
414
416 """
417 Contact to one device and return a deferred which gathers data from
418 the device.
419
420 @return: job to scan a device
421 @rtype: Twisted deferred object
422 """
423
424
425 d = defer.maybeDeferred(self._connect)
426 d.addCallbacks(self._connectCallback, self._failure)
427 d.addCallback(self._collectCallback)
428
429
430
431 d.addBoth(self._finished)
432
433
434
435 return d
436
438 """
439 Parse the process tables and reconstruct the list of processes
440 that are on the device.
441
442 @parameter results: results of SNMP table gets
443 @type results: dictionary of dictionaries
444 @parameter device: proxy connection object
445 @type device: Device object
446 """
447
448 if not results or not results[NAMETABLE]:
449 summary = 'Device %s does not publish HOST-RESOURCES-MIB' % \
450 self._devId
451 resolution="Verify with snmpwalk -v1 -c community %s %s" % \
452 (self._devId, NAMETABLE )
453
454 self._eventService.sendEvent(self.statusEvent,
455 device=self._devId,
456 summary=summary,
457 resolution=resolution,
458 severity=Event.Error)
459 log.info(summary)
460 return defer.fail(summary)
461
462 summary = 'Process table up for device %s' % self._devId
463 self._clearSnmpError(summary)
464
465 showrawtables = self._preferences.options.showrawtables
466 args, procs = mapResultsToDicts(showrawtables, results)
467 if self._preferences.options.showprocs:
468 self._showProcessList( procs )
469
470
471 beforePids = set(self._deviceStats.pids)
472 afterPidToProcessStats = {}
473 for pStats in self._deviceStats.processStats:
474 for pid, (name, args) in procs:
475 if pStats.match(name, args):
476 log.debug("Found process %d on %s" % (pid,
477 pStats._config.name))
478 afterPidToProcessStats[pid] = pStats
479 afterPids = set(afterPidToProcessStats.keys())
480 afterByConfig = reverseDict(afterPidToProcessStats)
481 newPids = afterPids - beforePids
482 deadPids = beforePids - afterPids
483
484
485 restarted = {}
486 for pid in deadPids:
487 procStats = self._deviceStats._pidToProcess[pid]
488 procStats.discardPid(pid)
489 if procStats in afterByConfig:
490 ZenProcessTask.RESTARTED += 1
491 pConfig = procStats._config
492 if pConfig.restart:
493 restarted[procStats] = True
494
495 summary = 'Process restarted: %s' % pConfig.originalName
496
497 self._eventService.sendEvent(self.statusEvent,
498 device=self._devId,
499 summary=summary,
500 component=pConfig.originalName,
501 severity=pConfig.severity)
502 log.info(summary)
503
504
505 for processStat in afterByConfig.keys():
506 if processStat in restarted: continue
507 summary = "Process up: %s" % processStat._config.originalName
508 self._eventService.sendEvent(self.statusEvent,
509 device=self._devId,
510 summary=summary,
511 component=processStat._config.originalName,
512 severity=Event.Clear)
513 log.debug(summary)
514
515 for pid in newPids:
516 log.debug("Found new %s pid %d on %s" % (
517 afterPidToProcessStats[pid]._config.originalName, pid,
518 self._devId))
519
520 self._deviceStats._pidToProcess = afterPidToProcessStats
521
522
523 for procStat in self._deviceStats.processStats:
524 if procStat not in afterByConfig:
525 procConfig = procStat._config
526 ZenProcessTask.MISSING += 1
527 summary = 'Process not running: %s' % procConfig.originalName
528 self._eventService.sendEvent(self.statusEvent,
529 device=self._devId,
530 summary=summary,
531 component=procConfig.originalName,
532 severity=procConfig.severity)
533 log.warning(summary)
534
535
536 pidCounts = dict([(p, 0) for p in self._deviceStats.processStats])
537 for procStat in self._deviceStats.monitoredProcs:
538 pidCounts[procStat] += 1
539 for procName, count in pidCounts.items():
540 self._save(procName, 'count_count', count, 'GAUGE')
541 return results
542
544 """
545 Get performance data for all the monitored processes on a device
546
547 @parameter device: proxy object to the remote computer
548 @type device: Device object
549 """
550 self.state = ZenProcessTask.STATE_FETCH_PERF
551
552 oids = []
553 for pid in self._deviceStats.pids:
554 oids.extend([CPU + str(pid), MEM + str(pid)])
555 if not oids:
556 return defer.succeed(([]))
557 d = Chain(self._get, iter(chunk(oids, self._maxOidsPerRequest))).run()
558 d.addCallback(self._storePerfStats)
559 d.addErrback(self._failure)
560 return d
561
def _storePerfStats(self, results):
    """
    Store the cpu and memory values fetched for the tracked pids.

    @parameter results: results of the SNMP gets
    @type results: list of (success, result) tuples
    @return: the result of the first unsuccessful get, otherwise the
             merged dictionary of all fetched values
    """
    self.state = ZenProcessTask.STATE_STORE_PERF

    # Give up at the first unsuccessful get and hand its result onward
    for ok, outcome in results:
        if not ok:
            return outcome
    self._clearSnmpError('Process table up for device %s' % self._devId)

    # Merge the per-request dictionaries into one lookup table
    merged = {}
    for ok, values in results:
        if ok:
            merged.update(values)
    results = merged

    pidsByProcess = reverseDict(self._deviceStats._pidToProcess)
    for procStat, pids in pidsByProcess.items():
        pidCount = len(pids)
        if pidCount != 1:
            log.info("There are %d pids by the name %s",
                     pidCount, procStat._config.name)
        procName = procStat._config.name

        # Feed the value of every pid into the process statistics
        for pid in pids:
            pidText = str(pid)
            cpuValue = results.get(CPU + pidText, None)
            memValue = results.get(MEM + pidText, None)
            procStat.updateCpu(pid, cpuValue)
            procStat.updateMemory(pid, memValue)

        # One write per process and metric, after all pids were updated
        self._save(procName, 'cpu_cpu', procStat.getCpu(),
                   'DERIVE', min=0)
        # Memory is stored multiplied by 1024 (presumably KBytes to
        # bytes -- TODO confirm)
        self._save(procName, 'mem_mem',
                   procStat.getMemory() * 1024, 'GAUGE')
    return results
598
600 """
601 Perform SNMP getTable for specified OIDs
602 @parameter oids: OIDs to gather
603 @type oids: list of strings
604 @return: Twisted deferred
605 @rtype: Twisted deferred
606 """
607 repetitions = self._maxOidsPerRequest / len(oids)
608 t = self.snmpProxy.getTable(oids,
609 timeout=self.snmpConnInfo.zSnmpTimeout,
610 retryCount=self.snmpConnInfo.zSnmpTries,
611 maxRepetitions=repetitions,
612 limit=sys.maxint)
613 return t
614
def _get(self, oids):
    """
    Issue an SNMP get for the given OIDs through the current proxy,
    using the timeout and try count from the connection info.

    @parameter oids: OIDs to gather
    @type oids: list of strings
    @return: whatever the proxy's get() returns
    @rtype: Twisted deferred
    """
    connInfo = self.snmpConnInfo
    timeout = connInfo.zSnmpTimeout
    tries = connInfo.zSnmpTries
    return self.snmpProxy.get(oids, timeout, tries)
627
def _connect(self):
    """
    Make sure there is an open session to the remote device.

    A new session is created and opened when there is none yet or when
    the existing one was built from different connection info.
    """
    self.state = ZenProcessTask.STATE_CONNECTING

    needsSession = (self.snmpProxy is None or
                    self.snmpProxy.snmpConnInfo != self.snmpConnInfo)
    if not needsSession:
        return

    session = self.snmpConnInfo.createSession()
    self.snmpProxy = session
    session.open()
637
def _close(self):
    """
    Close the session to the remote device, if there is one, and
    forget it.
    """
    proxy = self.snmpProxy
    if proxy:
        proxy.close()
    self.snmpProxy = None
645
646
def _showProcessList(self, procs):
    """
    Log the processes found on the device, one 'pid name args' line
    per process, in sorted order.

    @parameter procs: list of (pid, (name, args))
    @type procs: list of tuples
    """
    lines = []
    for pid, (name, args) in sorted(procs):
        lines.append('%s %s %s' % (pid, name, args))
    # The empty entry makes the joined text end with a newline
    lines.append('')
    log.info("#===== Processes on %s:\n%s", self._devId, '\n'.join(lines))
659
676
def _save(self, pidName, statName, value, rrdType, min='U'):
    """
    Write one value through the data service; when that fails, log the
    problem and send a critical event instead of raising.

    @param pidName: name of the monitored process, used in the RRD path
    @type pidName: string
    @param statName: metric name
    @type statName: string
    @param value: data to be stored
    @type value: number
    @param rrdType: RRD data type (eg ABSOLUTE, DERIVE, COUNTER)
    @type rrdType: string
    @param min: passed through to writeRRD as 'min'
    @type min: number or 'U'
    """
    path = 'Devices/%s/os/processes/%s/%s' % (self._devId, pidName,
                                              statName)
    try:
        self._dataService.writeRRD(path, value, rrdType, min=min)
    except Exception as ex:
        summary = "Unable to save data for process-monitor RRD %s" % path
        log.critical(summary)

        message = "Data was value= %s, type=%s" % (value, rrdType)
        log.critical(message)
        log.exception(ex)

        import traceback
        trace_info = traceback.format_exc()

        # The event is attached to the monitor given in the options
        monitor = self._preferences.options.monitor
        self._eventService.sendEvent({
            'dedupid': "%s|%s" % (monitor, 'RRD write failure'),
            'severity': Event.Critical,
            'device': monitor,
            'eventClass': Status_Perf,
            'component': "RRD",
            'pidName': pidName,
            'statName': statName,
            'path': path,
            'message': message,
            'traceback': trace_info,
            'summary': summary,
        })
720
def mapResultsToDicts(showrawtables, results):
    """
    Turn the raw SNMP process tables into per-pid data and build the
    list of processes found on the device.

    The pid of a row is the last dotted component of its OID. A process
    is named by its path when the path is non-empty and contains no
    backslash, otherwise by its entry in the name table.

    @parameter showrawtables: log each raw table before it is parsed
    @type showrawtables: Boolean
    @parameter results: results of SNMP table gets ie (OID + pid, value)
    @type results: dictionary of dictionaries
    @return: the pid-to-arguments map and a list of (pid, (name, args))
    @rtype: dictionary, list of tuples
    """
    names, paths, args = {}, {}, {}
    tables = (
        (NAMETABLE, "NAMETABLE = %r", names),
        (PATHTABLE, "PATHTABLE = %r", paths),
        (ARGSTABLE, "ARGSTABLE = %r", args),
    )
    for tableOid, rawFormat, byPid in tables:
        if showrawtables:
            log.info(rawFormat, results[tableOid])
        for oid, value in results[tableOid].items():
            byPid[int(oid.split('.')[-1])] = value

    procs = []
    for pid, name in names.items():
        path = paths.get(pid, '')
        if path and '\\' not in path:
            name = path
        procs.append((pid, (name, args.get(pid, ''))))

    return args, procs
763
def reverseDict(d):
    """
    Invert a dictionary: every value of d becomes a key that maps to the
    list of keys which had that value, so several keys sharing one value
    are all kept.

    @parameter d: dictionary with hashable values
    @type d: dictionary
    @return: value -> list of keys
    @rtype: dictionary
    """
    result = {}
    for key, value in d.items():
        if value not in result:
            result[value] = []
        result[value].append(key)
    return result
773
def chunk(lst, n):
    """
    Split lst into consecutive slices of n items; the last slice holds
    whatever is left over.

    @parameter lst: sequence to split
    @type lst: list
    @parameter n: size of each slice
    @type n: integer
    @return: the slices, in order
    @rtype: list of lists
    """
    pieces = []
    for start in range(0, len(lst), n):
        pieces.append(lst[start:start + n])
    return pieces
779
780
781
# Script entry point: wire the zenprocess pieces into a collector daemon
# and run it.
if __name__ == '__main__':
    myPreferences = ZenProcessPreferences()

    # ZenProcessTask is the task class handed to the task factory; the
    # factory is in turn handed to the splitter.
    myTaskFactory = SimpleTaskFactory(ZenProcessTask)
    myTaskSplitter = SimpleTaskSplitter(myTaskFactory)
    # The ConfigListener instance is passed to the daemon as third argument
    daemon = CollectorDaemon(myPreferences, myTaskSplitter, ConfigListener())
    daemon.run()
789