1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__= """Discover (aka model) a device and its components.
15 For instance, find out what Ethernet interfaces and hard disks a server
16 has available.
17 This information should change much less frequently than performance metrics.
18 """
19
20
21
22
23
24 import pysamba.twisted.reactor
25
26 import Globals
27 from Products.ZenWin.WMIClient import WMIClient
28 from Products.ZenWin.utils import addNTLMv2Option, setNTLMv2Auth
29 from Products.ZenHub.PBDaemon import FakeRemote, PBDaemon
30 from Products.ZenUtils.DaemonStats import DaemonStats
31 from Products.ZenUtils.Driver import drive, driveLater
32 from Products.ZenUtils.Utils import unused
33 from Products.ZenEvents.ZenEventClasses import Heartbeat, Error
34
35 from PythonClient import PythonClient
36 from SshClient import SshClient
37 from TelnetClient import TelnetClient, buildOptions as TCbuildOptions
38 from SnmpClient import SnmpClient
39 from PortscanClient import PortscanClient
40
41 from Products.DataCollector import Classifier
42
43 from twisted.internet import reactor
44 from twisted.internet.defer import succeed
45
46 import time
47 import types
48 import re
49 import DateTime
50
51 import os
52 import os.path
53 import sys
54 import traceback
55 from random import randint
56
# Default for the --portscantimeout command-line option.
defaultPortScanTimeout = 5
# Default for the --parallel command-line option (devices collected at once).
defaultParallel = 1
# Fallback used when a device has no zCommandProtocol attribute.
defaultProtocol = "ssh"
# Fallback used when a device has no zCommandPort attribute.
defaultPort = 22
61
62
63 from Products.DataCollector import DeviceProxy
64 from Products.DataCollector import Plugins
65 unused(DeviceProxy, Plugins)
66
68 """
69 Daemon class to attach to zenhub and pass along
70 device configuration information.
71 """
72
73 name = 'zenmodeler'
74 initialServices = PBDaemon.initialServices + ['ModelerService']
75
76 generateEvents = True
77 configCycleInterval = 360
78
79 classCollectorPlugins = ()
80
82 """
83 Initializer
84
85 @param single: collect from a single device?
86 @type single: boolean
87 """
88 PBDaemon.__init__(self)
89
90 self.options.force = True
91 self.start = None
92 self.rrdStats = DaemonStats()
93 self.single = single
94 if self.options.device:
95 self.single = True
96 self.modelerCycleInterval = self.options.cycletime
97 self.collage = float( self.options.collage ) / 1440.0
98 self.pendingNewClients = False
99 self.clients = []
100 self.finished = []
101 self.devicegen = None
102
103
104 self.started = False
105 self.startDelay = 0
106 if self.options.daemon:
107 if self.options.now:
108 self.log.debug("Run as a daemon, starting immediately.")
109 else:
110
111 self.startDelay = randint(10, 60) * 1
112 self.log.info("Run as a daemon, waiting %s seconds to start." %
113 self.startDelay)
114 else:
115 self.log.debug("Run in foreground, starting immediately.")
116
117
119 """
120 Log errors that have occurred
121
122 @param error: error message
123 @type error: string
124 """
125 self.log.error("Error occured: %s", error)
126
127
129 """
130 Called after connected to the zenhub service
131 """
132 d = self.configure()
133 d.addCallback(self.heartbeat)
134 d.addErrback(self.reportError)
135
136
177
178 return drive(inner)
179
180
186
187
189 """
190 Build a list of active plugins for a device, based on:
191
192 * the --collect command-line option which is a regex
193 * the --ignore command-line option which is a regex
194 * transport which is a string describing the type of plugin
195
196 @param device: device to collect against
197 @type device: string
198 @param transport: python, ssh, snmp, telnet, cmd
199 @type transport: string
200 @return: plugins for this transport that pass the --collect/--ignore filters
201 @rtype: list
202 @todo: determine if an event for the collector AND the device should be sent
203 """
204 plugins = []
205 valid_loaders = []
206 for loader in device.plugins:
207 try:
208 plugin= loader.create()
209 self.log.debug( "Loaded plugin %s" % plugin.name() )
210 plugins.append( plugin )
211 valid_loaders.append( loader )
212
213 except (SystemExit, KeyboardInterrupt), ex:
214 self.log.info( "Interrupted by external signal (%s)" % str(ex) )
215 raise
216
217 except Plugins.PluginImportError, import_error:
218 import socket
219 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) )
220 collector_host= socket.gethostname()
221
222
223
224 evt= { "eventClass":"/Status/Update", "component":component,
225 "agent":collector_host, "device":collector_host,
226 "severity":Error }
227
228 info= "Problem loading plugin %s" % import_error.plugin
229 self.log.error( info )
230 evt[ 'summary' ]= info
231
232 info= import_error.traceback
233 self.log.error( info )
234 evt[ 'message' ]= info
235
236 info= ("Due to import errors, removing the %s plugin"
237 " from this collection cycle.") % import_error.plugin
238 self.log.error( info )
239 evt[ 'message' ] += "%s\n" % info
240 self.sendEvent( evt )
241
242
243
244 if len( device.plugins ) != len( valid_loaders ):
245 device.plugins= valid_loaders
246
247
248
249 collectTest = lambda x: False
250 ignoreTest = lambda x: False
251 if self.options.collectPlugins:
252 collectTest = re.compile(self.options.collectPlugins).search
253 elif self.options.ignorePlugins:
254 ignoreTest = re.compile(self.options.ignorePlugins).search
255
256 result = []
257 for plugin in plugins:
258 if plugin.transport != transport:
259 continue
260 name = plugin.name()
261 if ignoreTest(name):
262 self.log.debug("Ignoring %s on %s because of --ignore flag",
263 name, device.id)
264 elif collectTest(name):
265 self.log.debug("Using %s on %s because of --collect flag",
266 name, device.id)
267 result.append(plugin)
268 elif not self.options.collectPlugins:
269 self.log.debug("Using %s on %s", name, device.id)
270 result.append(plugin)
271 return result
272
273
274
276 """
277 Collect data from a single device.
278
279 @param device: device to collect against
280 @type device: string
281 """
282 clientTimeout = getattr(device, 'zCollectorClientTimeout', 180)
283 ip = device.manageIp
284 timeout = clientTimeout + time.time()
285 self.wmiCollect(device, ip, timeout)
286 self.pythonCollect(device, ip, timeout)
287 self.cmdCollect(device, ip, timeout)
288 self.snmpCollect(device, ip, timeout)
289 self.portscanCollect(device, ip, timeout)
290
291
292
294 """
295 Start the Windows Management Instrumentation (WMI) collector
296
297 @param device: device to collect against
298 @type device: string
299 @param ip: IP address of device to collect against
300 @type ip: string
301 @param timeout: timeout before failing the connection
302 @type timeout: integer
303 """
304 if self.options.nowmi:
305 return
306
307 client = None
308 try:
309 plugins = self.selectPlugins(device, 'wmi')
310 if not plugins:
311 self.log.info("No WMI plugins found for %s" % device.id)
312 return
313 if self.checkCollection(device):
314 self.log.info('WMI collector method for device %s' % device.id)
315 self.log.info("plugins: %s",
316 ", ".join(map(lambda p: p.name(), plugins)))
317 client = WMIClient(device, self, plugins)
318 if not client or not plugins:
319 self.log.warn("WMI collector creation failed")
320 return
321 except (SystemExit, KeyboardInterrupt):
322 raise
323 except Exception:
324 self.log.exception("Error opening WMI collector")
325 self.addClient(client, timeout, 'WMI', device.id)
326
327
328
330 """
331 Start local Python collection client.
332
333 @param device: device to collect against
334 @type device: string
335 @param ip: IP address of device to collect against
336 @type ip: string
337 @param timeout: timeout before failing the connection
338 @type timeout: integer
339 """
340 client = None
341 try:
342 plugins = self.selectPlugins(device, "python")
343 if not plugins:
344 self.log.info("No Python plugins found for %s" % device.id)
345 return
346 if self.checkCollection(device):
347 self.log.info('Python collection device %s' % device.id)
348 self.log.info("plugins: %s",
349 ", ".join(map(lambda p: p.name(), plugins)))
350 client = PythonClient(device, self, plugins)
351 if not client or not plugins:
352 self.log.warn("Python client creation failed")
353 return
354 except (SystemExit, KeyboardInterrupt): raise
355 except:
356 self.log.exception("Error opening pythonclient")
357 self.addClient(client, timeout, 'python', device.id)
358
359
361 """
362 Start shell command collection client.
363
364 @param device: device to collect against
365 @type device: string
366 @param ip: IP address of device to collect against
367 @type ip: string
368 @param timeout: timeout before failing the connection
369 @type timeout: integer
370 """
371 client = None
372 clientType = 'snmp'
373
374 hostname = device.id
375 try:
376 plugins = self.selectPlugins(device,"command")
377 if not plugins:
378 self.log.info("No command plugins found for %s" % hostname)
379 return
380
381 protocol = getattr(device, 'zCommandProtocol', defaultProtocol)
382 commandPort = getattr(device, 'zCommandPort', defaultPort)
383
384 if protocol == "ssh":
385 client = SshClient(hostname, ip, commandPort,
386 options=self.options,
387 plugins=plugins, device=device,
388 datacollector=self, isLoseConnection=True)
389 clientType = 'ssh'
390 self.log.info('Using SSH collection method for device %s'
391 % hostname)
392
393 elif protocol == 'telnet':
394 if commandPort == 22: commandPort = 23
395 client = TelnetClient(hostname, ip, commandPort,
396 options=self.options,
397 plugins=plugins, device=device,
398 datacollector=self)
399 clientType = 'telnet'
400 self.log.info('Using telnet collection method for device %s'
401 % hostname)
402
403 else:
404 info = ("Unknown protocol %s for device %s -- "
405 "defaulting to %s collection method" %
406 (protocol, hostname, clientType ))
407 self.log.warn( info )
408 import socket
409 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) )
410 collector_host= socket.gethostname()
411 evt= { "eventClass":"/Status/Update", "agent":collector_host,
412 "device":hostname, "severity":Error }
413 evt[ 'summary' ]= info
414 self.sendEvent( evt )
415 return
416
417 if not client:
418 self.log.warn("Shell command collector creation failed")
419 else:
420 self.log.info("plugins: %s",
421 ", ".join(map(lambda p: p.name(), plugins)))
422 except (SystemExit, KeyboardInterrupt): raise
423 except:
424 self.log.exception("Error opening command collector")
425 self.addClient(client, timeout, clientType, device.id)
426
427
428
430 """
431 Start SNMP collection client.
432
433 @param device: device to collect against
434 @type device: string
435 @param ip: IP address of device to collect against
436 @type ip: string
437 @param timeout: timeout before failing the connection
438 @type timeout: integer
439 """
440 client = None
441 try:
442 hostname = device.id
443 if getattr( device, "zSnmpMonitorIgnore", True ):
444 self.log.info("SNMP monitoring off for %s" % hostname)
445 return
446
447 if not ip:
448 self.log.info("No manage IP for %s" % hostname)
449 return
450
451 plugins = []
452 plugins = self.selectPlugins(device,"snmp")
453 if not plugins:
454 self.log.info("No SNMP plugins found for %s" % hostname)
455 return
456
457 if self.checkCollection(device):
458 self.log.info('SNMP collection device %s' % hostname)
459 self.log.info("plugins: %s",
460 ", ".join(map(lambda p: p.name(), plugins)))
461 client = SnmpClient(device.id, ip, self.options,
462 device, self, plugins)
463 if not client or not plugins:
464 self.log.warn("SNMP collector creation failed")
465 return
466 except (SystemExit, KeyboardInterrupt): raise
467 except:
468 self.log.exception("Error opening the SNMP collector")
469 self.addClient(client, timeout, 'SNMP', device.id)
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
def addClient(self, device, timeout, clientType, name):
    """
    Start a collector client and track it, or log a warning when there
    is no client to start.

    Note that despite the parameter name, C{device} is the collector
    client object (it is given timeout state, appended to self.clients
    and run), not a device record.

    @param device: collector client to schedule; any false value means
                   the client could not be created
    @type device: object with a run() method, or None
    @param timeout: stored on the client as its C{timeout} attribute
                    (callers in this file pass an absolute
                    time.time()-based deadline)
    @type timeout: number
    @param clientType: description of the collector type, used only in
                       the warning message
    @type clientType: string
    @param name: name reported in the warning message
    @type name: string
    """
    if not device:
        # Nothing to run: report which collector type could not be built.
        self.log.warn('Unable to create a %s collector for %s',
                      clientType, name)
        return

    collector = device
    collector.timeout = timeout
    collector.timedOut = False
    # Register before running so the client is already tracked while it runs.
    self.clients.append(collector)
    collector.run()
528
529
530
532 """
533 Start portscan collection client.
534
535 @param device: device to collect against
536 @type device: string
537 @param ip: IP address of device to collect against
538 @type ip: string
539 @param timeout: timeout before failing the connection
540 @type timeout: integer
541 """
542 client = None
543 try:
544 hostname = device.id
545 plugins = self.selectPlugins(device, "portscan")
546 if not plugins:
547 self.log.info("No portscan plugins found for %s" % hostname)
548 return
549 if self.checkCollection(device):
550 self.log.info('Portscan collector method for device %s'
551 % hostname)
552 self.log.info("plugins: %s",
553 ", ".join(map(lambda p: p.name(), plugins)))
554 client = PortscanClient(device.id, ip, self.options,
555 device, self, plugins)
556 if not client or not plugins:
557 self.log.warn("Portscan collector creation failed")
558 return
559 except (SystemExit, KeyboardInterrupt): raise
560 except:
561 self.log.exception("Error opening portscan collector")
562 self.addClient(client, timeout, 'portscan', device.id)
563
564
566 """
567 See how old the data is that we've collected
568
569 @param device: device to collect against
570 @type device: string
571 @return: False (collection is skipped) when the SNMP status number is > 0 and the last collection time + collage has not yet passed; True otherwise
572 @rtype: boolean
573 """
574 age = device.getSnmpLastCollection() + self.collage
575 if device.getSnmpStatusNumber() > 0 and age >= DateTime.DateTime():
576 self.log.info("Skipped collection of %s" % device.id)
577 return False
578 return True
579
580
582 """
583 Callback that processes the return values from a device.
584 Python iterable.
585
586 @param collectorClient: collector instance
587 @type collectorClient: collector class
588 @return: Twisted deferred object
589 @type: Twisted deferred object
590 """
591 device = collectorClient.device
592 self.log.debug("Client for %s finished collecting", device.id)
593 def processClient(driver):
594 try:
595 pluginStats = {}
596 self.log.debug("Processing data for device %s", device.id)
597 devchanged = False
598 maps = []
599 for plugin, results in collectorClient.getResults():
600 if plugin is None: continue
601 self.log.debug("Processing plugin %s on device %s ...",
602 plugin.name(), device.id)
603 if not results:
604 self.log.warn("The plugin %s returned no results.",
605 plugin.name())
606 continue
607
608 datamaps = []
609 try:
610 results = plugin.preprocess(results, self.log)
611 if results:
612 datamaps = plugin.process(device, results, self.log)
613 if datamaps:
614 pluginStats.setdefault(plugin.name(), plugin.weight)
615
616 except (SystemExit, KeyboardInterrupt), ex:
617 self.log.info( "Plugin %s terminated due to external"
618 " signal (%s)" % (plugin.name(), str(ex) )
619 )
620 continue
621
622 except Exception, ex:
623
624
625
626
627 import socket
628 component= os.path.splitext(
629 os.path.basename( sys.argv[0] )
630 )[0]
631 collector_host= socket.gethostname()
632 evt= { "eventClass":"/Status/Update",
633 "agent":collector_host, "device":device.id,
634 "severity":Error }
635
636 info= "Problem while executing plugin %s" %plugin.name()
637 self.log.error( info )
638 evt[ 'summary' ]= info
639
640 info= traceback.format_exc()
641 self.log.error( info )
642 evt[ 'message' ]= info
643 self.sendEvent( evt )
644 continue
645
646
647 if type(datamaps) not in (types.ListType, types.TupleType):
648 datamaps = [datamaps,]
649 if datamaps:
650 maps += [m for m in datamaps if m]
651 if maps:
652 deviceClass = Classifier.classifyDevice(pluginStats,
653 self.classCollectorPlugins)
654 yield self.config().callRemote(
655 'applyDataMaps', device.id,
656 maps, deviceClass)
657 if driver.next():
658 devchanged = True
659 if devchanged:
660 self.log.info("Changes in configuration applied")
661 else:
662 self.log.info("No change in configuration detected")
663 yield self.config().callRemote('setSnmpLastCollection',
664 device.id)
665 driver.next()
666 except Exception, ex:
667 self.log.exception(ex)
668 raise
669
670 def processClientFinished(result):
671 """
672 Called after the client collection finishes
673
674 @param result: object (unused)
675 @type result: object
676 """
677 if not result:
678 self.log.debug("Client %s finished" % device.id)
679 else:
680 self.log.error("Client %s finished with message: %s" %
681 (device.id, result))
682 try:
683 self.clients.remove(collectorClient)
684 self.finished.append(collectorClient)
685 except ValueError:
686 self.log.debug("Client %s not found in in the list"
687 " of active clients",
688 device.id)
689 d = drive(self.fillCollectionSlots)
690 d.addErrback(self.fillError)
691
692 d = drive(processClient)
693 d.addBoth(processClientFinished)
694
695
696
698 """
699 Twisted errback routine to log an error when
700 unable to collect some data
701
702 @param reason: error message
703 @type reason: string
704 """
705 self.log.error("Unable to fill collection slots: %s" % reason)
706
707
709 """
710 Return our cycle time (in seconds; the --cycletime option is given in minutes)
711
712 @return: cycle time
713 @rtype: integer
714 """
715 return self.modelerCycleInterval * 60
716
717
740
741
743 """
744 Check to see if there's anything to do.
745 If there isn't, report our statistics and exit.
746
747 @param unused: unused (unused)
748 @type unused: string
749 """
750 if self.clients: return
751 if self.devicegen: return
752
753 if self.start:
754 runTime = time.time() - self.start
755 self.start = None
756 self.log.info("Scan time: %0.2f seconds", runTime)
757 devices = len(self.finished)
758 timedOut = len([c for c in self.finished if c.timedOut])
759 self.sendEvents(
760 self.rrdStats.gauge('cycleTime', self.cycleTime(), runTime) +
761 self.rrdStats.gauge('devices', self.cycleTime(), devices) +
762 self.rrdStats.gauge('timedOut', self.cycleTime(), timedOut)
763 )
764 if not self.options.cycle:
765 self.stop()
766 self.finished = []
767
768
770 """
771 An iterator which either returns a device to collect or
772 calls checkStop()
773
774 @param driver: driver object
775 @type driver: driver object
776 """
777 count = len(self.clients)
778 while count < self.options.parallel and self.devicegen \
779 and not self.pendingNewClients:
780
781 self.pendingNewClients = True
782 try:
783 device = self.devicegen.next()
784 yield self.config().callRemote('getDeviceConfig', [device])
785
786 devices = driver.next()
787 if devices:
788 self.collectDevice(devices[0])
789 except StopIteration:
790 self.devicegen = None
791
792 self.pendingNewClients = False
793 break
794
795 update = len(self.clients)
796 if update != count and update != 1:
797 self.log.info('Running %d clients', update)
798 else:
799 self.log.debug('Running %d clients', update)
800 self.checkStop()
801
802
804 """
805 Build our list of command-line options
806 """
807 PBDaemon.buildOptions(self)
808 self.parser.add_option('--debug',
809 dest='debug', action="store_true", default=False,
810 help="Don't fork threads for processing")
811 self.parser.add_option('--nowmi',
812 dest='nowmi', action="store_true", default=False,
813 help="Do not execute WMI plugins")
814 self.parser.add_option('--parallel', dest='parallel',
815 type='int', default=defaultParallel,
816 help="Number of devices to collect from in parallel")
817 self.parser.add_option('--cycletime',
818 dest='cycletime',default=720,type='int',
819 help="Run collection every x minutes")
820 self.parser.add_option('--ignore',
821 dest='ignorePlugins',default="",
822 help="Modeler plugins to ignore. Takes a regular expression")
823 self.parser.add_option('--collect',
824 dest='collectPlugins',default="",
825 help="Modeler plugins to use. Takes a regular expression")
826 self.parser.add_option('-p', '--path', dest='path',
827 help="Start class path for collection ie /NetworkDevices")
828 self.parser.add_option('-d', '--device', dest='device',
829 help="Fully qualified device name ie www.confmon.com")
830 self.parser.add_option('-a', '--collage',
831 dest='collage', default=0, type='float',
832 help="Do not collect from devices whose collect date " +
833 "is within this many minutes")
834 self.parser.add_option('--writetries',
835 dest='writetries',default=2,type='int',
836 help="Number of times to try to write if a "
837 "read conflict is found")
838
839 self.parser.add_option("-F", "--force",
840 dest="force", action='store_true', default=True,
841 help="Force collection of config data (deprecated)")
842 self.parser.add_option('--portscantimeout', dest='portscantimeout',
843 type='int', default=defaultPortScanTimeout,
844 help="Time to wait for connection failures when port scanning")
845 self.parser.add_option('--now',
846 dest='now', action="store_true", default=False,
847 help="Start daemon now, do not sleep before starting")
848 TCbuildOptions(self.parser, self.usage)
849 addNTLMv2Option(self.parser)
850
851
852
854 """
855 Check what the user gave us vs what we'll accept
856 for command-line options
857 """
858 if not self.options.path and not self.options.device:
859 self.options.path = "/Devices"
860 if self.options.ignorePlugins and self.options.collectPlugins:
861 raise SystemExit( "Only one of --ignore or --collect"
862 " can be used at a time")
863 setNTLMv2Auth(self.options)
864
866 """
867 The guts of the timeoutClients method (minus the twisted reactor
868 stuff). Breaking this part out as a separate method facilitates unit
869 testing.
870 """
871 active = []
872 for client in self.clients:
873 if client.timeout < time.time():
874 self.log.warn("Client %s timeout", client.hostname)
875 self.finished.append(client)
876 client.timedOut = True
877 client.stop()
878 else:
879 active.append(client)
880 self.clients = active
881
882
883
897
898
899
901 """
902 Twisted main loop
903 """
904 reactor.startRunning()
905 while reactor.running:
906 try:
907 while reactor.running:
908 reactor.runUntilCurrent()
909 timeout = reactor.timeout()
910 reactor.doIteration(timeout)
911 except:
912 if reactor.running:
913 self.log.exception("Unexpected error in main loop.")
914
915
916
918 """
919 Get the list of devices for which we are collecting:
920 * if -d devicename was used, use the devicename
921 * if a class path flag was supplied, gather the devices
922 along that organizer
923 * otherwise get all of the devices associated with our collector
924
925 @return: list of devices
926 @rtype: list
927 """
928 if self.options.device:
929 self.log.info("Collecting for device %s", self.options.device)
930 return succeed([self.options.device])
931
932 self.log.info("Collecting for path %s", self.options.path)
933 return self.config().callRemote('getDeviceListByOrganizer',
934 self.options.path,
935 self.options.monitor)
936
937
def mainLoop(self, driver):
    """
    One modeling cycle, written as a generator that yields the
    operations it is waiting on.

    When the cycle option is set, the next cycle is scheduled first. If
    clients from a previous cycle are still registered, an error is
    logged and this cycle ends without collecting anything.

    @param driver: object whose next() is called after each yield to
                   obtain the yielded operation's result (presumably the
                   Driver that resumes this generator -- confirm against
                   Products.ZenUtils.Driver)
    @type driver: driver object
    @return: generator yielding the device-list request and then the
             slot-filling deferred
    @rtype: generator
    """
    cycling = self.options.cycle
    if cycling:
        # Re-arm before anything else so the following cycle is scheduled
        # even when this one bails out below.
        driveLater(self.cycleTime(), self.mainLoop)

    if self.clients:
        self.log.error("Modeling cycle taking too long")
        return

    self.start = time.time()
    self.log.debug("Starting collector loop...")

    yield self.getDeviceList()
    deviceNames = driver.next()
    self.devicegen = iter(deviceNames)

    slotFiller = drive(self.fillCollectionSlots)
    slotFiller.addErrback(self.fillError)
    yield slotFiller
    # Fetch the result of the yielded deferred; the value itself is not used.
    driver.next()
    self.log.debug("Collection slots filled")
964
965
966
def main(self, unused=None):
    """
    Start one pass of mainLoop and arrange for timeoutClients to be
    called with its result.

    @param unused: ignored; accepted so this method can be used where a
                   single argument is passed in
    @type unused: object
    @return: deferred for the driven mainLoop, with timeoutClients
             attached as a callback
    @rtype: Twisted deferred object
    """
    # Start every pass with an empty list of finished clients.
    self.finished = []
    cycle = drive(self.mainLoop)
    cycle.addCallback(self.timeoutClients)
    return cycle
980
981
982
984 """
985 Stub function
986
987 @param device: device name (unused)
988 @type device: string
989 @todo: implement
990 """
991
992 self.log.debug("Asynch deleteDevice %s" % device)
993
994
if __name__ == '__main__':
    # Script entry point: build the daemon and parse its command-line options.
    modeler = ZenModeler()
    modeler.processOptions()
    # Replace the reactor's run() with the daemon's own reactorLoop before
    # starting the daemon.
    reactor.run = modeler.reactorLoop
    modeler.run()
1000