1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 __doc__="""zenperfsnmp
16
17 Gets SNMP performance data and stores it in RRD files.
18
19 """
20
21 import os
22 import time
23 import logging
24 log = logging.getLogger("zen.zenperfsnmp")
25
26 import copy
27 from sets import Set
28 import cPickle
29
30 from twisted.internet import reactor, defer, error
31 from twisted.python import failure
32
33 import Globals
34 from Products.ZenUtils.Utils import unused
35 from Products.ZenUtils.Chain import Chain
36 from Products.ZenUtils.Driver import drive, driveLater
37 from Products.ZenModel.PerformanceConf import performancePath
38 from Products.ZenEvents import Event
39 from Products.ZenEvents.ZenEventClasses \
40 import Perf_Snmp, Status_Snmp, Status_Perf
41 from Products.ZenEvents.ZenEventClasses import Critical, Clear
42
43 from Products.ZenRRD.RRDUtil import RRDUtil
44 from SnmpDaemon import SnmpDaemon
45
46 from FileCleanup import FileCleanup
47
48
49 from Products.ZenHub.services.PerformanceConfig import PerformanceConfig
50 unused(PerformanceConfig)
51
52 MAX_OIDS_PER_REQUEST = 40
53 MAX_SNMP_REQUESTS = 20
54 DEVICE_LOAD_CHUNK_SIZE = 20
55 CYCLES_TO_WAIT_FOR_RESPONSE = 2
56
58 """
59 Wrapper around makedirs that sanity checks before running
60 """
61 if os.path.exists(dir):
62 return
63
64 try:
65 os.makedirs(dir, 0750)
66 except Exception, ex:
67 log.critical( "Unable to create directories for %s because %s" % ( dir, ex ) )
68
69
71 """
72 Wrapper around the standard function to open a file and read its contents
73 """
74 if os.path.exists(fname):
75 fp = file(fname, 'rb')
76 try:
77 return fp.read()
78 finally:
79 fp.close()
80 return ''
81
82
84 """
85 Wrapper around the standard function to open a file and write data
86 """
87 makeDirs(os.path.dirname(fname))
88
89 try:
90 fp = open(fname, 'wb')
91 try:
92 fp.write(data)
93 finally:
94 fp.close()
95
96 except Exception, ex:
97 log.critical( "Unable to write data to %s because %s" % ( fname, ex ) )
98
99
101 """
102 Wrapper around the standard function to delete a file
103 """
104 if os.path.exists(fname):
105 os.unlink(fname)
106
108 """
109 Break lst into n-sized chunks
110 """
111 return [lst[i:i+n] for i in range(0, len(lst), n)]
112
try:
    # Interpreters from Python 2.4 on already provide sorted(); re-binding it
    # simply makes the name a module global in both branches.
    sorted = sorted
except NameError:
    def sorted(lst, *args, **kw):
        """
        Keep things sane in a pre-python 2.4 environment

        Mirrors the built-in: accepts any iterable and returns a NEW sorted
        list, leaving the argument untouched.  Any extra positional or
        keyword arguments are handed to list.sort().
        """
        # Copy first: sorting the caller's object in place would mutate it
        # (the built-in never does) and would fail for tuples, sets and
        # iterators, which have no sort() method.
        result = list(lst)
        result.sort(*args, **kw)
        return result
122
124 """
125 The first element of every item in a sequence
126 """
127 return [item[0] for item in lst]
128
def checkException(alog, function, *args, **kw):
    """
    Execute the function with arguments and keywords.
    If there is an exception, log it using the given
    logging function 'alog', then let it propagate.

    @param alog: logger-like object providing an exception() method
    @param function: callable to invoke
    @param args: positional arguments passed through to function
    @param kw: keyword arguments passed through to function
    @return: whatever function returns
    @raise Exception: the exception raised by function, unchanged
    """
    try:
        return function(*args, **kw)
    except Exception:
        # sys.exc_info() hands back the active exception without relying on
        # a version-specific "except ... , name" / "except ... as name" form.
        import sys
        alog.exception(sys.exc_info()[1])
        # A bare raise re-raises the active exception with its original
        # traceback; "raise ex" would restart the traceback at this line and
        # hide where the failure really happened.
        raise
140
141
142
143 from twisted.spread import pb
154
155 pb.setUnjellyableForClass(SnmpConfig, SnmpConfig)
156
157
159 """
160 Keep track of the status of many parallel requests
161 """
162
164 """
165 Initializer
166 """
167 self.daemon = daemon
168 self.reset()
169
171 """
172 Reset instance variables to intial values
173 """
174
175
176 self._numSucceeded = 0
177
178
179
180 self._numPrevSucceeded = 0
181
182
183 self._numFailed = 0
184
185
186
187 self._numPrevFailed = 0
188
189
190 self._startTime = 0
191
192
193 self._stopTime = 0
194
195
196 self._deferred = defer.Deferred()
197
198
199 self._devicesToQueryThisCycle = Set()
200
201
202
203 self._prevQueriesAndAges = {}
204
205
206
207
208 self._queue = Set()
209
210
211
212 self._reported = Set()
213
214
def start(self, devicesToQuery, prevQueriesAndAges):
    """
    Begin a new cycle: clear the counters, stamp the start time and work
    out which devices are to be queried.

    @type devicesToQuery: iterable
    @param devicesToQuery: names of devices to poll
    @type prevQueriesAndAges: dict
    @param prevQueriesAndAges: devices with outstanding requests
    @return: deferred
    """
    self.reset()
    self._startTime = time.time()

    # Devices that still owe an answer from an earlier cycle are tracked
    # through prevQueriesAndAges and are left out of this cycle's work.
    requested = Set(devicesToQuery)
    outstanding = Set(prevQueriesAndAges.keys())
    thisCycle = requested - outstanding

    self._devicesToQueryThisCycle = thisCycle
    self._prevQueriesAndAges = prevQueriesAndAges
    # The queue is a separate copy so popping from it does not shrink the
    # record of what this cycle set out to query.
    self._queue = copy.copy(thisCycle)

    self._checkFinished()
    return self._deferred
236
237
def record(self, name, success):
    """
    Note the outcome of one device's query and bump the matching counter.

    A device is counted at most once; a repeated report is logged as an
    error and otherwise ignored.  A report from a device queried this
    cycle also triggers the end-of-cycle check.

    @type name: string
    @param name: name of device reporting results
    @type success: boolean
    @param success: True if query succeeded, False otherwise.
    """
    if name in self._reported:
        log.error("Device %s is reporting more than once", name)
        return
    self._reported.add(name)

    fromThisCycle = name in self._devicesToQueryThisCycle
    if not fromThisCycle and name not in self._prevQueriesAndAges:
        # Neither queried this cycle nor left over from an earlier one.
        log.debug('Unrecognized device reporting: %s' % name)
        return

    if fromThisCycle:
        if success:
            self._numSucceeded += 1
        else:
            self._numFailed += 1
        self._checkFinished()
        return

    # Late answer to a query issued in a previous cycle.
    if success:
        self._numPrevSucceeded += 1
    else:
        self._numPrevFailed += 1
264
266 """
267 Determine the stopping point and log our current stats
268 """
269 if self.finished():
270 self._stopTime = time.time()
271 if not self._deferred.called:
272 self._deferred.callback(self)
273 self.daemon.heartbeat()
274 info = self.stats()
275 log.info(
276 'success:%d ' % info['numSucceeded'] +
277 'fail:%d ' % info['numFailed'] +
278 'pending:%d ' % info['numInProcess'] +
279 'todo:%d ' % info['queueSize'])
280
281
283 """
284 Determine if we have finished, disregarding devices that were queried
285 in a previous cycle and still haven't reported back.
286 """
287 return len(self.inQueue()) == 0 and len(self.inProcess()) == 0
288
290 """
291 Return a dictionary with stats for this cycle:
292 numSucceeded - queries made this cycle and reported back success
293 numPrevSucceeded - queries from a prev cycle reported back success
294 numFailed - queries made this cycle reported back failure
295 numPrevFailed - queries made prev cycle reported back failure
296 startTime - timestamp when this cycle started
297 stopTime - timestamp when this cycle stopped
298 age - time cycle took to run or current age (if still running)
299 queueSize - num of devices not queried yet
300 numInProcess - num queried this cycle not reported back yet
301 numPrevInProcess - num queried prev cycle still not reported back
302 numReported - number reported back from this or previous cycles
303 """
304 return dict(
305 numSucceeded = self._numSucceeded,
306 numPrevSucceeded = self._numPrevSucceeded,
307 numFailed = self._numFailed,
308 numPrevFailed = self._numPrevFailed,
309 startTime = self._startTime,
310 stopTime = self._stopTime,
311 age = self._stopTime and (self._stopTime - self._startTime) \
312 or (time.time() - self._startTime),
313 queueSize = len(self._queue),
314 numInProcess = len(self.inProcess()),
315 numPrevInProcess = len(self.prevInProcess()),
316 numReported = len(self._reported)
317 )
318
320 """
321 Return the name of the devices that have been queried this cycle
322 but from whom no response has been received.
323 """
324 return self._devicesToQueryThisCycle - self._reported - self._queue
325
327 """
328 Return the names of the devices that were queried prior to this cycle
329 and have not yet reported.
330 """
331 return Set(self._prevQueriesAndAges.keys()) - self._reported
332
334 """
335 Return the names of the devices that have yet to be queried.
336 """
337 return self._queue
338
340 """
341 Pop a device to be queried from the queue
342 """
343 return self._queue.pop()
344
346 """
347 Return a dictionary with the device and age of each query from this
348 or previous cycles that has not yet responeded.
349 """
350 waiting = dict([(d,a) for (d, a) in self._prevQueriesAndAges.items()
351 if d not in self._reported])
352 waiting.update(dict([(d, self._startTime)
353 for d in self.inProcess()]))
354 return waiting
355
356
358 """
359 Track and report SNMP status failures
360 """
361
362 snmpStatusEvent = {'eventClass': Status_Snmp,
363 'component': 'snmp',
364 'eventGroup': 'SnmpTest'}
365
366
368 """
369 Initializer
370 """
371 self.count = snmpState
372
373
393
394
395
397 - def update(self, name, path, dataStorageType, rrdCreateCommand, minmax):
406
408 """
409 Periodically query all devices for SNMP values to archive in RRD files
410 """
411
412
413 maxRrdFileAge = 30 * (24*60*60)
414 perfsnmpConfigInterval = 20*60
415 perfsnmpCycleInterval = 5*60
416 properties = SnmpDaemon.properties + ('perfsnmpCycleInterval',)
417 initialServices = SnmpDaemon.initialServices + ['SnmpPerfConfig']
418
420 """
421 Create any base performance directories (if necessary),
422 load cached configuration data and clean up any old RRD files
423 (if specified by --checkAgingFiles)
424 """
425 SnmpDaemon.__init__(self, 'zenperfsnmp', noopts)
426 self.status = None
427 self.proxies = {}
428 self.unresponsiveDevices = Set()
429 self.snmpOidsRequested = 0
430
431 self.log.info( "Initializing daemon..." )
432
433 perfRoot = performancePath('')
434 makeDirs(perfRoot)
435
436 if self.options.cacheconfigs:
437 self.loadConfigs()
438
439 self.oldFiles = Set()
440
441
442 if self.options.checkagingfiles:
443 self.oldCheck = FileCleanup(perfRoot, '.*\\.rrd$',
444 24 * 60 * 60,
445 frequency=60)
446 self.oldCheck.process = self.reportOldFile
447 self.oldCheck.start()
448
449
450 self.fileCleanup = FileCleanup(perfRoot, '.*\\.rrd$',
451 self.maxRrdFileAge,
452 frequency=90*60)
453 self.fileCleanup.process = self.cleanup
454 self.fileCleanup.start()
455
456
458 """
459 Return the path to the pickle file for a device
460 """
461 return performancePath('Devices/%s/%s-config.pickle' % (id, self.options.monitor))
462
463
464
466 """
467 Read cached configuration values from pickle files at startup.
468
469 NB: We cache in pickles to get a full collect cycle, because
470 loading the initial config can take several minutes.
471 """
472 self.log.info( "Gathering cached configuration information" )
473
474 base = performancePath('Devices')
475 makeDirs(base)
476 root, ds, fs = os.walk(base).next()
477 for d in ds:
478 pickle_name= self.pickleName(d)
479 config = read( pickle_name )
480 if config:
481 try:
482 self.log.debug( "Reading cached config info from pickle file %s" % pickle_name )
483 data= cPickle.loads(config)
484 self.updateDeviceConfig( data )
485
486 except Exception, ex:
487 self.log.warn( "Received %s while loading cached configs in %s -- ignoring" % (ex, pickle_name ) )
488 try:
489 os.unlink( pickle_name )
490 except Exception, ex:
491 self.log.warn( "Unable to delete corrupted pickle file %s because %s" % ( pickle_name, ex ) )
492
493
494
496 """
497 Delete an old RRD file
498 """
499 self.log.warning("Deleting old RRD file: %s", fullPath)
500 os.unlink(fullPath)
501 self.oldFiles.discard(fullPath)
502
503
505 """
506 Add an RRD file to the list of files to be removed
507 """
508 self.oldFiles.add(fullPath)
509
511 """
512 Gather the list of devices from zenhub, update all devices config
513 in the list of devices, and remove any devices that we know about,
514 but zenhub doesn't know about.
515
516 NB: This is callable from within zenhub.
517 """
518 SnmpDaemon.remote_updateDeviceList(self, devices)
519
520 survivors = []
521 doomed = Set(self.proxies.keys())
522 for device, lastChange in devices:
523 doomed.discard(device)
524 proxy = self.proxies.get(device)
525 if not proxy or proxy.lastChange < lastChange:
526 survivors.append(device)
527
528 log.info("Deleting %s", doomed)
529 for d in doomed:
530 del self.proxies[d]
531
532 if survivors:
533 log.info("Fetching configs: %s", survivors)
534 d = self.model().callRemote('getDevices', survivors)
535 d.addCallback(self.updateDeviceList, survivors)
536 d.addErrback(self.error)
537
538
539
541 """
542 Periodically ask the Zope server for basic configuration data.
543 """
544
545 now = time.time()
546
547 log.info("Fetching property items...")
548 yield self.model().callRemote('propertyItems')
549 self.setPropertyItems(driver.next())
550
551 driveLater(self.configCycleInterval * 60, self.startUpdateConfig)
552
553 log.info("Getting threshold classes...")
554 yield self.model().callRemote('getThresholdClasses')
555 self.remote_updateThresholdClasses(driver.next())
556
557 log.info("Checking for outdated configs...")
558 current = [(k, v.lastChange) for k, v in self.proxies.items()]
559 yield self.model().callRemote('getDeviceUpdates', current)
560
561 devices = driver.next()
562 if self.options.device:
563 devices = [self.options.device]
564
565 log.info("Fetching configs for %s", repr(devices)[0:800]+'...')
566 yield self.model().callRemote('getDevices', devices)
567 updatedDevices = driver.next()
568
569 log.info("Fetching default RRDCreateCommand...")
570 yield self.model().callRemote('getDefaultRRDCreateCommand')
571 createCommand = driver.next()
572
573 self.rrd = RRDUtil(createCommand, self.perfsnmpCycleInterval)
574
575 log.info( "Getting collector thresholds..." )
576 yield self.model().callRemote('getCollectorThresholds')
577 self.rrdStats.config(self.options.monitor, self.name, driver.next(),
578 createCommand)
579
580 log.info("Fetching SNMP status...")
581 yield self.model().callRemote('getSnmpStatus', self.options.device)
582 self.updateSnmpStatus(driver.next())
583
584
585 log.info("Initiating incremental device load")
586 d = self.updateDeviceList(updatedDevices, devices)
def report(result):
    """
    Callback for both outcomes of the device load: log anything truthy
    that comes back as an error.
    """
    if not result:
        return
    log.error("Error loading devices: %s", result)
593 d.addBoth(report)
594 self.sendEvents(self.rrdStats.gauge('configTime',
595 self.configCycleInterval * 60,
596 time.time() - now))
597
598
600 """
601 Update the config for devices
602 """
603
604 def fetchDevices(driver):
605 """
606 An iterable to go over the list of devices
607 """
608 deviceNames = Set()
609 length = len(responses)
610 log.debug("Fetching configs for %d devices", length)
611 for devices in chunk(responses, DEVICE_LOAD_CHUNK_SIZE):
612 log.debug("Fetching config for %s", devices)
613 yield self.model().callRemote('getDeviceConfigs', devices)
614 try:
615 for response in driver.next():
616 self.updateDeviceConfig(response)
617 except Exception, ex:
618 log.warning("Error loading config for devices %s" % devices)
619 for d in devices:
620 deviceNames.add(d)
621 log.debug("Finished fetching configs for %d devices", length)
622
623
624 doomed = Set(requested) - deviceNames
625 if self.options.device:
626 self.log.debug('Gathering performance data for %s ' %
627 self.options.device)
628 doomed = Set(self.proxies.keys())
629 doomed.discard(self.options.device)
630 for name in doomed:
631 self.log.info('Removing device %s' % name)
632 if name in self.proxies:
633 del self.proxies[name]
634
635
636 config = self.pickleName(name)
637 unlink(config)
638
639
640 ips = Set()
641 for name, proxy in self.proxies.items():
642 if proxy.snmpConnInfo.manageIp in ips:
643 log.warning("Warning: device %s has a duplicate address %s",
644 name, proxy.snmpConnInfo.manageIp)
645 ips.add(proxy.snmpConnInfo.manageIp)
646 self.log.info('Configured %d of %d devices',
647 len(deviceNames), len(self.proxies))
648 yield defer.succeed(None)
649 return drive(fetchDevices)
650
651
652
654 """
655 Create or update proxy
656
657 @parameter deviceName: device name known by zenhub
658 @type deviceName: string
659 @parameter snmpConnInfo: object information passed from zenhub
660 @type snmpConnInfo: class SnmpConnInfo from Products/ZenHub/services/PerformanceConfig.py
661 @return: connection information from the proxy
662 @rtype: SnmpConnInfo class
663 """
664 p = self.proxies.get(deviceName, None)
665 if not p:
666 p = snmpConnInfo.createSession(protocol=self.snmpPort.protocol,
667 allowCache=True)
668 p.oidMap = {}
669 p.snmpStatus = SnmpStatus(0)
670 p.singleOidMode = False
671 p.lastChange = 0
672
673 if p.snmpConnInfo != snmpConnInfo:
674 t = snmpConnInfo.createSession(protocol=self.snmpPort.protocol,
675 allowCache=True)
676 t.oidMap = p.oidMap
677 t.snmpStatus = p.snmpStatus
678 t.singleOidMode = p.singleOidMode
679 t.lastChange = p.lastChange
680 p = t
681
682 return p
683
684
685
693
694
696 """
697 Allows zenhub to delete a device from our configuration
698 """
699 self.log.debug("Async delete device %s" % doomed)
700 if doomed in self.proxies:
701 del self.proxies[doomed]
702
703
705 """
706 Allows zenhub to update our device configuration
707 """
708 self.log.debug("Device updates from zenhub received")
709 self.updateDeviceConfig(snmpTargets)
710
711
712
714 """
715 Examine the given device configuration, and if newer update the device
716 as well as its pickle file.
717 If no SNMP proxy created for the device, create one.
718 """
719
720 self.log.debug("Received config for %s", configs.device)
721 p = self.updateAgentProxy(configs.device, configs.connInfo)
722
723 if self.options.cacheconfigs:
724 p.lastChange = configs.lastChangeTime
725 data= cPickle.dumps(configs)
726 pickle_name= self.pickleName(configs.device)
727 self.log.debug( "Updating cached configs in pickle file %s" % pickle_name )
728 write(pickle_name, data)
729
730
731 oidMap, p.oidMap = p.oidMap, {}
732 for name, oid, path, dsType, createCmd, minmax in configs.oids:
733 createCmd = createCmd.strip()
734 oid = str(oid).strip('.')
735
736 if oid:
737 oid = '.' + oid
738 oid_status = oidMap.setdefault(oid, OidData())
739 oid_status.update(name, path, dsType, createCmd, minmax)
740 p.oidMap[oid] = oid_status
741
742 self.proxies[configs.device] = p
743 self.thresholds.updateForDevice(configs.device, configs.thresholds)
744
745
746
758
759
761 """
762 Remember all the unresponsive devices
763 """
764 if isinstance(arg, list):
765 deviceList = arg
766 self.log.debug('unresponsive devices: %r' % deviceList)
767 self.unresponsiveDevices = Set(firsts(deviceList))
768 else:
769 self.log.error('Could not get unresponsive devices: %s', arg)
770 self.readDevices()
771
772
774 """
775 Periodically fetch the performance values from all known devices
776 """
777
778 if self.status:
779
780
781 pending = self.status.getQueryAges()
782
783
784 doneWaiting = []
785 for device, age in pending.items():
786 beenWaiting = time.time() - age
787 if beenWaiting >= self.perfsnmpCycleInterval \
788 * CYCLES_TO_WAIT_FOR_RESPONSE:
789 self.log.error('No response from %s after %s cycles.'
790 % (device, CYCLES_TO_WAIT_FOR_RESPONSE))
791 doneWaiting.append(device)
792 else:
793 self.log.warning('Continuing to wait for response from'
794 ' %s after %s seconds' % (device, beenWaiting))
795 for device in doneWaiting:
796 del pending[device]
797
798
799 queued = self.status.inQueue()
800 if queued:
801 self.log.error('%s devices still queued at end of cycle and did'
802 ' not get queried.' % len(queued))
803 self.log.debug('Devices not queried: %s' % ', '.join(queued))
804
805
806
807
808 if not self.status._stopTime:
809 self.reportRate()
810 else:
811 pending = {}
812
813 devicesToQuery = Set(self.proxies.keys())
814
815 devicesToQuery -= self.unresponsiveDevices
816
817 devicesToQuery -= Set(pending.keys())
818 self.status = Status(self)
819 d = self.status.start(devicesToQuery, pending)
820 d.addCallback(self.reportRate)
821 for unused in range(MAX_SNMP_REQUESTS):
822 if not len(self.status.inQueue()):
823 break
824 d = self.startReadDevice(self.status.popDevice())
825
def printError(reason):
    """
    Twisted errBack to record a traceback and log messages

    @param reason: object handed to the errback; it must provide
                   printTraceback(file)
    """
    from StringIO import StringIO
    out = StringIO()
    reason.printTraceback(out)
    self.log.error(reason)
    # The rendered traceback used to be built and then dropped; log it so
    # the failure can actually be traced.
    self.log.error(out.getvalue())
834
835 d.addErrback(printError)
836
837
839 """
840 Finished reading all the devices, report stats and maybe stop
841 """
842 info = self.status.stats()
843 oidsRequested, self.snmpOidsRequested = self.snmpOidsRequested, 0
844
845 self.log.info('******** Cycle completed ********')
846 self.log.info("Sent %d OID requests", oidsRequested)
847 self.log.info('Queried %d devices' % (info['numSucceeded'] \
848 + info['numFailed'] + info['numInProcess']))
849 self.log.info(' %s in queue still unqueried' % info['queueSize'])
850 self.log.info(' Successes: %d Failures: %d Not reporting: %d' %
851 (info['numSucceeded'], info['numFailed'], info['numInProcess']))
852 self.log.info('Waited on %d queries from previous cycles.' %
853 (info['numPrevSucceeded'] + info['numPrevFailed'] \
854 + info['numPrevInProcess']))
855 self.log.info(' Successes: %d Failures: %d Not reporting: %d' %
856 (info['numPrevSucceeded'], info['numPrevFailed'],
857 info['numPrevInProcess']))
858 self.log.info('Cycle lasted %.2f seconds' % info['age'])
859 self.log.info('*********************************')
860
861 cycle = self.perfsnmpCycleInterval
862 self.sendEvents(
863 self.rrdStats.gauge('success', cycle,
864 info['numSucceeded'] + info['numPrevSucceeded']) +
865 self.rrdStats.gauge('failed', cycle,
866 info['numFailed'] + info['numPrevFailed']) +
867 self.rrdStats.gauge('cycleTime', cycle, info['age']) +
868 self.rrdStats.counter('dataPoints', cycle, self.rrd.dataPoints) +
869 self.rrdStats.gauge('cyclePoints', cycle, self.rrd.endCycle())
870 )
871
872 self.checkOldFiles()
873
def checkOldFiles(self):
    """
    Send an event showing whether we have old files or not

    Does nothing unless the checkagingfiles option is set.  The remembered
    set of old files is first narrowed to those that still exist and still
    pass self.oldCheck.test(); a Critical Status_Perf event listing them is
    sent if any remain, otherwise a Clear event is sent.
    """
    if not self.options.checkagingfiles:
        return
    self.oldFiles = Set(
        [f for f in self.oldFiles
         if os.path.exists(f) and self.oldCheck.test(f)]
        )
    if self.oldFiles:
        root = performancePath('')

        def relative(path):
            """
            Return path without the leading performance root
            """
            # str.lstrip(root) treats root as a SET of characters and keeps
            # stripping while the next character is in that set, which can
            # eat the start of the relative path.  Remove the prefix itself.
            if path.startswith(root):
                path = path[len(root):]
            return path.lstrip(os.sep)

        filenames = [relative(f) for f in self.oldFiles]
        message = 'RRD files not updated: ' + ' '.join(filenames)
        self.sendEvent(dict(
            dedupid="%s|%s" % (self.options.monitor, 'RRD files too old'),
            severity=Critical,
            device=self.options.monitor,
            eventClass=Status_Perf,
            summary=message))
    else:
        self.sendEvent(dict(
            severity=Clear,
            device=self.options.monitor,
            eventClass=Status_Perf,
            summary='All RRD files have been recently updated'))
900
901
903 """
904 Initiate a request (or several) to read the performance data
905 from a device
906 """
907 proxy = self.proxies.get(deviceName, None)
908 if proxy is None:
909 return
910
911
912
913 n = int(proxy.snmpConnInfo.zMaxOIDPerRequest)
914 if proxy.singleOidMode:
915 n = 1
916
def getLater(oids):
    """
    Ask the proxy for one batch of OIDs, using the device's configured
    timeout and number of tries, and return what proxy.get() returns.
    Any exception raised by the call is logged before it propagates.
    """
    connInfo = proxy.snmpConnInfo
    timeout = connInfo.zSnmpTimeout
    tries = connInfo.zSnmpTries
    return checkException(self.log, proxy.get, oids, timeout, tries)
926
927
928 proxy.open()
929 chain = Chain(getLater, iter(chunk(sorted(proxy.oidMap.keys()), n)))
930 d = chain.run()
931
932 def closer(arg, proxy):
933 """
934 Close the proxy
935 """
936 try:
937 proxy.close()
938 except Exception, ex:
939 self.log.exception(ex)
940 raise ex
941
942 return arg
943
944 d.addCallback(closer, proxy)
945 d.addCallback(self.storeValues, deviceName)
946
947
948 self.snmpOidsRequested += len(proxy.oidMap)
949
950 return d
951
952
953 - def badOid(self, deviceName, oid):
974
975
977 """
978 Decode responses from devices and store the elements in RRD files
979 """
980
981 proxy = self.proxies.get(deviceName, None)
982 if proxy is None:
983 self.status.record(deviceName, True)
984 return
985
986
987 for success, update in updates:
988
989 if success and not update and not proxy.singleOidMode:
990 proxy.singleOidMode = True
991 self.log.warn('Error collecting data on %s -- retrying in single-OID mode',
992 deviceName)
993 self.startReadDevice(deviceName)
994 return
995
996 if not success:
997 if isinstance(update, failure.Failure) and \
998 isinstance(update.value, error.TimeoutError):
999 self.log.debug("Device %s timed out" % deviceName)
1000 else:
1001 self.log.warning('Failed to collect on %s (%s: %s)',
1002 deviceName,
1003 update.__class__,
1004 update)
1005
1006 successCount = sum(firsts(updates))
1007 oids = []
1008 for success, update in updates:
1009 if success:
1010 for oid, value in update.items():
1011
1012 if value == '' or value is None:
1013 self.badOid(deviceName, oid)
1014 else:
1015 self.storeRRD(deviceName, oid, value)
1016 oids.append(oid)
1017
1018 if successCount == len(updates) and proxy.singleOidMode:
1019
1020 for doomed in Set(proxy.oidMap.keys()) - Set(oids):
1021 self.badOid(deviceName, doomed)
1022
1023 if self.status.inQueue():
1024 self.startReadDevice(self.status.popDevice())
1025
1026 if successCount and len(updates) > 0:
1027 successPercent = successCount * 100 / len(updates)
1028 if successPercent not in (0, 100):
1029 self.log.debug("Successful request ratio for %s is %2d%%",
1030 deviceName,
1031 successPercent)
1032 success = True
1033 if updates:
1034 success = successCount > 0
1035 self.status.record(deviceName, success)
1036 proxy.snmpStatus.updateStatus(deviceName, success, self.sendEvent)
1037
1038
def storeRRD(self, device, oid, value):
    """
    Store a value into an RRD file

    Looks up the OID's record on the device's proxy, saves the value
    through self.rrd and then runs the threshold checks against what the
    save returned.  An OID with no record for the device is ignored.  If
    the save raises, the failure is logged, a Critical Status_Perf event
    is sent and no thresholds are evaluated.

    @param device: remote device name
    @type device: string
    @param oid: SNMP OID used as our performance metric
    @type oid: string
    @param value: data to be stored
    @type value: number
    """
    # self.proxies[device] raises KeyError when the device has no proxy.
    oidData = self.proxies[device].oidMap.get(oid, None)
    if not oidData: return

    # Keep the collected value: 'value' is rebound below to whatever
    # self.rrd.save() returns.
    raw_value = value
    min, max = oidData.minmax
    try:
        value = self.rrd.save(oidData.path,
                              value,
                              oidData.dataStorageType,
                              oidData.rrdCreateCommand,
                              min=min, max=max)
    except Exception, ex:
        summary= "Unable to save data for OID %s in RRD %s" % \
                  ( oid, oidData.path )
        self.log.critical( summary )

        # 'value' is still the collected input here, because the
        # assignment in the try block never completed.
        message= """Data was value= %s, type=%s, min=%s, max=%s
RRD create command: %s""" % \
                 ( value, oidData.dataStorageType, min, max, \
                   oidData.rrdCreateCommand )
        self.log.critical( message )
        self.log.exception( ex )

        import traceback
        trace_info= traceback.format_exc()

        # The dedupid is built from the monitor name plus a fixed label.
        # NOTE(review): evid is never read afterwards.
        evid= self.sendEvent(dict(
            dedupid="%s|%s" % (self.options.monitor, 'RRD write failure'),
            severity=Critical,
            device=self.options.monitor,
            eventClass=Status_Perf,
            component="RRD",
            oid=oid,
            path=oidData.path,
            message=message,
            traceback=trace_info,
            summary=summary))


        # Nothing was stored, so skip the threshold checks.
        return

    if self.options.showdeviceresults:
        self.log.info("%s %s results: raw=%s RRD-converted=%s"
                      " type=%s, min=%s, max=%s" % (
                      device, oid, raw_value, value, oidData.dataStorageType, min, max))

    for ev in self.thresholds.check(oidData.path, time.time(), value):
        # The last path component becomes the event key; an event key
        # already present on the event is kept as a '|'-separated suffix.
        eventKey = oidData.path.rsplit('/')[-1]
        if ev.has_key('eventKey'):
            ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey'])
        else:
            ev['eventKey'] = eventKey
        self.sendThresholdEvent(**ev)
1103
1104
1105
1113
1114
1116 """
1117 Build a list of command-line options
1118 """
1119 SnmpDaemon.buildOptions(self)
1120 self.parser.add_option('--checkAgingFiles',
1121 dest='checkagingfiles',
1122 action="store_true",
1123 default=False,
1124 help="Send events when RRD files are not being updated regularly")
1125
1126 self.parser.add_option('--cacheconfigs',
1127 dest='cacheconfigs',
1128 action="store_true",
1129 default=False,
1130 help="To improve startup times, cache configuration received from zenhub")
1131
1132 self.parser.add_option('--showdeviceresults',
1133 dest='showdeviceresults',
1134 action="store_true",
1135 default=False,
1136 help="Show the raw RRD values. For debugging purposes only.")
1137
1138
if __name__ == '__main__':

    # Script entry point: the daemon class is imported through its package
    # path rather than taken from this script's own namespace.
    # NOTE(review): presumably so the class's module name matches what
    # remote peers expect -- confirm before changing.
    from Products.ZenRRD.zenperfsnmp import zenperfsnmp

    # Build the daemon and hand control to its run() method.
    zpf = zenperfsnmp()
    zpf.run()
1145