1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__= """zenprocess
15
Gets SNMP process data from a device's HOST-RESOURCES-MIB
and stores process performance in RRD files.
18 """
19
20 import Globals
21 import logging
22 import sys
23
24 import zope.component
25 import zope.interface
26
27 from Products.ZenCollector.daemon import CollectorDaemon
28 from Products.ZenCollector.interfaces import ICollectorPreferences,\
29 IScheduledTask, IEventService, IDataService, IConfigurationListener
30 from Products.ZenCollector.tasks import SimpleTaskFactory, SimpleTaskSplitter,\
31 TaskStates
32 from Products.ZenEvents import Event
33 from Products.ZenEvents.ZenEventClasses import Status_Snmp, Status_OSProcess, \
34 Status_Perf
35 from Products.ZenUtils.observable import ObservableMixin
36 from Products.ZenUtils.Chain import Chain
37
38
39
40
41 from Products.ZenUtils.Utils import unused
42 from Products.ZenCollector.services.config import DeviceProxy
43 unused(DeviceProxy)
44 from Products.ZenHub.services.ProcessConfig import ProcessProxy
45 unused(ProcessProxy)
46 from Products.ZenHub.services.PerformanceConfig import SnmpConnInfo
47 unused(SnmpConnInfo)
48
49 from twisted.internet import defer, error
50 from twisted.python.failure import Failure
51
52 log = logging.getLogger("zen.zenprocess")
53
54
# Object identifiers (OIDs) from the HOST-RESOURCES-MIB (RFC 2790).
# Each constant is built from its parent so the tree structure stays visible.
HOSTROOT = '.1.3.6.1.2.1.25'        # root of the HOST-RESOURCES-MIB subtree
RUNROOT = HOSTROOT + '.4'           # hrSWRun: software running on the host
NAMETABLE = RUNROOT + '.2.1.2'      # hrSWRunName column
PATHTABLE = RUNROOT + '.2.1.4'      # hrSWRunPath column
ARGSTABLE = RUNROOT + '.2.1.5'      # hrSWRunParameters column
PERFROOT = HOSTROOT + '.5'          # hrSWRunPerf: per-process performance
# The two performance OIDs end in a dot because a pid is appended directly
# to them to address a single process.
CPU = PERFROOT + '.1.1.1.'          # hrSWRunPerfCPU
MEM = PERFROOT + '.1.1.2.'          # hrSWRunPerfMem

# Largest unsigned 32-bit value.  Written without the Python 2 long suffix:
# the value is the same, and the literal stays valid on Python 3.
# NOTE(review): presumably the counter wrap point -- no use of it is visible
# in this part of the file.
WRAP = 0xffffffff
70 zope.interface.implements(ICollectorPreferences)
71
73 """
74 Constructs a new ZenProcessPreferences instance and provide default
75 values for needed attributes.
76 """
77 self.collectorName = "zenprocess"
78 self.defaultRRDCreateCommand = None
79 self.configCycleInterval = 20
80
81
82 self.processCycleInterval = 3 * 60
83
84
85 self.options = None
86
87
88
89 self.configurationService = 'Products.ZenHub.services.ProcessConfig'
90
91 @property
93 """
94 defined as a property since it is needed by the interface
95 """
96 return self.processCycleInterval
97
99 """
100 Build a list of command-line options
101 """
102 parser.add_option('--showprocs',
103 dest='showprocs',
104 action="store_true",
105 default=False,
106 help="Show the list of processes found." \
107 "For debugging purposes only.")
108 parser.add_option('--showrawtables',
109 dest='showrawtables',
110 action="store_true",
111 default=False,
112 help="Show the raw SNMP processes data returned " \
113 "from the device. For debugging purposes only.")
114
115 - def postStartup(self):
117
120 self.config = deviceProxy
121
122 self._pidToProcess = {}
123
124 self._processes = {}
125 for id, process in deviceProxy.processes.items():
126 self._processes[id] = ProcessStats(process)
127
128 - def update(self, deviceProxy):
140
141 @property
143 """
144 The ProcessStats: processes configured to be monitored
145 """
146 return self._processes.values()
147
148 @property
150 """
151 returns the pids from being tracked
152 """
153 return self._pidToProcess.keys()
154
155 @property
157 """
158 returns ProcessStats for which we have a pid
159 """
160 return self._pidToProcess.values()
161
164 self._pids={}
165 self._config = processProxy
166 self.cpu = 0
167
168 - def update(self, processProxy):
170
172 """
173 Override the Python default to represent ourselves as a string
174 """
175 return str(self._config.name)
176 __repr__ = __str__
177
def match(self, name, args):
    """
    Decide whether a running process is the one this object is configured
    to monitor.

    The test is a plain string equality against the configured
    originalName; no pattern matching is involved.

    @parameter name: name of a process to compare
    @type name: string
    @parameter args: argument list of the process
    @type args: string
    @return: does the name match this process's info?
    @rtype: Boolean
    """
    config = self._config
    if config.name is None:
        # A configuration without a name can never match anything
        return False

    # The arguments take part in the comparison only when the configuration
    # does not ignore them and the process actually has some.
    if not config.ignoreParameters and args:
        candidate = '%s %s' % (name, args)
    else:
        candidate = name
    return config.originalName == candidate
194
203
205 """
206 """
207 return self.cpu
208
213
215 """
216 """
217 return sum([x.memory for x in self._pids.values()
218 if x.memory is not None])
219
221 """
222 """
223 if pid in self._pids:
224 del self._pids[pid]
225
227 """
228 Helper class to track process id information
229 """
230 cpu = None
231 memory = None
232
234 """
235 """
236 if n is not None:
237 try:
238 n = int(n)
239 except ValueError:
240 log.warning("Bad value for CPU: '%s'", n)
241
242 if self.cpu is None or n is None:
243 self.cpu = n
244 return None
245 diff = n - self.cpu
246 if diff < 0:
247
248 n = None
249 diff = None
250 self.cpu = n
251 return diff
252
254 """
255 """
256 self.memory = n
257
259 """
260 Override the Python default to represent ourselves as a string
261 """
262 return '<Pid> memory: %s cpu: %s' % (self.memory, self.cpu)
263 __repr__ = __str__
264
266 zope.interface.implements(IConfigurationListener)
267
def deleted(self, configurationId):
    """
    Called when a configuration is deleted from the collector.

    Drops whatever ZenProcessTask.DEVICE_STATS holds under that id; an id
    with no entry is silently ignored.

    @parameter configurationId: id of the configuration that was removed
    """
    # Hand the argument to the logger instead of pre-formatting the message:
    # the string is only built when DEBUG is enabled.
    log.debug('ConfigListener: configuration %s deleted', configurationId)
    ZenProcessTask.DEVICE_STATS.pop(configurationId, None)
274
def added(self, configuration):
    """
    Called when a configuration is added to the collector.

    Nothing is done beyond writing a debug log entry.

    @parameter configuration: the configuration that was added
    """
    # Lazy logger arguments: no formatting cost when DEBUG is off, and no
    # TypeError from the % operator should the argument ever be a tuple.
    log.debug('ConfigListener: configuration %s added', configuration)
280
281
def updated(self, newConfiguration):
    """
    Called when a configuration is updated in the collector.

    Nothing is done beyond writing a debug log entry.

    @parameter newConfiguration: the configuration after the update
    """
    # Lazy logger arguments: no formatting cost when DEBUG is off, and no
    # TypeError from the % operator should the argument ever be a tuple.
    log.debug('ConfigListener: configuration %s updated', newConfiguration)
287
291 """
292 A scheduled task that finds instances of configure processes and collects
293 metrics on the processes
294 """
295 zope.interface.implements(IScheduledTask)
296
297
298 DEVICE_STATS = {}
299
300
301 RESTARTED = 0
302 MISSING = 0
303
304 STATE_CONNECTING = 'CONNECTING'
305 STATE_SCANNING_PROCS = 'SCANNING_PROCESSES'
306 STATE_FETCH_PERF = 'FETCH_PERF_DATA'
307 STATE_STORE_PERF = 'STORE_PERF_DATA'
308
309 statusEvent = { 'eventClass' : Status_OSProcess,
310 'eventGroup' : 'Process' }
311
def __init__(self,
             deviceId,
             taskName,
             scheduleIntervalSeconds,
             taskConfig):
    """
    Set up a process-monitoring task for one device.

    @parameter deviceId: stored as the task's configId
    @parameter taskName: stored as the task's name
    @parameter scheduleIntervalSeconds: stored as the task's interval
    @parameter taskConfig: device configuration; its name, manageIp,
                           zMaxOIDPerRequest and snmpConnInfo attributes
                           are read here
    """
    super(ZenProcessTask, self).__init__()

    # Task identity, schedule and initial state
    self.name = taskName
    self.configId = deviceId
    self.interval = scheduleIntervalSeconds
    self.state = TaskStates.STATE_IDLE

    # Values copied from the device configuration
    device = taskConfig
    self._device = device
    self._devId = device.name
    self._manageIp = device.manageIp
    self._maxOidsPerRequest = device.zMaxOIDPerRequest

    # Services registered with the zope component registry
    lookup = zope.component.queryUtility
    self._dataService = lookup(IDataService)
    self._eventService = lookup(IEventService)
    self._preferences = lookup(ICollectorPreferences, "zenprocess")

    # No SNMP session yet; only the connection details are kept
    self.snmpProxy = None
    self.snmpConnInfo = device.snmpConnInfo

    # Statistics are kept in a class-level dictionary keyed by device
    # name: existing statistics are refreshed with the new configuration,
    # otherwise a fresh DeviceStats is created and registered.
    statsByDevice = ZenProcessTask.DEVICE_STATS
    self._deviceStats = statsByDevice.get(self._devId)
    if not self._deviceStats:
        self._deviceStats = DeviceStats(device)
        statsByDevice[self._devId] = self._deviceStats
    else:
        self._deviceStats.update(device)
344
346 """
347 Twisted errBack to log the exception for a single device.
348
349 @parameter reason: explanation of the failure
350 @type reason: Twisted error instance
351 """
352 msg = 'Unable to read processes on device %s' % self._devId
353 if isinstance(reason.value, error.TimeoutError):
354 log.debug('Timeout on device %s' % self._devId)
355 msg = '%s; Timeout on device' % msg
356 else:
357 msg = '%s; error: %s' % (msg, reason.getErrorMessage())
358 log.error('Error on device %s; %s' % (self._devId,
359 reason.getErrorMessage()))
360
361 self._eventService.sendEvent(self.statusEvent,
362 eventClass=Status_Snmp,
363 component="process",
364 device=self._devId,
365 summary=msg,
366 severity=Event.Error)
367 return reason
368
369
371 """
372 Callback called after a successful connect to the remote device.
373 """
374 log.debug("Connected to %s [%s]", self._devId, self._manageIp)
375 return result
376
391
393 """
394 Callback activated when the task is complete
395 """
396 if not isinstance(result, Failure):
397 log.debug("Device %s [%s] scanned successfully",
398 self._devId, self._manageIp)
399 else:
400 log.debug("Device %s [%s] scanned failed, %s",
401 self._devId, self._manageIp, result.getErrorMessage())
402
403 try:
404 self._close()
405 except Exception, e:
406 log.warn("Failed to close device %s: error %s" %
407 (self._devId, str(e)))
408
409
410 return result
411
414
416 """
417 Contact to one device and return a deferred which gathers data from
418 the device.
419
420 @return: job to scan a device
421 @rtype: Twisted deferred object
422 """
423
424
425 d = defer.maybeDeferred(self._connect)
426 d.addCallbacks(self._connectCallback, self._failure)
427 d.addCallback(self._collectCallback)
428
429
430
431 d.addBoth(self._finished)
432
433
434
435 return d
436
438 """
439 Parse the process tables and reconstruct the list of processes
440 that are on the device.
441
442 @parameter results: results of SNMP table gets
443 @type results: dictionary of dictionaries
444 @parameter device: proxy connection object
445 @type device: Device object
446 """
447
448 if not results or not results[NAMETABLE]:
449 summary = 'Device %s does not publish HOST-RESOURCES-MIB' % \
450 self._devId
451 resolution="Verify with snmpwalk -v1 -c community %s %s" % \
452 (self._devId, NAMETABLE )
453
454 self._eventService.sendEvent(self.statusEvent,
455 device=self._devId,
456 summary=summary,
457 resolution=resolution,
458 severity=Event.Error)
459 log.info(summary)
460 return defer.fail(summary)
461
462 summary = 'Process table up for device %s' % self._devId
463 self._clearSnmpError(summary)
464
465 showrawtables = self._preferences.options.showrawtables
466 args, procs = mapResultsToDicts(showrawtables, results)
467 if self._preferences.options.showprocs:
468 self._showProcessList( procs )
469
470
471 beforePids = set(self._deviceStats.pids)
472 afterPidToProcessStats = {}
473 for pStats in self._deviceStats.processStats:
474 for pid, (name, args) in procs:
475 if pStats.match(name, args):
476 log.debug("Found process %d on %s" % (pid,
477 pStats._config.name))
478 afterPidToProcessStats[pid] = pStats
479 afterPids = set(afterPidToProcessStats.keys())
480 afterByConfig = reverseDict(afterPidToProcessStats)
481 newPids = afterPids - beforePids
482 deadPids = beforePids - afterPids
483
484
485 restarted = {}
486 for pid in deadPids:
487 procStats = self._deviceStats._pidToProcess[pid]
488 procStats.discardPid(pid)
489 if procStats in afterByConfig:
490 ZenProcessTask.RESTARTED += 1
491 pConfig = procStats._config
492 if pConfig.restart:
493 restarted[procStats] = True
494
495 summary = 'Process restarted: %s' % pConfig.originalName
496
497 self._eventService.sendEvent(self.statusEvent,
498 device=self._devId,
499 summary=summary,
500 component=pConfig.originalName,
501 eventKey=pConfig.processClass,
502 severity=pConfig.severity)
503 log.info(summary)
504
505
506 for processStat in afterByConfig.keys():
507 if processStat in restarted: continue
508 summary = "Process up: %s" % processStat._config.originalName
509 self._eventService.sendEvent(self.statusEvent,
510 device=self._devId,
511 summary=summary,
512 component=processStat._config.originalName,
513 eventKey=processStat._config.processClass,
514 severity=Event.Clear)
515 log.debug(summary)
516
517 for pid in newPids:
518 log.debug("Found new %s pid %d on %s" % (
519 afterPidToProcessStats[pid]._config.originalName, pid,
520 self._devId))
521
522 self._deviceStats._pidToProcess = afterPidToProcessStats
523
524
525 for procStat in self._deviceStats.processStats:
526 if procStat not in afterByConfig:
527 procConfig = procStat._config
528 ZenProcessTask.MISSING += 1
529 summary = 'Process not running: %s' % procConfig.originalName
530 self._eventService.sendEvent(self.statusEvent,
531 device=self._devId,
532 summary=summary,
533 component=procConfig.originalName,
534 eventKey=procConfig.processClass,
535 severity=procConfig.severity)
536 log.warning(summary)
537
538
539 pidCounts = dict([(p, 0) for p in self._deviceStats.processStats])
540 for procStat in self._deviceStats.monitoredProcs:
541 pidCounts[procStat] += 1
542 for procName, count in pidCounts.items():
543 self._save(procName, 'count_count', count, 'GAUGE')
544 return results
545
564
566 """
567 Save the process performance data in RRD files
568
569 @parameter results: results of SNMP table gets
570 @type results: list of (success, result) tuples
571 @parameter device: proxy object to the remote computer
572 @type device: Device object
573 """
574 self.state = ZenProcessTask.STATE_STORE_PERF
575 for success, result in results:
576 if not success:
577
578 return result
579 self._clearSnmpError('Process table up for device %s' % self._devId)
580 parts = {}
581 for success, values in results:
582 if success:
583 parts.update(values)
584 results = parts
585 byConf = reverseDict(self._deviceStats._pidToProcess)
586 for procStat, pids in byConf.items():
587 if len(pids) != 1:
588 log.info("There are %d pids by the name %s",
589 len(pids), procStat._config.name)
590 procName = procStat._config.name
591 for pid in pids:
592 cpu = results.get(CPU + str(pid), None)
593 mem = results.get(MEM + str(pid), None)
594 procStat.updateCpu(pid, cpu)
595 procStat.updateMemory(pid, mem)
596 self._save(procName, 'cpu_cpu', procStat.getCpu(),
597 'DERIVE', min=0)
598 self._save(procName, 'mem_mem',
599 procStat.getMemory() * 1024, 'GAUGE')
600 return results
601
603 """
604 Perform SNMP getTable for specified OIDs
605 @parameter oids: OIDs to gather
606 @type oids: list of strings
607 @return: Twisted deferred
608 @rtype: Twisted deferred
609 """
610 repetitions = self._maxOidsPerRequest / len(oids)
611 t = self.snmpProxy.getTable(oids,
612 timeout=self.snmpConnInfo.zSnmpTimeout,
613 retryCount=self.snmpConnInfo.zSnmpTries,
614 maxRepetitions=repetitions,
615 limit=sys.maxint)
616 return t
617
def _get(self, oids):
    """
    Issue an SNMP get for the given OIDs through the task's SNMP proxy.

    The timeout and the number of tries are taken from the task's SNMP
    connection information.

    @parameter oids: OIDs to gather
    @type oids: list of strings
    @return: Twisted deferred
    @rtype: Twisted deferred
    """
    snmpGet = self.snmpProxy.get
    connInfo = self.snmpConnInfo
    return snmpGet(oids, connInfo.zSnmpTimeout, connInfo.zSnmpTries)
630
632 """
633 Create a connection to the remote device
634 """
635 self.state = ZenProcessTask.STATE_CONNECTING
636 if (self.snmpProxy is None or
637 self.snmpProxy.snmpConnInfo != self.snmpConnInfo):
638 self.snmpProxy = self.snmpConnInfo.createSession()
639 self.snmpProxy.open()
640
642 """
643 Close down the connection to the remote device
644 """
645 if self.snmpProxy:
646 self.snmpProxy.close()
647 self.snmpProxy = None
648
649
def _showProcessList(self, procs):
    """
    Write the processes found on the device to the log at INFO level,
    one process per line, sorted.

    @parameter procs: list of (pid, (name, args))
    @type procs: list of tuples
    """
    lines = []
    for pid, (name, args) in sorted(procs):
        lines.append('%s %s %s' % (pid, name, args))
    # An empty final entry makes the joined text end with a newline
    lines.append('')
    listing = '\n'.join(lines)
    log.info("#===== Processes on %s:\n%s", self._devId, listing)
662
679
def _save(self, pidName, statName, value, rrdType, min='U'):
    """
    Save a value into an RRD file

    The value is written to
    Devices/<device>/os/processes/<pidName>/<statName> through the data
    service.  If the write raises, the failure is logged, a critical event
    is sent for the collector, and the exception is NOT re-raised.

    @param pidName: name of the monitored process (not a pid); it is
                    formatted into the RRD path with %s
    @type pidName: string
    @param statName: metric name
    @type statName: string
    @param value: data to be stored
    @type value: number
    @param rrdType: RRD data type (eg ABSOLUTE, DERIVE, COUNTER)
    @type rrdType: string
    @param min: passed through unchanged as writeRRD's min argument
                (NOTE(review): 'U' is presumably RRD's "unknown" -- confirm)
    """
    path = 'Devices/%s/os/processes/%s/%s' % (self._devId, pidName, statName)
    try:
        self._dataService.writeRRD(path, value, rrdType, min=min)
    except Exception as ex:
        # Deliberately broad: a failed write for one data point must not
        # abort the rest of the collection cycle.
        summary = "Unable to save data for process-monitor RRD %s" % path
        log.critical(summary)

        message = "Data was value= %s, type=%s" % (value, rrdType)
        log.critical(message)
        log.exception(ex)

        import traceback
        trace_info = traceback.format_exc()

        # The event is raised against the collector itself, not the
        # monitored device.
        monitor = self._preferences.options.monitor
        self._eventService.sendEvent(dict(
            dedupid="%s|%s" % (monitor, 'RRD write failure'),
            severity=Event.Critical,
            device=monitor,
            eventClass=Status_Perf,
            component="RRD",
            pidName=pidName,
            statName=statName,
            path=path,
            message=message,
            traceback=trace_info,
            summary=summary))
723
def mapResultsToDicts(showrawtables, results):
    """
    Turn the raw SNMP process tables into per-pid information.

    The pid of a row is the last dotted component of its OID.

    @parameter showrawtables: when true, log each raw table at INFO level
    @type showrawtables: Boolean
    @parameter results: results of SNMP table gets ie (OID + pid, value)
    @type results: dictionary of dictionaries
    @return: the arguments keyed by pid, and a list of (pid, (name, args))
    @rtype: dictionary, list of tuples
    """
    # Handle the three tables in a fixed order -- names, paths, arguments --
    # re-keying each of them by pid.
    tables = (
        ("NAMETABLE = %r", NAMETABLE),
        ("PATHTABLE = %r", PATHTABLE),
        ("ARGSTABLE = %r", ARGSTABLE),
    )
    byPid = []
    for rawFormat, tableOid in tables:
        if showrawtables:
            log.info(rawFormat, results[tableOid])
        keyed = {}
        for oid, value in results[tableOid].items():
            keyed[int(oid.split('.')[-1])] = value
        byPid.append(keyed)
    names, paths, args = byPid

    procs = []
    for pid, name in names.items():
        path = paths.get(pid, '')
        # A non-empty path replaces the name unless it contains a backslash
        # (NOTE(review): presumably to leave Windows-style paths alone).
        if path and '\\' not in path:
            name = path
        procs.append((pid, (name, args.get(pid, ''))))

    return args, procs
766
def reverseDict(d):
    """
    Invert a dictionary: every value of d becomes a key, mapped to the list
    of all keys of d that pointed at that value.
    """
    inverted = {}
    for key, value in d.items():
        if value in inverted:
            inverted[value].append(key)
        else:
            inverted[value] = [key]
    return inverted
776
778 """
779 Break lst into n-sized chunks
780 """
781 return [lst[i:i+n] for i in range(0, len(lst), n)]
782
783
784
if __name__ == '__main__':
    # Script entry point: plug the zenprocess preferences, task class and
    # configuration listener into the generic collector daemon and run it.
    preferences = ZenProcessPreferences()
    splitter = SimpleTaskSplitter(SimpleTaskFactory(ZenProcessTask))
    listener = ConfigListener()
    daemon = CollectorDaemon(preferences, splitter, listener)
    daemon.run()
792