__doc__="""zenperfsnmp

Gets SNMP performance data and stores it in RRD files.

"""

import os
import time
import logging
log = logging.getLogger("zen.zenperfsnmp")

import copy
from sets import Set
import cPickle

from twisted.internet import reactor, defer, error
from twisted.python import failure

import Globals
from Products.ZenUtils.Utils import unused
from Products.ZenUtils.Chain import Chain
from Products.ZenUtils.Driver import drive, driveLater
from Products.ZenModel.PerformanceConf import performancePath
from Products.ZenEvents import Event
from Products.ZenEvents.ZenEventClasses \
    import Perf_Snmp, Status_Snmp, Status_Perf
from Products.ZenEvents.ZenEventClasses import Critical, Clear

from Products.ZenRRD.RRDUtil import RRDUtil
from SnmpDaemon import SnmpDaemon

from FileCleanup import FileCleanup


from Products.ZenHub.services.PerformanceConfig import PerformanceConfig
unused(PerformanceConfig)

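# Collection tuning: upper bound on OIDs per SNMP request, number of devices
# queried concurrently, zenhub config-load chunk size, and how many cycles
# to wait for a device before giving up on its response.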
MAX_OIDS_PER_REQUEST = 40
MAX_SNMP_REQUESTS = 20
DEVICE_LOAD_CHUNK_SIZE = 20
CYCLES_TO_WAIT_FOR_RESPONSE = 2

def makeDirs(dir):
    """
    Wrapper around makedirs that sanity checks before running
    """
    if os.path.exists(dir):
        return

    try:
        os.makedirs(dir, 0750)
    except Exception, ex:
        log.critical( "Unable to create directories for %s because %s" % ( dir, ex ) )


def read(fname):
    """
    Wrapper around the standard function to open a file and read its contents
    """
    if os.path.exists(fname):
        fp = file(fname, 'rb')
        try:
            return fp.read()
        finally:
            fp.close()
    return ''


def write(fname, data):
    """
    Wrapper around the standard function to open a file and write data
    """
    makeDirs(os.path.dirname(fname))

    try:
        fp = open(fname, 'wb')
        try:
            fp.write(data)
        finally:
            fp.close()

    except Exception, ex:
        log.critical( "Unable to write data to %s because %s" % ( fname, ex ) )


def unlink(fname):
    """
    Wrapper around the standard function to delete a file
    """
    if os.path.exists(fname):
        os.unlink(fname)

def chunk(lst, n):
    """
    Break lst into n-sized chunks
    """
    return [lst[i:i+n] for i in range(0, len(lst), n)]

try:
    sorted = sorted
except NameError:
    def sorted(lst, *args, **kw):
        """
        Keep things sane in a pre-python 2.4 environment
        """
        lst.sort(*args, **kw)
        return lst

def firsts(lst):
    """
    The first element of every item in a sequence
    """
    return [item[0] for item in lst]

def checkException(alog, function, *args, **kw):
    """
    Execute the function with arguments and keywords.
    If there is an exception, log it using the given
    logging function 'alog'.
    """
    try:
        return function(*args, **kw)
    except Exception, ex:
        alog.exception(ex)
        raise ex


from twisted.spread import pb

class SnmpConfig(pb.Copyable, pb.RemoteCopy):
    """
    Performance collection configuration for a device, sent from zenhub
    (device name, SNMP connection info, OIDs to poll, and thresholds).
    """

pb.setUnjellyableForClass(SnmpConfig, SnmpConfig)


class Status:
    """
    Keep track of the status of many parallel requests
    """

    def __init__(self, daemon):
        """
        Initializer
        """
        self.daemon = daemon
        self.reset()

    def reset(self):
        """
        Reset instance variables to initial values
        """
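
        # The counters below correspond to the keys reported by stats():
        # successes and failures recorded this cycle, results that finally
        # arrived for queries issued in earlier cycles, cycle start/stop
        # times, the queue of devices not yet queried, and the set of
        # devices that have already reported back.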
        self._numSucceeded = 0
        self._numPrevSucceeded = 0
        self._numFailed = 0
        self._numPrevFailed = 0
        self._startTime = 0
        self._stopTime = 0
        self._deferred = defer.Deferred()
        self._devicesToQueryThisCycle = Set()
        self._prevQueriesAndAges = {}
        self._queue = Set()
        self._reported = Set()

    def start(self, devicesToQuery, prevQueriesAndAges):
        """
        Record our start time, and return a deferred for our devices

        @type devicesToQuery: iterable
        @param devicesToQuery: names of devices to poll
        @type prevQueriesAndAges: dict
        @param prevQueriesAndAges: devices with outstanding queries
        @return: deferred
        """
        self.reset()
        self._startTime = time.time()

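        # Devices that are still being waited on from an earlier cycle are
        # tracked in _prevQueriesAndAges and are not queried again this cycle.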
        self._devicesToQueryThisCycle = \
            Set(devicesToQuery) - Set(prevQueriesAndAges.keys())
        self._prevQueriesAndAges = prevQueriesAndAges
        self._queue = copy.copy(self._devicesToQueryThisCycle)
        self._checkFinished()
        return self._deferred

    def record(self, name, success):
        """
        Record success or failure

        @type name: string
        @param name: name of device reporting results
        @type success: boolean
        @param success: True if query succeeded, False otherwise.
        """
        if name in self._reported:
            log.error("Device %s is reporting more than once", name)
            return
        self._reported.add(name)
        if name in self._devicesToQueryThisCycle:
            if success:
                self._numSucceeded += 1
            else:
                self._numFailed += 1
            self._checkFinished()
        elif name in self._prevQueriesAndAges:
            if success:
                self._numPrevSucceeded += 1
            else:
                self._numPrevFailed += 1
        else:
            log.debug('Unrecognized device reporting: %s' % name)

    def _checkFinished(self):
        """
        Determine the stopping point and log our current stats
        """
        if self.finished():
            self._stopTime = time.time()
            if not self._deferred.called:
                self._deferred.callback(self)
        self.daemon.heartbeat()
        info = self.stats()
        log.info(
            'success:%d ' % info['numSucceeded'] +
            'fail:%d ' % info['numFailed'] +
            'pending:%d ' % info['numInProcess'] +
            'todo:%d ' % info['queueSize'])

    def finished(self):
        """
        Determine if we have finished, disregarding devices that were queried
        in a previous cycle and still haven't reported back.
        """
        return len(self.inQueue()) == 0 and len(self.inProcess()) == 0

    def stats(self):
        """
        Return a dictionary with stats for this cycle:

        numSucceeded     - queries made this cycle that reported back success
        numPrevSucceeded - queries from a previous cycle that reported back success
        numFailed        - queries made this cycle that reported back failure
        numPrevFailed    - queries from a previous cycle that reported back failure
        startTime        - timestamp when this cycle started
        stopTime         - timestamp when this cycle stopped
        age              - time the cycle took to run, or its current age if still running
        queueSize        - number of devices not queried yet
        numInProcess     - number queried this cycle that have not reported back yet
        numPrevInProcess - number queried in a previous cycle that still have not reported back
        numReported      - number that reported back from this or previous cycles
        """
        return dict(
            numSucceeded = self._numSucceeded,
            numPrevSucceeded = self._numPrevSucceeded,
            numFailed = self._numFailed,
            numPrevFailed = self._numPrevFailed,
            startTime = self._startTime,
            stopTime = self._stopTime,
            age = self._stopTime and (self._stopTime - self._startTime)
                  or (time.time() - self._startTime),
            queueSize = len(self._queue),
            numInProcess = len(self.inProcess()),
            numPrevInProcess = len(self.prevInProcess()),
            numReported = len(self._reported)
            )

    def inProcess(self):
        """
        Return the names of the devices that have been queried this cycle
        but from whom no response has been received.
        """
        return self._devicesToQueryThisCycle - self._reported - self._queue

    def prevInProcess(self):
        """
        Return the names of the devices that were queried prior to this cycle
        and have not yet reported.
        """
        return Set(self._prevQueriesAndAges.keys()) - self._reported

    def inQueue(self):
        """
        Return the names of the devices that have yet to be queried.
        """
        return self._queue

    def popDevice(self):
        """
        Pop a device to be queried from the queue
        """
        return self._queue.pop()

    def getQueryAges(self):
        """
        Return a dictionary with the device and age of each query from this
        or previous cycles that has not yet responded.
        """
        waiting = dict([(d, a) for (d, a) in self._prevQueriesAndAges.items()
                        if d not in self._reported])
        waiting.update(dict([(d, self._startTime)
                             for d in self.inProcess()]))
        return waiting


class SnmpStatus:
    """
    Track and report SNMP status failures
    """

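    # Base fields for the SNMP status events raised for a device.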
    snmpStatusEvent = {'eventClass': Status_Snmp,
                       'component': 'snmp',
                       'eventGroup': 'SnmpTest'}

    def __init__(self, snmpState):
        """
        Initializer
        """
        self.count = snmpState


class OidData:
    def update(self, name, path, dataStorageType, rrdCreateCommand, minmax):
        """
        Cache the OID's datapoint name, RRD file path, storage type,
        RRD create command and min/max values.
        """
        self.name = name
        self.path = path
        self.dataStorageType = dataStorageType
        self.rrdCreateCommand = rrdCreateCommand
        self.minmax = minmax


class zenperfsnmp(SnmpDaemon):
    """
    Periodically query all devices for SNMP values to archive in RRD files
    """

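    # Intervals are in seconds: prune RRD files untouched for 30 days,
    # refresh configuration every 20 minutes, collect every 5 minutes.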
    maxRrdFileAge = 30 * (24*60*60)
    perfsnmpConfigInterval = 20*60
    perfsnmpCycleInterval = 5*60
    properties = SnmpDaemon.properties + ('perfsnmpCycleInterval',)
    initialServices = SnmpDaemon.initialServices + ['SnmpPerfConfig']

    def __init__(self, noopts=0):
        """
        Create any base performance directories (if necessary),
        load cached configuration data and clean up any old RRD files
        (if specified by --checkAgingFiles)
        """
        SnmpDaemon.__init__(self, 'zenperfsnmp', noopts)
        self.status = None
        self.proxies = {}
        self.unresponsiveDevices = Set()
        self.snmpOidsRequested = 0

        self.log.info( "Initializing daemon..." )

        perfRoot = performancePath('')
        makeDirs(perfRoot)

        if self.options.cacheconfigs:
            self.loadConfigs()

        self.oldFiles = Set()

        if self.options.checkagingfiles:
            self.oldCheck = FileCleanup(perfRoot, '.*\\.rrd$',
                                        24 * 60 * 60,
                                        frequency=60)
            self.oldCheck.process = self.reportOldFile
            self.oldCheck.start()

        self.fileCleanup = FileCleanup(perfRoot, '.*\\.rrd$',
                                       self.maxRrdFileAge,
                                       frequency=90*60)
        self.fileCleanup.process = self.cleanup
        self.fileCleanup.start()

    def pickleName(self, id):
        """
        Return the path to the pickle file for a device
        """
        return performancePath('Devices/%s/%s-config.pickle' %
                               (id, self.options.monitor))


    def loadConfigs(self):
        """
        Read cached configuration values from pickle files at startup.

        NB: We cache in pickles to get a full collect cycle, because
        loading the initial config can take several minutes.
        """
        self.log.info( "Gathering cached configuration information" )

        base = performancePath('Devices')
        makeDirs(base)
        root, ds, fs = os.walk(base).next()
        for d in ds:
            pickle_name = self.pickleName(d)
            config = read( pickle_name )
            if config:
                try:
                    self.log.debug( "Reading cached config info from pickle file %s" % pickle_name )
                    data = cPickle.loads(config)
                    self.updateDeviceConfig( data )

                except Exception, ex:
                    self.log.warn( "Received %s while loading cached configs in %s -- ignoring" % (ex, pickle_name ) )
                    try:
                        os.unlink( pickle_name )
                    except Exception, ex:
                        self.log.warn( "Unable to delete corrupted pickle file %s because %s" % ( pickle_name, ex ) )


    def cleanup(self, fullPath):
        """
        Delete an old RRD file
        """
        self.log.warning("Deleting old RRD file: %s", fullPath)
        os.unlink(fullPath)
        self.oldFiles.discard(fullPath)

    def reportOldFile(self, fullPath):
        """
        Add an RRD file to the list of files to be removed
        """
        self.oldFiles.add(fullPath)

    def remote_updateDeviceList(self, devices):
        """
        Gather the list of devices from zenhub, update the config for all
        devices in the list, and remove any devices that we know about
        but zenhub doesn't know about.

        NB: This is callable from within zenhub.
        """
        SnmpDaemon.remote_updateDeviceList(self, devices)

        survivors = []
        doomed = Set(self.proxies.keys())
        for device, lastChange in devices:
            doomed.discard(device)
            proxy = self.proxies.get(device)
            if not proxy or proxy.lastChange < lastChange:
                survivors.append(device)

        log.info("Deleting %s", doomed)
        for d in doomed:
            del self.proxies[d]

        if survivors:
            log.info("Fetching configs: %s", survivors)
            d = self.model().callRemote('getDevices', survivors)
            d.addCallback(self.updateDeviceList, survivors)
            d.addErrback(self.error)


    def startUpdateConfig(self, driver):
        """
        Periodically ask the Zope server for basic configuration data.
        """

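        # This method is driven by Products.ZenUtils.Driver: each yielded
        # deferred is run by the driver, and driver.next() returns its result.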
        now = time.time()

        log.info("Fetching property items...")
        yield self.model().callRemote('propertyItems')
        self.setPropertyItems(driver.next())

        driveLater(self.configCycleInterval * 60, self.startUpdateConfig)

        log.info("Getting threshold classes...")
        yield self.model().callRemote('getThresholdClasses')
        self.remote_updateThresholdClasses(driver.next())

        devices = []
        if self.options.device:
            devices = [self.options.device]
        else:
            log.info("Checking for outdated configs...")
            current = [(k, v.lastChange) for k, v in self.proxies.items()]
            yield self.model().callRemote('getDeviceUpdates', current)
            devices = driver.next()

        log.info("Fetching configs for %s", repr(devices)[0:800]+'...')
        yield self.model().callRemote('getDevices', devices)
        updatedDevices = driver.next()

        log.info("Fetching default RRDCreateCommand...")
        yield self.model().callRemote('getDefaultRRDCreateCommand')
        createCommand = driver.next()

        self.rrd = RRDUtil(createCommand, self.perfsnmpCycleInterval)

        log.info( "Getting collector thresholds..." )
        yield self.model().callRemote('getCollectorThresholds')
        self.rrdStats.config(self.options.monitor, self.name, driver.next(),
                             createCommand)

        log.info("Fetching SNMP status...")
        yield self.model().callRemote('getSnmpStatus', self.options.device)
        self.updateSnmpStatus(driver.next())

        log.info("Initiating incremental device load")
        if self.options.cycle:
            d = self.updateDeviceList(updatedDevices, devices)
            def report(result):
                """
                Twisted deferred errBack to check for errors
                """
                if result:
                    log.error("Error loading devices: %s", result)
            d.addBoth(report)
        else:
            yield self.updateDeviceList(updatedDevices, devices)
            driver.next()

        self.sendEvents(self.rrdStats.gauge('configTime',
                                            self.configCycleInterval * 60,
                                            time.time() - now))


    def updateDeviceList(self, responses, requested):
        """
        Update the config for devices
        """

        def fetchDevices(driver):
            """
            An iterable to go over the list of devices
            """
            deviceNames = Set()
            length = len(responses)
            log.debug("Fetching configs for %d devices", length)
            for devices in chunk(responses, DEVICE_LOAD_CHUNK_SIZE):
                log.debug("Fetching config for %s", devices)
                yield self.model().callRemote('getDeviceConfigs', devices)
                try:
                    for response in driver.next():
                        self.updateDeviceConfig(response)
                except Exception, ex:
                    log.warning("Error loading config for devices %s" % devices)
                for d in devices:
                    deviceNames.add(d)
            log.debug("Finished fetching configs for %d devices", length)

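            # Devices we asked about but got no configuration back for are
            # candidates for removal below.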
            doomed = Set(requested) - deviceNames
            if self.options.device:
                self.log.debug('Gathering performance data for %s ' %
                               self.options.device)
                doomed = Set(self.proxies.keys())
                doomed.discard(self.options.device)
            for name in doomed:
                self.log.info('Removing device %s' % name)
                if name in self.proxies:
                    del self.proxies[name]

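                # Drop the device's cached config pickle along with its proxy.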
                config = self.pickleName(name)
                unlink(config)

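            # Sanity check: warn about devices that share a management IP.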
            ips = Set()
            for name, proxy in self.proxies.items():
                if proxy.snmpConnInfo.manageIp in ips:
                    log.warning("Warning: device %s has a duplicate address %s",
                                name, proxy.snmpConnInfo.manageIp)
                ips.add(proxy.snmpConnInfo.manageIp)
            self.log.info('Configured %d of %d devices',
                          len(deviceNames), len(self.proxies))
            yield defer.succeed(None)
        return drive(fetchDevices)


    def updateAgentProxy(self, deviceName, snmpConnInfo):
        """
        Create or update the SNMP proxy for a device

        @param deviceName: device name known by zenhub
        @type deviceName: string
        @param snmpConnInfo: object information passed from zenhub
        @type snmpConnInfo: class SnmpConnInfo from Products/ZenHub/services/PerformanceConfig.py
        @return: connection information from the proxy
        @rtype: SnmpConnInfo class
        """
        p = self.proxies.get(deviceName, None)
        if not p:
            p = snmpConnInfo.createSession(protocol=self.snmpPort.protocol,
                                           allowCache=True)
            p.oidMap = {}
            p.snmpStatus = SnmpStatus(0)
            p.singleOidMode = False
            p.lastChange = 0

        if p.snmpConnInfo != snmpConnInfo:
            t = snmpConnInfo.createSession(protocol=self.snmpPort.protocol,
                                           allowCache=True)
            t.oidMap = p.oidMap
            t.snmpStatus = p.snmpStatus
            t.singleOidMode = p.singleOidMode
            t.lastChange = p.lastChange
            p = t

        return p


    def updateSnmpStatus(self, status):
        """
        Update the SNMP failure counts from the Status database
        """
        countMap = dict(status)
        for name, proxy in self.proxies.items():
            proxy.snmpStatus.count = countMap.get(name, 0)


    def remote_deleteDevice(self, doomed):
        """
        Allows zenhub to delete a device from our configuration
        """
        if self.options.device and doomed != self.options.device:
            return

        self.log.debug("Async delete device %s" % doomed)
        if doomed in self.proxies:
            del self.proxies[doomed]

    def remote_updateDeviceConfig(self, snmpTargets):
        """
        Allows zenhub to update our device configuration
        """
        if self.options.device and snmpTargets.device != self.options.device:
            return

        self.log.debug("Device updates from zenhub received")
        self.updateDeviceConfig(snmpTargets)

    def updateDeviceConfig(self, configs):
        """
        Examine the given device configuration, and if it is newer, update
        the device as well as its pickle file.
        If no SNMP proxy has been created for the device, create one.
        """
        self.log.debug("Received config for %s", configs.device)
        p = self.updateAgentProxy(configs.device, configs.connInfo)

        if self.options.cacheconfigs:
            p.lastChange = configs.lastChangeTime
            data = cPickle.dumps(configs)
            pickle_name = self.pickleName(configs.device)
            self.log.debug( "Updating cached configs in pickle file %s" % pickle_name )
            write(pickle_name, data)

        oidMap, p.oidMap = p.oidMap, {}
        for name, oid, path, dsType, createCmd, minmax in configs.oids:
            createCmd = createCmd.strip()
            oid = str(oid).strip('.')

            if oid:
                oid = '.' + oid
                oid_status = oidMap.setdefault(oid, OidData())
                oid_status.update(name, path, dsType, createCmd, minmax)
                p.oidMap[oid] = oid_status

        self.proxies[configs.device] = p
        self.thresholds.updateForDevice(configs.device, configs.thresholds)


    def setUnresponsiveDevices(self, arg):
        """
        Remember all the unresponsive devices
        """
        if isinstance(arg, list):
            deviceList = arg
            self.log.debug('unresponsive devices: %r' % deviceList)
            self.unresponsiveDevices = Set(firsts(deviceList))
        else:
            self.log.error('Could not get unresponsive devices: %s', arg)
        self.readDevices()

    def readDevices(self, unused=None):
        """
        Periodically fetch the performance values from all known devices
        """

        if self.status:
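            # Queries from earlier cycles that still have not answered.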
            pending = self.status.getQueryAges()

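            # Stop waiting for devices that have been silent for more than
            # CYCLES_TO_WAIT_FOR_RESPONSE collection cycles.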
            doneWaiting = []
            for device, age in pending.items():
                beenWaiting = time.time() - age
                if beenWaiting >= self.perfsnmpCycleInterval \
                        * CYCLES_TO_WAIT_FOR_RESPONSE:
                    self.log.error('No response from %s after %s cycles.'
                                   % (device, CYCLES_TO_WAIT_FOR_RESPONSE))
                    doneWaiting.append(device)
                else:
                    self.log.warning('Continuing to wait for response from'
                                     ' %s after %s seconds' % (device, beenWaiting))
            for device in doneWaiting:
                del pending[device]

            queued = self.status.inQueue()
            if queued:
                self.log.error('%s devices still queued at end of cycle and did'
                               ' not get queried.' % len(queued))
                self.log.debug('Devices not queried: %s' % ', '.join(queued))

            if not self.status._stopTime:
                self.reportRate()
        else:
            pending = {}

        devicesToQuery = Set(self.proxies.keys())
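        # Leave out devices zenhub reported as unresponsive and devices that
        # are still being waited on from earlier cycles.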
        devicesToQuery -= self.unresponsiveDevices
        devicesToQuery -= Set(pending.keys())
        self.status = Status(self)
        d = self.status.start(devicesToQuery, pending)
        d.addCallback(self.reportRate)
        for unused in range(MAX_SNMP_REQUESTS):
            if not len(self.status.inQueue()):
                break
            d = self.startReadDevice(self.status.popDevice())

        def printError(reason):
            """
            Twisted errBack to record a traceback and log messages
            """
            from StringIO import StringIO
            out = StringIO()
            reason.printTraceback(out)
            self.log.error(reason)

        d.addErrback(printError)

    def reportRate(self, *unused):
        """
        Finished reading all the devices, report stats and maybe stop
        """
        info = self.status.stats()
        oidsRequested, self.snmpOidsRequested = self.snmpOidsRequested, 0

        self.log.info('******** Cycle completed ********')
        self.log.info("Sent %d OID requests", oidsRequested)
        self.log.info('Queried %d devices' % (info['numSucceeded']
            + info['numFailed'] + info['numInProcess']))
        self.log.info(' %s in queue still unqueried' % info['queueSize'])
        self.log.info(' Successes: %d Failures: %d Not reporting: %d' %
            (info['numSucceeded'], info['numFailed'], info['numInProcess']))
        self.log.info('Waited on %d queries from previous cycles.' %
            (info['numPrevSucceeded'] + info['numPrevFailed']
             + info['numPrevInProcess']))
        self.log.info(' Successes: %d Failures: %d Not reporting: %d' %
            (info['numPrevSucceeded'], info['numPrevFailed'],
             info['numPrevInProcess']))
        self.log.info('Cycle lasted %.2f seconds' % info['age'])
        self.log.info('*********************************')

        cycle = self.perfsnmpCycleInterval
        self.sendEvents(
            self.rrdStats.gauge('success', cycle,
                                info['numSucceeded'] + info['numPrevSucceeded']) +
            self.rrdStats.gauge('failed', cycle,
                                info['numFailed'] + info['numPrevFailed']) +
            self.rrdStats.gauge('cycleTime', cycle, info['age']) +
            self.rrdStats.counter('dataPoints', cycle, self.rrd.dataPoints) +
            self.rrdStats.gauge('cyclePoints', cycle, self.rrd.endCycle())
            )

        self.checkOldFiles()

    def checkOldFiles(self):
        """
        Send an event showing whether we have old files or not
        """
        if not self.options.checkagingfiles:
            return
        self.oldFiles = Set(
            [f for f in self.oldFiles
             if os.path.exists(f) and self.oldCheck.test(f)]
            )
        if self.oldFiles:
            root = performancePath('')
            filenames = [f[len(root):] for f in self.oldFiles]
            message = 'RRD files not updated: ' + ' '.join(filenames)
            self.sendEvent(dict(
                dedupid="%s|%s" % (self.options.monitor, 'RRD files too old'),
                severity=Critical,
                device=self.options.monitor,
                eventClass=Status_Perf,
                summary=message))
        else:
            self.sendEvent(dict(
                severity=Clear,
                device=self.options.monitor,
                eventClass=Status_Perf,
                summary='All RRD files have been recently updated'))

    def startReadDevice(self, deviceName):
        """
        Initiate a request (or several) to read the performance data
        from a device
        """
        proxy = self.proxies.get(deviceName, None)
        if proxy is None:
            return

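        # Size each request to what the device can handle; a device in
        # single-OID mode gets one OID per request.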
        n = int(proxy.snmpConnInfo.zMaxOIDPerRequest)
        if proxy.singleOidMode:
            n = 1

        def getLater(oids):
            """
            Return the result of proxy.get(oids, timeout, tries)
            """
            return checkException(self.log,
                                  proxy.get,
                                  oids,
                                  proxy.snmpConnInfo.zSnmpTimeout,
                                  proxy.snmpConnInfo.zSnmpTries)

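        # Issue the OID requests chunk by chunk through Chain.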
        proxy.open()
        chain = Chain(getLater, iter(chunk(sorted(proxy.oidMap.keys()), n)))
        d = chain.run()

        def closer(arg, proxy):
            """
            Close the proxy
            """
            try:
                proxy.close()
            except Exception, ex:
                self.log.exception(ex)
                raise ex

            return arg

        d.addCallback(closer, proxy)
        d.addCallback(self.storeValues, deviceName)

        self.snmpOidsRequested += len(proxy.oidMap)

        return d


    def badOid(self, deviceName, oid):
        """
        Handle an OID that the device will not return a usable value for.
        """

    def storeValues(self, updates, deviceName):
        """
        Decode responses from devices and store the elements in RRD files
        """
        proxy = self.proxies.get(deviceName, None)
        if proxy is None:
            self.status.record(deviceName, True)
            return

        for success, update in updates:
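            # A successful but empty answer to a multi-OID request: retry the
            # device one OID at a time.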
            if success and not update and not proxy.singleOidMode:
                proxy.singleOidMode = True
                self.log.warn('Error collecting data on %s -- retrying in single-OID mode',
                              deviceName)
                self.startReadDevice(deviceName)
                return

            if not success:
                if isinstance(update, failure.Failure) and \
                   isinstance(update.value, error.TimeoutError):
                    self.log.debug("Device %s timed out" % deviceName)
                else:
                    self.log.warning('Failed to collect on %s (%s: %s)',
                                     deviceName,
                                     update.__class__,
                                     update)

        successCount = sum(firsts(updates))
        oids = []
        for success, update in updates:
            if success:
                for oid, value in update.items():
                    if value == '' or value is None:
                        self.badOid(deviceName, oid)
                    else:
                        self.storeRRD(deviceName, oid, value)
                    oids.append(oid)

        if successCount == len(updates) and proxy.singleOidMode:
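            # In single-OID mode every OID was requested individually, so any
            # OID that did not report back is treated as bad.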
            for doomed in Set(proxy.oidMap.keys()) - Set(oids):
                self.badOid(deviceName, doomed)

        if self.status.inQueue():
            self.startReadDevice(self.status.popDevice())

        if successCount and len(updates) > 0:
            successPercent = successCount * 100 / len(updates)
            if successPercent not in (0, 100):
                self.log.debug("Successful request ratio for %s is %2d%%",
                               deviceName,
                               successPercent)
        success = True
        if updates:
            success = successCount > 0
        self.status.record(deviceName, success)
        proxy.snmpStatus.updateStatus(deviceName, success, self.sendEvent)

    def storeRRD(self, device, oid, value):
        """
        Store a value into an RRD file

        @param device: remote device name
        @type device: string
        @param oid: SNMP OID used as our performance metric
        @type oid: string
        @param value: data to be stored
        @type value: number
        """
        oidData = self.proxies[device].oidMap.get(oid, None)
        if not oidData: return

        raw_value = value
        min, max = oidData.minmax
        try:
            value = self.rrd.save(oidData.path,
                                  value,
                                  oidData.dataStorageType,
                                  oidData.rrdCreateCommand,
                                  min=min, max=max)
        except Exception, ex:
            summary = "Unable to save data for OID %s in RRD %s" % \
                      (oid, oidData.path)
            self.log.critical(summary)

            message = """Data was value= %s, type=%s, min=%s, max=%s
RRD create command: %s""" % \
                      (value, oidData.dataStorageType, min, max,
                       oidData.rrdCreateCommand)
            self.log.critical(message)
            self.log.exception(ex)

            import traceback
            trace_info = traceback.format_exc()

            evid = self.sendEvent(dict(
                dedupid="%s|%s" % (self.options.monitor, 'RRD write failure'),
                severity=Critical,
                device=self.options.monitor,
                eventClass=Status_Perf,
                component="RRD",
                oid=oid,
                path=oidData.path,
                message=message,
                traceback=trace_info,
                summary=summary))

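            # Don't run threshold checks against a value that failed to store.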
            return

        if self.options.showdeviceresults:
            self.log.info("%s %s results: raw=%s RRD-converted=%s"
                          " type=%s, min=%s, max=%s" % (
                device, oid, raw_value, value, oidData.dataStorageType, min, max))

        for ev in self.thresholds.check(oidData.path, time.time(), value):
            eventKey = oidData.path.rsplit('/')[-1]
            if ev.has_key('eventKey'):
                ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey'])
            else:
                ev['eventKey'] = eventKey
            self.sendThresholdEvent(**ev)


    def buildOptions(self):
        """
        Build a list of command-line options
        """
        SnmpDaemon.buildOptions(self)
        self.parser.add_option('--checkAgingFiles',
                               dest='checkagingfiles',
                               action="store_true",
                               default=False,
                               help="Send events when RRD files are not being updated regularly")

        self.parser.add_option('--cacheconfigs',
                               dest='cacheconfigs',
                               action="store_true",
                               default=False,
                               help="To improve startup times, cache configuration received from zenhub")

        self.parser.add_option('--showdeviceresults',
                               dest='showdeviceresults',
                               action="store_true",
                               default=False,
                               help="Show the raw RRD values. For debugging purposes only.")

if __name__ == '__main__':

    from Products.ZenRRD.zenperfsnmp import zenperfsnmp

    zpf = zenperfsnmp()
    zpf.run()