1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 __doc__='''zenperfsnmp
16
17 Gets snmp performance data and stores it in the RRD files.
18
19 $Id$
20 '''
21
22 __version__ = "$Revision$"[11:-2]
23
24 import os
25 import time
26 import logging
27 log = logging.getLogger("zen.zenperfsnmp")
28
29 from sets import Set
30 import cPickle
31
32 from twisted.internet import reactor, defer
33
34 try:
35 from pynetsnmp.twistedsnmp import AgentProxy
36 except ImportError:
37 import warnings
38 warnings.warn("Using python-based snmp engine")
39 from twistedsnmp.agentproxy import AgentProxy
40 if not hasattr(AgentProxy, 'open'):
42 AgentProxy.open = ignore
43 AgentProxy.close = ignore
44
45 import Globals
46 from Products.ZenUtils.Chain import Chain
47 from Products.ZenUtils.Driver import drive, driveLater
48 from Products.ZenModel.PerformanceConf import performancePath
49 from Products.ZenEvents import Event
50 from Products.ZenEvents.ZenEventClasses import Perf_Snmp, Status_Snmp
51
52 from Products.ZenRRD.RRDUtil import RRDUtil
53 from SnmpDaemon import SnmpDaemon
54
55 from FileCleanup import FileCleanup
56
57 MAX_OIDS_PER_REQUEST = 40
58 MAX_SNMP_REQUESTS = 20
59 DEVICE_LOAD_CHUNK_SIZE = 2
60
62 if not os.path.exists(dir):
63 os.makedirs(dir, 0750)
64
66 if os.path.exists(fname):
67 fp = file(fname, 'rb')
68 try:
69 return fp.read()
70 finally:
71 fp.close()
72 return ''
73
81
85
87 'break lst into n-sized chunks'
88 return [lst[i:i+n] for i in range(0, len(lst), n)]
89
90 try:
91 sorted = sorted
92 except NameError:
94 lst.sort(*args, **kw)
95 return lst
96
98 'the first element of every item in a sequence'
99 return [item[0] for item in lst]
100
102 try:
103 return function(*args, **kw)
104 except Exception, ex:
105 log.exception(ex)
106 raise ex
107
109 'keep track of the status of many parallel requests'
110 _success = _fail = 0
111 _startTime = _stopTime = 0.0
112 _deferred = None
113
115 self._allDevices = Set()
116 self._reported = Set()
117
118 - def start(self, devices):
126
127
128 - def record(self, name, successOrFailure):
129 'Record success or failure'
130 if name in self._reported:
131 log.error("Device %s is reporting more than once", name)
132 return
133 self._reported.add(name)
134 if successOrFailure:
135 self.success()
136 else:
137 self.fail()
138
139
144
145
146 - def fail(self, *unused):
150
151
159
160
162 'determine if we have finished everything'
163 return self.outstanding() <= 0
164
165
174
175
177 'return the number of unfinished operations'
178 return len(self.outstandingNames())
179
181 'return the number of unfinished operations'
182 return self._allDevices - self._reported
183
184
186 "track and report SNMP status failures"
187
188 snmpStatusEvent = {'eventClass': Status_Snmp,
189 'component': 'snmp',
190 'eventGroup': 'SnmpTest'}
191
192
194 self.count = snmpState
195
196
214
215
217 - def update(self, name, path, dataStorageType, rrdCreateCommand, minmax):
218 self.name = name
219 self.path = path
220 self.dataStorageType = dataStorageType
221 self.rrdCreateCommand = rrdCreateCommand
222 self.minmax = minmax
223
224
226 "Periodically query all devices for SNMP values to archive in RRD files"
227
228
229 maxRrdFileAge = 30 * (24*60*60)
230 perfsnmpConfigInterval = 20*60
231 perfsnmpCycleInterval = 5*60
232 properties = SnmpDaemon.properties + ('perfsnmpCycleInterval',)
233 initialServices = SnmpDaemon.initialServices + ['SnmpPerfConfig']
234
250
253
266
268 self.log.warning("Deleting old RRD file: %s", fullPath)
269 os.unlink(fullPath)
270
272 "Stop if all performance has been fetched, and we aren't cycling"
273 if not self.options.daemon and \
274 not self.options.cycle:
275 reactor.callLater(0, reactor.stop)
276
294
296 'Periodically ask the Zope server for basic configuration data.'
297
298 log.info("fetching property items")
299 yield self.model().callRemote('propertyItems')
300 self.setPropertyItems(driver.next())
301
302 driveLater(self.configCycleInterval * 60, self.startUpdateConfig)
303
304 log.info("getting threshold classes")
305 yield self.model().callRemote('getThresholdClasses')
306 self.remote_updateThresholdClasses(driver.next())
307
308 log.info("checking for outdated configs")
309 current = [(k, v.lastChange) for k, v in self.proxies.items()]
310 yield self.model().callRemote('getDeviceUpdates', current)
311
312 devices = driver.next()
313 if self.options.device:
314 devices = [self.options.device]
315
316 log.info("fetching configs for %s", repr(devices)[0:800]+'...')
317 yield self.model().callRemote('getDevices', devices)
318 updatedDevices = driver.next()
319
320 log.info("fetching default RRDCreateCommand")
321 yield self.model().callRemote('getDefaultRRDCreateCommand')
322 createCommand = driver.next()
323
324 self.rrd = RRDUtil(createCommand, self.perfsnmpCycleInterval)
325
326 log.info("fetching snmp status")
327 yield self.model().callRemote('getSnmpStatus', self.options.device)
328 self.updateSnmpStatus(driver.next())
329
330
331 log.info("Initiating incremental device load")
332 d = self.updateDeviceList(updatedDevices, devices)
333 def report(result):
334 if result:
335 log.error("Error loading devices: %s", result)
336 d.addBoth(report)
337
338
339
341 'Update the config for devices devices'
342 def fetchDevices(driver):
343 deviceNames = Set()
344 length = len(responses)
345 log.debug("Fetching configs for %d devices", length)
346 for devices in chunk(responses, DEVICE_LOAD_CHUNK_SIZE):
347 log.debug("Fetching config for %s", devices)
348 yield self.model().callRemote('getDeviceConfigs', devices)
349 for response in driver.next():
350 self.updateDeviceConfig(response)
351 for d in devices:
352 deviceNames.add(d)
353 log.debug("Finished fetching configs for %d devices", length)
354
355
356 doomed = Set(requested) - deviceNames
357 if self.options.device:
358 self.log.debug('Gathering performance data for %s ' %
359 self.options.device)
360 doomed = Set(self.proxies.keys())
361 doomed.discard(self.options.device)
362 for name in doomed:
363 self.log.info('removing device %s' % name)
364 if name in self.proxies:
365 del self.proxies[name]
366 config = self.pickleName(name)
367 unlink(config)
368
369
370 ips = Set()
371 for name, proxy in self.proxies.items():
372 if proxy.ip in ips:
373 log.warning("Warning: device %s has a duplicate address %s",
374 name, proxy.ip)
375 ips.add(proxy.ip)
376 self.log.info('Configured %d of %d devices',
377 len(deviceNames), len(self.proxies))
378 yield defer.succeed(None)
379 return drive(fetchDevices)
380
381
382 - def updateAgentProxy(self,
383 deviceName, ip, port, community,
384 version, timeout, tries, maxoids=40):
408
414
415
417 self.log.debug("Async delete device %s" % doomed)
418 if doomed in self.proxies:
419 del self.proxies[doomed]
420
421
425
426
428 'Save the device configuration and create an SNMP proxy to talk to it'
429 last, identity, thresholds, oidData = snmpTargets
430 deviceName, hostPort, snmpConfig, maxOIDs = identity
431
432 if not oidData: return
433 (ip, port)= hostPort
434 (community, version, timeout, tries) = snmpConfig
435 self.log.debug("received config for %s", deviceName)
436 if version.find('1') >= 0:
437 version = '1'
438 else:
439 version = '2'
440 p = self.updateAgentProxy(deviceName,
441 ip, port, str(community),
442 version, timeout, tries, maxOIDs)
443 if p.lastChange < last:
444 p.lastChange = last
445 write(self.pickleName(deviceName), cPickle.dumps(snmpTargets))
446
447 oidMap, p.oidMap = p.oidMap, {}
448 for name, oid, path, dsType, createCmd, minmax in oidData:
449 createCmd = createCmd.strip()
450 oid = str(oid).strip('.')
451
452 if oid:
453 oid = '.' + oid
454 p.oidMap[oid] = d = oidMap.setdefault(oid, OidData())
455 d.update(name, path, dsType, createCmd, minmax)
456 self.proxies[deviceName] = p
457 self.thresholds.updateList(thresholds)
458
459
467
468
470 "remember all the unresponsive devices"
471 if isinstance(arg, list):
472 deviceList = arg
473 self.log.debug('unresponsive devices: %r' % deviceList)
474 self.unresponsiveDevices = Set(firsts(deviceList))
475 else:
476 self.log.error('Could not get unresponsive devices: %s', arg)
477 self.readDevices()
478
479
481 'Periodically fetch the performance values from all known devices'
482
483 if self.status and not self.status.finished():
484 _, _, _, age = self.status.stats()
485 self.log.warning("There are still %d devices to query",
486 self.status.outstanding())
487 self.log.warning("Problem devices: %r",
488 list(self.status.outstandingNames()))
489 if age < self.perfsnmpCycleInterval * 2:
490 self.log.warning("Waiting one more cycle period")
491 return
492 self.log.warning("Devices status is not clearing. Restarting.")
493
494 self.queryWorkList = Set(self.proxies.keys())
495 self.queryWorkList -= self.unresponsiveDevices
496 self.status = Status()
497 d = self.status.start(self.queryWorkList)
498 d.addCallback(self.reportRate)
499 for unused in range(MAX_SNMP_REQUESTS):
500 if not self.queryWorkList: break
501 d = self.startReadDevice(self.queryWorkList.pop())
502 def printError(reason):
503 from StringIO import StringIO
504 out = StringIO()
505 reason.printTraceback(out)
506 self.log.error(reason)
507 d.addErrback(printError)
508
509
511 'Finished reading all the devices, report stats and maybe stop'
512 total, success, failed, runtime = self.status.stats()
513 self.log.info("sent %d OID requests", self.snmpOidsRequested)
514 self.log.info("collected %d of %d devices in %.2f" %
515 (success, total, runtime))
516 self.snmpOidsRequested = 0
517 self.heartbeat()
518
519
533 proxy.open()
534 chain = Chain(getLater, iter(chunk(sorted(proxy.oidMap.keys()), n)))
535 d = chain.run()
536 def closer(arg, proxy):
537 checkException(self.log, proxy.close)
538 return arg
539 d.addCallback(closer, proxy)
540 d.addCallback(self.storeValues, deviceName)
541 self.snmpOidsRequested += len(proxy.oidMap)
542 return d
543
544
545 - def badOid(self, deviceName, oid):
560
561
563 'decode responses from devices and store the elements in RRD files'
564
565 proxy = self.proxies.get(deviceName, None)
566 if proxy is None:
567 self.status.record(deviceName, True)
568 return
569
570
571 for success, update in updates:
572
573 if success and not update and not proxy.singleOidMode:
574 proxy.singleOidMode = True
575 self.log.warn('Error collecting data on %s, recollecting',
576 deviceName)
577 self.startReadDevice(deviceName)
578 return
579 if not success:
580 self.log.debug('Failed to collect on %s (%s: %s)',
581 deviceName,
582 update.__class__,
583 update)
584
585 successCount = sum(firsts(updates))
586 oids = []
587 for success, update in updates:
588 if success:
589 for oid, value in update.items():
590
591 if value == '':
592 self.badOid(deviceName, oid)
593 else:
594 self.storeRRD(deviceName, oid, value)
595 oids.append(oid)
596
597 if successCount == len(updates) and proxy.singleOidMode:
598
599 for doomed in Set(proxy.oidMap.keys()) - Set(oids):
600 self.badOid(deviceName, doomed)
601
602 if self.queryWorkList:
603 self.startReadDevice(self.queryWorkList.pop())
604
605 if successCount:
606 successPercent = successCount * 100 / len(updates)
607 if successPercent not in (0, 100):
608 self.log.debug("Successful request ratio for %s is %2d%%",
609 deviceName,
610 successPercent)
611 success = True
612 if updates:
613 success = successCount > 0
614 self.status.record(deviceName, success)
615 proxy.snmpStatus.updateStatus(deviceName, success, self.sendEvent)
616
617
618 - def storeRRD(self, device, oid, value):
633
638
639
640 if __name__ == '__main__':
641 zpf = zenperfsnmp()
642 zpf.run()
643