__doc__="""zenperfsnmp

Gets SNMP performance data and stores it in RRD files.

"""

import os
import time
import logging
log = logging.getLogger("zen.zenperfsnmp")

import copy
from sets import Set
import cPickle

from twisted.internet import reactor, defer, error
from twisted.python import failure

import Globals
from Products.ZenUtils.Utils import unused
from Products.ZenUtils.Chain import Chain
from Products.ZenUtils.Driver import drive, driveLater
from Products.ZenModel.PerformanceConf import performancePath
from Products.ZenEvents import Event
from Products.ZenEvents.ZenEventClasses \
     import Perf_Snmp, Status_Snmp, Status_Perf
from Products.ZenEvents.ZenEventClasses import Critical, Clear

from Products.ZenRRD.RRDUtil import RRDUtil
from SnmpDaemon import SnmpDaemon

from FileCleanup import FileCleanup

from Products.ZenHub.services.PerformanceConfig import PerformanceConfig
unused(PerformanceConfig)

MAX_OIDS_PER_REQUEST = 40          # upper bound on OIDs in a single SNMP request
MAX_SNMP_REQUESTS = 20             # devices queried in parallel each cycle
DEVICE_LOAD_CHUNK_SIZE = 20        # devices per remote getDeviceConfigs call
CYCLES_TO_WAIT_FOR_RESPONSE = 2    # cycles to wait before giving up on a query

def makeDirs(dir):
    """
    Wrapper around makedirs that sanity checks before running
    """
    if os.path.exists(dir):
        return

    try:
        os.makedirs(dir, 0750)
    except Exception, ex:
        log.critical("Unable to create directories for %s because %s" % (dir, ex))


def read(fname):
    """
    Wrapper around the standard function to open a file and read its contents
    """
    if os.path.exists(fname):
        fp = file(fname, 'rb')
        try:
            return fp.read()
        finally:
            fp.close()
    return ''


def write(fname, data):
    """
    Wrapper around the standard function to open a file and write data
    """
    makeDirs(os.path.dirname(fname))

    try:
        fp = open(fname, 'wb')
        try:
            fp.write(data)
        finally:
            fp.close()

    except Exception, ex:
        log.critical("Unable to write data to %s because %s" % (fname, ex))


def unlink(fname):
    """
    Wrapper around the standard function to delete a file
    """
    if os.path.exists(fname):
        os.unlink(fname)

def chunk(lst, n):
    """
    Break lst into n-sized chunks
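
    For example:

        >>> chunk([1, 2, 3, 4, 5], 2)
        [[1, 2], [3, 4], [5]]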
    """
    return [lst[i:i+n] for i in range(0, len(lst), n)]

try:
    sorted = sorted
except NameError:
    def sorted(lst, *args, **kw):
        """
        Keep things sane in a pre-python 2.4 environment
        """
        lst.sort(*args, **kw)
        return lst

def firsts(lst):
    """
    The first element of every item in a sequence
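
    For example:

        >>> firsts([(1, 'a'), (2, 'b')])
        [1, 2]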
    """
    return [item[0] for item in lst]

def checkException(alog, function, *args, **kw):
    """
    Execute the function with arguments and keywords.
    If there is an exception, log it using the given
    logging function 'alog'.
    """
    try:
        return function(*args, **kw)
    except Exception, ex:
        alog.exception(ex)
        raise ex


from twisted.spread import pb

class SnmpConfig(pb.Copyable, pb.RemoteCopy):
    """
    SNMP collection configuration for one device, sent over from zenhub:
    the device name, connection info, last change time, OIDs and
    thresholds consumed by updateDeviceConfig().
    """

pb.setUnjellyableForClass(SnmpConfig, SnmpConfig)

class Status:
    """
    Keep track of the status of many parallel requests
    """

    def __init__(self, daemon):
        """
        Initializer
        """
        self.daemon = daemon
        self.reset()

    def reset(self):
        """
        Reset instance variables to initial values
        """
        # queries made this cycle that reported success
        self._numSucceeded = 0

        # queries made in a previous cycle that reported success this cycle
        self._numPrevSucceeded = 0

        # queries made this cycle that reported failure
        self._numFailed = 0

        # queries made in a previous cycle that reported failure this cycle
        self._numPrevFailed = 0

        # timestamps bracketing the cycle
        self._startTime = 0
        self._stopTime = 0

        # fired when every device queried this cycle has reported back
        self._deferred = defer.Deferred()

        # devices to be queried during this cycle
        self._devicesToQueryThisCycle = Set()

        # devices queried in earlier cycles that have not yet responded,
        # mapped to the time their query was issued
        self._prevQueriesAndAges = {}

        # devices not yet queried this cycle
        self._queue = Set()

        # devices that have reported back
        self._reported = Set()

    def start(self, devicesToQuery, prevQueriesAndAges):
        """
        Record our start time, and return a deferred for our devices

        @type devicesToQuery: iterable
        @param devicesToQuery: names of devices to poll
        @type prevQueriesAndAges: dict
        @param prevQueriesAndAges: devices with outstanding queries
        @return: deferred
        """
        self.reset()
        self._startTime = time.time()

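        # Devices still waiting on a response from a previous cycle are not
        # queried again this cycle; they stay in self._prevQueriesAndAges
        # until they report back or time out.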
        self._devicesToQueryThisCycle = \
            Set(devicesToQuery) - Set(prevQueriesAndAges.keys())
        self._prevQueriesAndAges = prevQueriesAndAges
        self._queue = copy.copy(self._devicesToQueryThisCycle)
        self._checkFinished()
        return self._deferred

    def record(self, name, success):
        """
        Record success or failure

        @type name: string
        @param name: name of device reporting results
        @type success: boolean
        @param success: True if query succeeded, False otherwise.
        """
        if name in self._reported:
            log.error("Device %s is reporting more than once", name)
            return
        self._reported.add(name)
        if name in self._devicesToQueryThisCycle:
            if success:
                self._numSucceeded += 1
            else:
                self._numFailed += 1
            self._checkFinished()
        elif name in self._prevQueriesAndAges:
            if success:
                self._numPrevSucceeded += 1
            else:
                self._numPrevFailed += 1
        else:
            log.debug('Unrecognized device reporting: %s' % name)

    def _checkFinished(self):
        """
        Determine the stopping point and log our current stats
        """
        if self.finished():
            self._stopTime = time.time()
            if not self._deferred.called:
                self._deferred.callback(self)
        self.daemon.heartbeat()
        info = self.stats()
        log.info(
            'success:%d ' % info['numSucceeded'] +
            'fail:%d ' % info['numFailed'] +
            'pending:%d ' % info['numInProcess'] +
            'todo:%d ' % info['queueSize'])

    def finished(self):
        """
        Determine if we have finished, disregarding devices that were queried
        in a previous cycle and still haven't reported back.
        """
        return len(self.inQueue()) == 0 and len(self.inProcess()) == 0

    def stats(self):
        """
        Return a dictionary with stats for this cycle:
        numSucceeded - queries made this cycle and reported back success
        numPrevSucceeded - queries from a prev cycle reported back success
        numFailed - queries made this cycle reported back failure
        numPrevFailed - queries made prev cycle reported back failure
        startTime - timestamp when this cycle started
        stopTime - timestamp when this cycle stopped
        age - time cycle took to run or current age (if still running)
        queueSize - num of devices not queried yet
        numInProcess - num queried this cycle not reported back yet
        numPrevInProcess - num queried prev cycle still not reported back
        numReported - number reported back from this or previous cycles
        """
        return dict(
            numSucceeded = self._numSucceeded,
            numPrevSucceeded = self._numPrevSucceeded,
            numFailed = self._numFailed,
            numPrevFailed = self._numPrevFailed,
            startTime = self._startTime,
            stopTime = self._stopTime,
            age = self._stopTime and (self._stopTime - self._startTime)
                  or (time.time() - self._startTime),
            queueSize = len(self._queue),
            numInProcess = len(self.inProcess()),
            numPrevInProcess = len(self.prevInProcess()),
            numReported = len(self._reported)
            )

    def inProcess(self):
        """
        Return the names of the devices that have been queried this cycle
        but from whom no response has been received.
        """
        return self._devicesToQueryThisCycle - self._reported - self._queue

    def prevInProcess(self):
        """
        Return the names of the devices that were queried prior to this cycle
        and have not yet reported.
        """
        return Set(self._prevQueriesAndAges.keys()) - self._reported

    def inQueue(self):
        """
        Return the names of the devices that have yet to be queried.
        """
        return self._queue

    def popDevice(self):
        """
        Pop a device to be queried from the queue
        """
        return self._queue.pop()

    def getQueryAges(self):
        """
        Return a dictionary with the device and age of each query from this
        or previous cycles that has not yet responded.
        """
        waiting = dict([(d, a) for (d, a) in self._prevQueriesAndAges.items()
                        if d not in self._reported])
        waiting.update(dict([(d, self._startTime)
                             for d in self.inProcess()]))
        return waiting


class SnmpStatus:
    """
    Track and report SNMP status failures
    """

    snmpStatusEvent = {'eventClass': Status_Snmp,
                       'component': 'snmp',
                       'eventGroup': 'SnmpTest'}

    def __init__(self, snmpState):
        """
        Initializer
        """
        self.count = snmpState
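
    def updateStatus(self, deviceName, success, eventCb):
        """
        Track consecutive failures and raise or clear an SNMP status event.

        Minimal sketch based on the call in storeValues(); the exact event
        wording is an assumption.
        """
        event = self.snmpStatusEvent.copy()
        event['device'] = deviceName
        if success:
            # clear any outstanding SNMP failure event on recovery
            if self.count:
                event['severity'] = Clear
                event['summary'] = 'snmp agent up on device %s' % deviceName
                eventCb(event)
            self.count = 0
        else:
            event['severity'] = Event.Error
            event['summary'] = 'snmp agent down on device %s' % deviceName
            eventCb(event)
            self.count += 1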


class OidData:
    """
    Holds the collection attributes for a single OID
    """
    def update(self, name, path, dataStorageType, rrdCreateCommand, minmax):
        """
        Cache the datapoint name, RRD path, storage type, RRD create
        command and min/max bounds for this OID
        """
        self.name = name
        self.path = path
        self.dataStorageType = dataStorageType
        self.rrdCreateCommand = rrdCreateCommand
        self.minmax = minmax


class zenperfsnmp(SnmpDaemon):
    """
    Periodically query all devices for SNMP values to archive in RRD files
    """

    # RRD files untouched for this long (seconds) are deleted
    maxRrdFileAge = 30 * (24*60*60)
    perfsnmpConfigInterval = 20*60
    # seconds between collection cycles
    perfsnmpCycleInterval = 5*60
    properties = SnmpDaemon.properties + ('perfsnmpCycleInterval',)
    initialServices = SnmpDaemon.initialServices + ['SnmpPerfConfig']

    def __init__(self, noopts=0):
        """
        Create any base performance directories (if necessary),
        load cached configuration data and clean up any old RRD files
        (if specified by --checkAgingFiles)
        """
        SnmpDaemon.__init__(self, 'zenperfsnmp', noopts)
        self.status = None
        self.proxies = {}
        self.unresponsiveDevices = Set()
        self.snmpOidsRequested = 0

        self.log.info("Initializing daemon...")

        perfRoot = performancePath('')
        makeDirs(perfRoot)

        if self.options.cacheconfigs:
            self.loadConfigs()

        self.oldFiles = Set()

        # report RRD files that have not been updated recently
        if self.options.checkagingfiles:
            self.oldCheck = FileCleanup(perfRoot, '.*\\.rrd$',
                                        24 * 60 * 60,
                                        frequency=60)
            self.oldCheck.process = self.reportOldFile
            self.oldCheck.start()

        # delete RRD files that have gone completely stale
        self.fileCleanup = FileCleanup(perfRoot, '.*\\.rrd$',
                                       self.maxRrdFileAge,
                                       frequency=90*60)
        self.fileCleanup.process = self.cleanup
        self.fileCleanup.start()

    def pickleName(self, id):
        """
        Return the path to the pickle file for a device
        """
        return performancePath('Devices/%s/%s-config.pickle' % (id, self.options.monitor))

    def loadConfigs(self):
        """
        Read cached configuration values from pickle files at startup.

        NB: We cache in pickles to get a full collect cycle, because
        loading the initial config can take several minutes.
        """
        self.log.info("Gathering cached configuration information")

        base = performancePath('Devices')
        makeDirs(base)
        root, ds, fs = os.walk(base).next()
        for d in ds:
            pickle_name = self.pickleName(d)
            config = read(pickle_name)
            if config:
                try:
                    self.log.debug("Reading cached config info from pickle file %s" % pickle_name)
                    data = cPickle.loads(config)
                    self.updateDeviceConfig(data)

                except Exception, ex:
                    self.log.warn("Received %s while loading cached configs in %s -- ignoring" % (ex, pickle_name))
                    try:
                        os.unlink(pickle_name)
                    except Exception, ex:
                        self.log.warn("Unable to delete corrupted pickle file %s because %s" % (pickle_name, ex))

    def cleanup(self, fullPath):
        """
        Delete an old RRD file
        """
        self.log.warning("Deleting old RRD file: %s", fullPath)
        os.unlink(fullPath)
        self.oldFiles.discard(fullPath)

    def reportOldFile(self, fullPath):
        """
        Add an RRD file to the list of aging files to be reported
        """
        self.oldFiles.add(fullPath)

    def remote_updateDeviceList(self, devices):
        """
        Gather the list of devices from zenhub, update the config of each
        device in the list, and remove any devices that we know about
        but zenhub doesn't know about.

        NB: This is callable from within zenhub.
        """
        SnmpDaemon.remote_updateDeviceList(self, devices)

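        # Anything we track that zenhub no longer reports is doomed; anything
        # whose config changed since we cached it must be refetched.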
        survivors = []
        doomed = Set(self.proxies.keys())
        for device, lastChange in devices:
            doomed.discard(device)
            proxy = self.proxies.get(device)
            if not proxy or proxy.lastChange < lastChange:
                survivors.append(device)

        log.info("Deleting %s", doomed)
        for d in doomed:
            del self.proxies[d]

        if survivors:
            log.info("Fetching configs: %s", survivors)
            d = self.model().callRemote('getDevices', survivors)
            d.addCallback(self.updateDeviceList, survivors)
            d.addErrback(self.error)

    def startUpdateConfig(self, driver):
        """
        Periodically ask the Zope server for basic configuration data.
        """

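        # This method is a generator run by ZenUtils.Driver.drive(): every
        # yield hands the driver a deferred, and driver.next() returns the
        # result of the most recently yielded deferred.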
        now = time.time()

        log.info("Fetching property items...")
        yield self.model().callRemote('propertyItems')
        self.setPropertyItems(driver.next())

        driveLater(self.configCycleInterval * 60, self.startUpdateConfig)

        log.info("Getting threshold classes...")
        yield self.model().callRemote('getThresholdClasses')
        self.remote_updateThresholdClasses(driver.next())

        log.info("Checking for outdated configs...")
        current = [(k, v.lastChange) for k, v in self.proxies.items()]
        yield self.model().callRemote('getDeviceUpdates', current)

        devices = driver.next()
        if self.options.device:
            devices = [self.options.device]

        log.info("Fetching configs for %s", repr(devices)[0:800] + '...')
        yield self.model().callRemote('getDevices', devices)
        updatedDevices = driver.next()

        log.info("Fetching default RRDCreateCommand...")
        yield self.model().callRemote('getDefaultRRDCreateCommand')
        createCommand = driver.next()

        self.rrd = RRDUtil(createCommand, self.perfsnmpCycleInterval)

        log.info("Getting collector thresholds...")
        yield self.model().callRemote('getCollectorThresholds')
        self.rrdStats.config(self.options.monitor, self.name, driver.next(),
                             createCommand)

        log.info("Fetching SNMP status...")
        yield self.model().callRemote('getSnmpStatus', self.options.device)
        self.updateSnmpStatus(driver.next())

        log.info("Initiating incremental device load")
        if self.options.cycle:
            d = self.updateDeviceList(updatedDevices, devices)
            def report(result):
                """
                Twisted deferred errBack to check for errors
                """
                if result:
                    log.error("Error loading devices: %s", result)
            d.addBoth(report)
        else:
            yield self.updateDeviceList(updatedDevices, devices)
            driver.next()

        self.sendEvents(self.rrdStats.gauge('configTime',
                                            self.configCycleInterval * 60,
                                            time.time() - now))

    def updateDeviceList(self, responses, requested):
        """
        Update the config for devices
        """

        def fetchDevices(driver):
            """
            An iterable to go over the list of devices
            """
            deviceNames = Set()
            length = len(responses)
            log.debug("Fetching configs for %d devices", length)
            for devices in chunk(responses, DEVICE_LOAD_CHUNK_SIZE):
                log.debug("Fetching config for %s", devices)
                yield self.model().callRemote('getDeviceConfigs', devices)
                try:
                    for response in driver.next():
                        self.updateDeviceConfig(response)
                except Exception, ex:
                    log.warning("Error %s loading config for devices %s",
                                ex, devices)
                for d in devices:
                    deviceNames.add(d)
            log.debug("Finished fetching configs for %d devices", length)

            # remove devices we were asked about but got no config for
            doomed = Set(requested) - deviceNames
            if self.options.device:
                self.log.debug('Gathering performance data for %s ' %
                               self.options.device)
                doomed = Set(self.proxies.keys())
                doomed.discard(self.options.device)
            for name in doomed:
                self.log.info('Removing device %s' % name)
                if name in self.proxies:
                    del self.proxies[name]

                # remove the cached config pickle for the device as well
                config = self.pickleName(name)
                unlink(config)

            # warn about devices that share a management IP address
            ips = Set()
            for name, proxy in self.proxies.items():
                if proxy.snmpConnInfo.manageIp in ips:
                    log.warning("Warning: device %s has a duplicate address %s",
                                name, proxy.snmpConnInfo.manageIp)
                ips.add(proxy.snmpConnInfo.manageIp)
            self.log.info('Configured %d of %d devices',
                          len(deviceNames), len(self.proxies))
            yield defer.succeed(None)
        return drive(fetchDevices)

    def updateAgentProxy(self, deviceName, snmpConnInfo):
        """
        Create or update the SNMP agent proxy for a device

        @parameter deviceName: device name known by zenhub
        @type deviceName: string
        @parameter snmpConnInfo: object information passed from zenhub
        @type snmpConnInfo: class SnmpConnInfo from Products/ZenHub/services/PerformanceConfig.py
        @return: connection information from the proxy
        @rtype: SnmpConnInfo class
        """
        p = self.proxies.get(deviceName, None)
        if not p:
            p = snmpConnInfo.createSession(protocol=self.snmpPort.protocol,
                                           allowCache=True)
            p.oidMap = {}
            p.snmpStatus = SnmpStatus(0)
            p.singleOidMode = False
            p.lastChange = 0

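        # If the connection details changed, create a fresh session but carry
        # over the per-device state (OID map, SNMP status, single-OID mode,
        # last change time) from the old proxy.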
        if p.snmpConnInfo != snmpConnInfo:
            t = snmpConnInfo.createSession(protocol=self.snmpPort.protocol,
                                           allowCache=True)
            t.oidMap = p.oidMap
            t.snmpStatus = p.snmpStatus
            t.singleOidMode = p.singleOidMode
            t.lastChange = p.lastChange
            p = t

        return p

    def updateSnmpStatus(self, status):
        """
        Update the SNMP failure counts from the status database
        """
        countMap = dict(status)
        for name, proxy in self.proxies.items():
            proxy.snmpStatus.count = countMap.get(name, 0)

    def remote_deleteDevice(self, doomed):
        """
        Allows zenhub to delete a device from our configuration
        """
        self.log.debug("Async delete device %s" % doomed)
        if doomed in self.proxies:
            del self.proxies[doomed]

    def remote_updateDeviceConfig(self, snmpTargets):
        """
        Allows zenhub to update our device configuration
        """
        self.log.debug("Device updates from zenhub received")
        self.updateDeviceConfig(snmpTargets)

    def updateDeviceConfig(self, configs):
        """
        Examine the given device configuration and, if newer, update the
        device as well as its pickle file.
        If no SNMP proxy has been created for the device, create one.
        """
        self.log.debug("Received config for %s", configs.device)
        p = self.updateAgentProxy(configs.device, configs.connInfo)

        if self.options.cacheconfigs:
            p.lastChange = configs.lastChangeTime
            data = cPickle.dumps(configs)
            pickle_name = self.pickleName(configs.device)
            self.log.debug("Updating cached configs in pickle file %s" % pickle_name)
            write(pickle_name, data)

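        # Rebuild the OID map from the new config, reusing any existing
        # OidData entry so its attributes are simply refreshed; OIDs no
        # longer in the config are dropped.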
        oidMap, p.oidMap = p.oidMap, {}
        for name, oid, path, dsType, createCmd, minmax in configs.oids:
            createCmd = createCmd.strip()
            oid = str(oid).strip('.')

            if oid:
                oid = '.' + oid
                oid_status = oidMap.setdefault(oid, OidData())
                oid_status.update(name, path, dsType, createCmd, minmax)
                p.oidMap[oid] = oid_status

        self.proxies[configs.device] = p
        self.thresholds.updateForDevice(configs.device, configs.thresholds)

    def setUnresponsiveDevices(self, arg):
        """
        Remember all the unresponsive devices
        """
        if isinstance(arg, list):
            deviceList = arg
            self.log.debug('unresponsive devices: %r' % deviceList)
            self.unresponsiveDevices = Set(firsts(deviceList))
        else:
            self.log.error('Could not get unresponsive devices: %s', arg)
        self.readDevices()

    def readDevices(self, unused=None):
        """
        Periodically fetch the performance values from all known devices
        """
        if self.status:
            # devices that have not answered queries from this or earlier
            # cycles, keyed by the time their query was issued
            pending = self.status.getQueryAges()

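            # Give a device CYCLES_TO_WAIT_FOR_RESPONSE full cycles to answer
            # before giving up on the outstanding query; anything dropped from
            # the pending map becomes eligible to be queried again.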
            doneWaiting = []
            for device, age in pending.items():
                beenWaiting = time.time() - age
                if beenWaiting >= self.perfsnmpCycleInterval \
                        * CYCLES_TO_WAIT_FOR_RESPONSE:
                    self.log.error('No response from %s after %s cycles.'
                                   % (device, CYCLES_TO_WAIT_FOR_RESPONSE))
                    doneWaiting.append(device)
                else:
                    self.log.warning('Continuing to wait for response from'
                                     ' %s after %s seconds' % (device, beenWaiting))
            for device in doneWaiting:
                del pending[device]

            queued = self.status.inQueue()
            if queued:
                self.log.error('%s devices still queued at end of cycle and did'
                               ' not get queried.' % len(queued))
                self.log.debug('Devices not queried: %s' % ', '.join(queued))

            # if the previous cycle never finished, report its stats now
            if not self.status._stopTime:
                self.reportRate()
        else:
            pending = {}

        devicesToQuery = Set(self.proxies.keys())
        # skip devices the event system reports as unresponsive
        devicesToQuery -= self.unresponsiveDevices
        # and devices that still owe us an answer from an earlier cycle
        devicesToQuery -= Set(pending.keys())
        self.status = Status(self)
        d = self.status.start(devicesToQuery, pending)
        d.addCallback(self.reportRate)
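        # Prime the pump: query up to MAX_SNMP_REQUESTS devices in parallel.
        # As each device finishes, storeValues() pops the next device off the
        # queue, keeping the number of outstanding requests bounded.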
        for unused in range(MAX_SNMP_REQUESTS):
            if not len(self.status.inQueue()):
                break
            d = self.startReadDevice(self.status.popDevice())

            def printError(reason):
                """
                Twisted errBack to record a traceback and log messages
                """
                from StringIO import StringIO
                out = StringIO()
                reason.printTraceback(out)
                self.log.error(reason)

            d.addErrback(printError)

    def reportRate(self, *unused):
        """
        Finished reading all the devices, report stats and maybe stop
        """
        info = self.status.stats()
        oidsRequested, self.snmpOidsRequested = self.snmpOidsRequested, 0

        self.log.info('******** Cycle completed ********')
        self.log.info("Sent %d OID requests", oidsRequested)
        self.log.info('Queried %d devices' % (info['numSucceeded']
                      + info['numFailed'] + info['numInProcess']))
        self.log.info('   %s in queue still unqueried' % info['queueSize'])
        self.log.info('   Successes: %d  Failures: %d  Not reporting: %d' %
                      (info['numSucceeded'], info['numFailed'], info['numInProcess']))
        self.log.info('Waited on %d queries from previous cycles.' %
                      (info['numPrevSucceeded'] + info['numPrevFailed']
                       + info['numPrevInProcess']))
        self.log.info('   Successes: %d  Failures: %d  Not reporting: %d' %
                      (info['numPrevSucceeded'], info['numPrevFailed'],
                       info['numPrevInProcess']))
        self.log.info('Cycle lasted %.2f seconds' % info['age'])
        self.log.info('*********************************')

        cycle = self.perfsnmpCycleInterval
        self.sendEvents(
            self.rrdStats.gauge('success', cycle,
                                info['numSucceeded'] + info['numPrevSucceeded']) +
            self.rrdStats.gauge('failed', cycle,
                                info['numFailed'] + info['numPrevFailed']) +
            self.rrdStats.gauge('cycleTime', cycle, info['age']) +
            self.rrdStats.counter('dataPoints', cycle, self.rrd.dataPoints) +
            self.rrdStats.gauge('cyclePoints', cycle, self.rrd.endCycle())
            )

        self.checkOldFiles()

    def checkOldFiles(self):
        """
        Send an event showing whether we have old files or not
        """
        if not self.options.checkagingfiles:
            return
        self.oldFiles = Set(
            [f for f in self.oldFiles
             if os.path.exists(f) and self.oldCheck.test(f)]
            )
        if self.oldFiles:
            root = performancePath('')
            filenames = [f[len(root):] for f in self.oldFiles]
            message = 'RRD files not updated: ' + ' '.join(filenames)
            self.sendEvent(dict(
                dedupid="%s|%s" % (self.options.monitor, 'RRD files too old'),
                severity=Critical,
                device=self.options.monitor,
                eventClass=Status_Perf,
                summary=message))
        else:
            self.sendEvent(dict(
                severity=Clear,
                device=self.options.monitor,
                eventClass=Status_Perf,
                summary='All RRD files have been recently updated'))

    def startReadDevice(self, deviceName):
        """
        Initiate a request (or several) to read the performance data
        from a device
        """
        proxy = self.proxies.get(deviceName, None)
        if proxy is None:
            return

        # in single-OID mode ask for one OID per request
        n = int(proxy.snmpConnInfo.zMaxOIDPerRequest)
        if proxy.singleOidMode:
            n = 1

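        # getLater() issues a single SNMP GET for one chunk of OIDs; Chain
        # feeds it the device's OIDs in sorted, n-sized chunks and collects
        # the (success, result) pairs that storeValues() will consume.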
        def getLater(oids):
            """
            Return the result of proxy.get(oids, timeout, tries)
            """
            return checkException(self.log,
                                  proxy.get,
                                  oids,
                                  proxy.snmpConnInfo.zSnmpTimeout,
                                  proxy.snmpConnInfo.zSnmpTries)

        proxy.open()
        chain = Chain(getLater, iter(chunk(sorted(proxy.oidMap.keys()), n)))
        d = chain.run()

        def closer(arg, proxy):
            """
            Close the proxy
            """
            try:
                proxy.close()
            except Exception, ex:
                self.log.exception(ex)
                raise ex

            return arg

        d.addCallback(closer, proxy)
        d.addCallback(self.storeValues, deviceName)

        self.snmpOidsRequested += len(proxy.oidMap)

        return d

    def badOid(self, deviceName, oid):
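        """
        Stop collecting an OID that the device cannot answer usefully.

        Minimal sketch inferred from the call sites in storeValues(): log
        the problem and drop the OID from the device's collection map.
        """
        proxy = self.proxies.get(deviceName, None)
        if proxy is None:
            return
        name = proxy.oidMap[oid].name
        self.log.warn("Device %s returned no usable value for %s (%s); "
                      "removing it from collection", deviceName, oid, name)
        del proxy.oidMap[oid]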

    def storeValues(self, updates, deviceName):
        """
        Decode responses from devices and store the elements in RRD files
        """
        proxy = self.proxies.get(deviceName, None)
        if proxy is None:
            self.status.record(deviceName, True)
            return

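        # A successful response with no values usually means the agent could
        # not cope with a multi-OID request: drop back to asking for one OID
        # at a time and retry the whole device.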
        for success, update in updates:
            if success and not update and not proxy.singleOidMode:
                proxy.singleOidMode = True
                self.log.warn('Error collecting data on %s -- '
                              'retrying in single-OID mode', deviceName)
                self.startReadDevice(deviceName)
                return

            if not success:
                if isinstance(update, failure.Failure) and \
                   isinstance(update.value, error.TimeoutError):
                    self.log.debug("Device %s timed out" % deviceName)
                else:
                    self.log.warning('Failed to collect on %s (%s: %s)',
                                     deviceName,
                                     update.__class__,
                                     update)

        successCount = sum(firsts(updates))
        oids = []
        for success, update in updates:
            if success:
                for oid, value in update.items():
                    # an empty or missing value is as good as a failure
                    if value == '' or value is None:
                        self.badOid(deviceName, oid)
                    else:
                        self.storeRRD(deviceName, oid, value)
                    oids.append(oid)

        if successCount == len(updates) and proxy.singleOidMode:
            # in single-OID mode each OID comes back individually, so any OID
            # we asked for but never received is bad
            for doomed in Set(proxy.oidMap.keys()) - Set(oids):
                self.badOid(deviceName, doomed)

        if self.status.inQueue():
            self.startReadDevice(self.status.popDevice())

        if successCount and len(updates) > 0:
            successPercent = successCount * 100 / len(updates)
            if successPercent not in (0, 100):
                self.log.debug("Successful request ratio for %s is %2d%%",
                               deviceName,
                               successPercent)
        success = True
        if updates:
            success = successCount > 0
        self.status.record(deviceName, success)
        proxy.snmpStatus.updateStatus(deviceName, success, self.sendEvent)

    def storeRRD(self, device, oid, value):
        """
        Store a value into an RRD file

        @param device: remote device name
        @type device: string
        @param oid: SNMP OID used as our performance metric
        @type oid: string
        @param value: data to be stored
        @type value: number
        """
        oidData = self.proxies[device].oidMap.get(oid, None)
        if not oidData: return

        raw_value = value
        min, max = oidData.minmax
        try:
            value = self.rrd.save(oidData.path,
                                  value,
                                  oidData.dataStorageType,
                                  oidData.rrdCreateCommand,
                                  min=min, max=max)
        except Exception, ex:
            summary = "Unable to save data for OID %s in RRD %s" % \
                      (oid, oidData.path)
            self.log.critical(summary)

            message = """Data was value= %s, type=%s, min=%s, max=%s
RRD create command: %s""" % \
                      (value, oidData.dataStorageType, min, max,
                       oidData.rrdCreateCommand)
            self.log.critical(message)
            self.log.exception(ex)

            import traceback
            trace_info = traceback.format_exc()

            evid = self.sendEvent(dict(
                dedupid="%s|%s" % (self.options.monitor, 'RRD write failure'),
                severity=Critical,
                device=self.options.monitor,
                eventClass=Status_Perf,
                component="RRD",
                oid=oid,
                path=oidData.path,
                message=message,
                traceback=trace_info,
                summary=summary))

            # nothing more to do for this value
            return

        if self.options.showdeviceresults:
            self.log.info("%s %s results: raw=%s RRD-converted=%s"
                          " type=%s, min=%s, max=%s" % (
                          device, oid, raw_value, value,
                          oidData.dataStorageType, min, max))

        for ev in self.thresholds.check(oidData.path, time.time(), value):
            eventKey = oidData.path.rsplit('/')[-1]
            if ev.has_key('eventKey'):
                ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey'])
            else:
                ev['eventKey'] = eventKey
            self.sendThresholdEvent(**ev)


    def buildOptions(self):
        """
        Build a list of command-line options
        """
        SnmpDaemon.buildOptions(self)
        self.parser.add_option('--checkAgingFiles',
                               dest='checkagingfiles',
                               action="store_true",
                               default=False,
                               help="Send events when RRD files are not being updated regularly")

        self.parser.add_option('--cacheconfigs',
                               dest='cacheconfigs',
                               action="store_true",
                               default=False,
                               help="To improve startup times, cache configuration received from zenhub")

        self.parser.add_option('--showdeviceresults',
                               dest='showdeviceresults',
                               action="store_true",
                               default=False,
                               help="Show the raw RRD values. For debugging purposes only.")

if __name__ == '__main__':

    from Products.ZenRRD.zenperfsnmp import zenperfsnmp

    zpf = zenperfsnmp()
    zpf.run()