1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__= """Discover (aka model) a device and its components.
15 For instance, find out what Ethernet interfaces and hard disks a server
16 has available.
17 This information should change much less frequently than performance metrics.
18 """
19
20
21
22
23
24 import pysamba.twisted.reactor
25
26 import Globals
27 from Products.ZenWin.WMIClient import WMIClient
28 from Products.ZenWin.utils import addNTLMv2Option, setNTLMv2Auth
29 from Products.ZenHub.PBDaemon import FakeRemote, PBDaemon
30 from Products.ZenUtils.DaemonStats import DaemonStats
31 from Products.ZenUtils.Driver import drive, driveLater
32 from Products.ZenUtils.Utils import unused
33 from Products.ZenEvents.ZenEventClasses import Heartbeat, Error
34
35 from PythonClient import PythonClient
36 from SshClient import SshClient
37 from TelnetClient import TelnetClient, buildOptions as TCbuildOptions
38 from SnmpClient import SnmpClient
39 from PortscanClient import PortscanClient
40
41 from Products.DataCollector import Classifier
42
43 from twisted.internet import reactor
44 from twisted.internet.defer import succeed
45
46 import time
47 import types
48 import re
49 import DateTime
50
51 import os
52 import os.path
53 import sys
54 import traceback
55 from random import randint
56
# Default for the --portscantimeout option (time to wait for connection
# failures when port scanning).
defaultPortScanTimeout = 5
# Default for the --parallel option (number of devices collected at once).
defaultParallel = 1
# Fallback command transport when a device has no zCommandProtocol.
defaultProtocol = "ssh"
# Fallback command port when a device has no zCommandPort.
defaultPort = 22
61
62
63 from Products.DataCollector import DeviceProxy
64 from Products.DataCollector import Plugins
65 unused(DeviceProxy, Plugins)
66
68 """
69 Daemon class to attach to zenhub and pass along
70 device configuration information.
71 """
72
73 name = 'zenmodeler'
74 initialServices = PBDaemon.initialServices + ['ModelerService']
75
76 generateEvents = True
77 configCycleInterval = 360
78
79 classCollectorPlugins = ()
80
82 """
83 Initializer
84
85 @param single: collect from a single device?
86 @type single: boolean
87 """
88 PBDaemon.__init__(self)
89
90 self.options.force = True
91 self.start = None
92 self.rrdStats = DaemonStats()
93 self.single = single
94 if self.options.device:
95 self.single = True
96 self.modelerCycleInterval = self.options.cycletime
97 self.collage = float( self.options.collage ) / 1440.0
98 self.pendingNewClients = False
99 self.clients = []
100 self.finished = []
101 self.devicegen = None
102
103
104 self.started = False
105 self.startDelay = 0
106 if self.options.daemon:
107 if self.options.now:
108 self.log.debug("Run as a daemon, starting immediately.")
109 else:
110
111 self.startDelay = randint(10, 60) * 1
112 self.log.info("Run as a daemon, waiting %s seconds to start." %
113 self.startDelay)
114 else:
115 self.log.debug("Run in foreground, starting immediately.")
116
117
119 """
120 Log errors that have occurred
121
122 @param error: error message
123 @type error: string
124 """
125 self.log.error("Error occured: %s", error)
126
127
129 """
130 Called after connected to the zenhub service
131 """
132 d = self.configure()
133 d.addCallback(self.heartbeat)
134 d.addErrback(self.reportError)
135
136
177
178 return drive(inner)
179
180
186
187
189 """
190 Build a list of active plugins for a device, based on:
191
192 * the --collect command-line option which is a regex
193 * the --ignore command-line option which is a regex
194 * transport which is a string describing the type of plugin
195
196 @param device: device to collect against
197 @type device: string
198 @param transport: python, ssh, snmp, telnet, cmd
199 @type transport: string
200 @return: plugins for the given transport that pass the --collect/--ignore filters
201 @rtype: list
202 @todo: determine if an event for the collector AND the device should be sent
203 """
204 plugins = []
205 valid_loaders = []
206 for loader in device.plugins:
207 try:
208 plugin= loader.create()
209 self.log.debug( "Loaded plugin %s" % plugin.name() )
210 plugins.append( plugin )
211 valid_loaders.append( loader )
212
213 except (SystemExit, KeyboardInterrupt), ex:
214 self.log.info( "Interrupted by external signal (%s)" % str(ex) )
215 raise
216
217 except Plugins.PluginImportError, import_error:
218 import socket
219 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) )
220 collector_host= socket.gethostname()
221
222
223
224 evt= { "eventClass":"/Status/Update", "component":component,
225 "agent":collector_host, "device":collector_host,
226 "severity":Error }
227
228 info= "Problem loading plugin %s" % import_error.plugin
229 self.log.error( info )
230 evt[ 'summary' ]= info
231
232 info= import_error.traceback
233 self.log.error( info )
234 evt[ 'message' ]= info
235
236 info= ("Due to import errors, removing the %s plugin"
237 " from this collection cycle.") % import_error.plugin
238 self.log.error( info )
239 evt[ 'message' ] += "%s\n" % info
240 self.sendEvent( evt )
241
242
243
244 if len( device.plugins ) != len( valid_loaders ):
245 device.plugins= valid_loaders
246
247
248
249 collectTest = lambda x: False
250 ignoreTest = lambda x: False
251 if self.options.collectPlugins:
252 collectTest = re.compile(self.options.collectPlugins).search
253 elif self.options.ignorePlugins:
254 ignoreTest = re.compile(self.options.ignorePlugins).search
255
256 result = []
257 for plugin in plugins:
258 if plugin.transport != transport:
259 continue
260 name = plugin.name()
261 if ignoreTest(name):
262 self.log.debug("Ignoring %s on %s because of --ignore flag",
263 name, device.id)
264 elif collectTest(name):
265 self.log.debug("Using %s on %s because of --collect flag",
266 name, device.id)
267 result.append(plugin)
268 elif not self.options.collectPlugins:
269 self.log.debug("Using %s on %s", name, device.id)
270 result.append(plugin)
271 return result
272
273
274
276 """
277 Collect data from a single device.
278
279 @param device: device to collect against
280 @type device: string
281 """
282 clientTimeout = getattr(device, 'zCollectorClientTimeout', 180)
283 ip = device.manageIp
284 timeout = clientTimeout + time.time()
285 self.wmiCollect(device, ip, timeout)
286 self.pythonCollect(device, ip, timeout)
287 self.cmdCollect(device, ip, timeout)
288 self.snmpCollect(device, ip, timeout)
289 self.portscanCollect(device, ip, timeout)
290
291
292
294 """
295 Start the Windows Management Instrumentation (WMI) collector
296
297 @param device: device to collect against
298 @type device: string
299 @param ip: IP address of device to collect against
300 @type ip: string
301 @param timeout: timeout before failing the connection
302 @type timeout: integer
303 """
304 if self.options.nowmi:
305 return
306
307 client = None
308 try:
309 plugins = self.selectPlugins(device, 'wmi')
310 if not plugins:
311 self.log.info("No WMI plugins found for %s" % device.id)
312 return
313 if self.checkCollection(device):
314 self.log.info('WMI collector method for device %s' % device.id)
315 self.log.info("plugins: %s",
316 ", ".join(map(lambda p: p.name(), plugins)))
317 client = WMIClient(device, self, plugins)
318 if not client or not plugins:
319 self.log.warn("WMI collector creation failed")
320 return
321 except (SystemExit, KeyboardInterrupt):
322 raise
323 except Exception:
324 self.log.exception("Error opening WMI collector")
325 self.addClient(client, timeout, 'WMI', device.id)
326
327
328
330 """
331 Start local Python collection client.
332
333 @param device: device to collect against
334 @type device: string
335 @param ip: IP address of device to collect against
336 @type ip: string
337 @param timeout: timeout before failing the connection
338 @type timeout: integer
339 """
340 client = None
341 try:
342 plugins = self.selectPlugins(device, "python")
343 if not plugins:
344 self.log.info("No Python plugins found for %s" % device.id)
345 return
346 if self.checkCollection(device):
347 self.log.info('Python collection device %s' % device.id)
348 self.log.info("plugins: %s",
349 ", ".join(map(lambda p: p.name(), plugins)))
350 client = PythonClient(device, self, plugins)
351 if not client or not plugins:
352 self.log.warn("Python client creation failed")
353 return
354 except (SystemExit, KeyboardInterrupt): raise
355 except:
356 self.log.exception("Error opening pythonclient")
357 self.addClient(client, timeout, 'python', device.id)
358
359
361 """
362 Start shell command collection client.
363
364 @param device: device to collect against
365 @type device: string
366 @param ip: IP address of device to collect against
367 @type ip: string
368 @param timeout: timeout before failing the connection
369 @type timeout: integer
370 """
371 client = None
372 clientType = 'snmp'
373
374 hostname = device.id
375 try:
376 plugins = self.selectPlugins(device,"command")
377 if not plugins:
378 self.log.info("No command plugins found for %s" % hostname)
379 return
380
381 protocol = getattr(device, 'zCommandProtocol', defaultProtocol)
382 commandPort = getattr(device, 'zCommandPort', defaultPort)
383
384 if protocol == "ssh":
385 client = SshClient(hostname, ip, commandPort,
386 options=self.options,
387 plugins=plugins, device=device,
388 datacollector=self, isLoseConnection=True)
389 clientType = 'ssh'
390 self.log.info('Using SSH collection method for device %s'
391 % hostname)
392
393 elif protocol == 'telnet':
394 if commandPort == 22: commandPort = 23
395 client = TelnetClient(hostname, ip, commandPort,
396 options=self.options,
397 plugins=plugins, device=device,
398 datacollector=self)
399 clientType = 'telnet'
400 self.log.info('Using telnet collection method for device %s'
401 % hostname)
402
403 else:
404 info = ("Unknown protocol %s for device %s -- "
405 "defaulting to %s collection method" %
406 (protocol, hostname, clientType ))
407 self.log.warn( info )
408 import socket
409 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) )
410 collector_host= socket.gethostname()
411 evt= { "eventClass":"/Status/Update", "agent":collector_host,
412 "device":hostname, "severity":Error }
413 evt[ 'summary' ]= info
414 self.sendEvent( evt )
415 return
416
417 if not client:
418 self.log.warn("Shell command collector creation failed")
419 else:
420 self.log.info("plugins: %s",
421 ", ".join(map(lambda p: p.name(), plugins)))
422 except (SystemExit, KeyboardInterrupt): raise
423 except:
424 self.log.exception("Error opening command collector")
425 self.addClient(client, timeout, clientType, device.id)
426
427
428
430 """
431 Start SNMP collection client.
432
433 @param device: device to collect against
434 @type device: string
435 @param ip: IP address of device to collect against
436 @type ip: string
437 @param timeout: timeout before failing the connection
438 @type timeout: integer
439 """
440 client = None
441 try:
442 hostname = device.id
443 if getattr( device, "zSnmpMonitorIgnore", True ):
444 self.log.info("SNMP monitoring off for %s" % hostname)
445 return
446
447 if not ip:
448 self.log.info("No manage IP for %s" % hostname)
449 return
450
451 plugins = []
452 plugins = self.selectPlugins(device,"snmp")
453 if not plugins:
454 self.log.info("No SNMP plugins found for %s" % hostname)
455 return
456
457 if self.checkCollection(device):
458 self.log.info('SNMP collection device %s' % hostname)
459 self.log.info("plugins: %s",
460 ", ".join(map(lambda p: p.name(), plugins)))
461 client = SnmpClient(device.id, ip, self.options,
462 device, self, plugins)
463 if not client or not plugins:
464 self.log.warn("SNMP collector creation failed")
465 return
466 except (SystemExit, KeyboardInterrupt): raise
467 except:
468 self.log.exception("Error opening the SNMP collector")
469 self.addClient(client, timeout, 'SNMP', device.id)
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
def addClient(self, device, timeout, clientType, name):
    """
    Start a collector client and track it, or warn when there is none.

    Despite its name, C{device} is the collector client object built by
    one of the *Collect methods; it is None (or otherwise false) when
    the client could not be created.

    @param device: collector client to schedule, or a false value
    @type device: collector client object or None
    @param timeout: absolute time after which the client counts as timed out
    @type timeout: number
    @param clientType: description of the collector type (used in the warning)
    @type clientType: string
    @param name: identifier reported in the warning (callers pass the device id)
    @type name: string
    """
    # Nothing to run: report it and bail out early.
    if not device:
        self.log.warn('Unable to create a %s collector for %s',
                      clientType, name)
        return

    # Stamp the timeout bookkeeping on the client before it starts running.
    device.timeout = timeout
    device.timedOut = False
    self.clients.append(device)
    device.run()
528
529
530
532 """
533 Start portscan collection client.
534
535 @param device: device to collect against
536 @type device: string
537 @param ip: IP address of device to collect against
538 @type ip: string
539 @param timeout: timeout before failing the connection
540 @type timeout: integer
541 """
542 client = None
543 try:
544 hostname = device.id
545 plugins = self.selectPlugins(device, "portscan")
546 if not plugins:
547 self.log.info("No portscan plugins found for %s" % hostname)
548 return
549 if self.checkCollection(device):
550 self.log.info('Portscan collector method for device %s'
551 % hostname)
552 self.log.info("plugins: %s",
553 ", ".join(map(lambda p: p.name(), plugins)))
554 client = PortscanClient(device.id, ip, self.options,
555 device, self, plugins)
556 if not client or not plugins:
557 self.log.warn("Portscan collector creation failed")
558 return
559 except (SystemExit, KeyboardInterrupt): raise
560 except:
561 self.log.exception("Error opening portscan collector")
562 self.addClient(client, timeout, 'portscan', device.id)
563
564
566 """
567 See how old the data is that we've collected
568
569 @param device: device to collect against
570 @type device: string
571 @return: False (skip collection) if the SNMP status number is > 0 and the last collection time + collage has not yet passed; True otherwise
572 @type: boolean
573 """
574 age = device.getSnmpLastCollection() + self.collage
575 if device.getSnmpStatusNumber() > 0 and age >= DateTime.DateTime():
576 self.log.info("Skipped collection of %s" % device.id)
577 return False
578 return True
579
580
582 """
583 Callback that processes the return values from a device.
584 Python iterable.
585
586 @param collectorClient: collector instance
587 @type collectorClient: collector class
588 @return: Twisted deferred object
589 @type: Twisted deferred object
590 """
591 device = collectorClient.device
592 self.log.debug("Client for %s finished collecting", device.id)
593 def processClient(driver):
594 try:
595 pluginStats = {}
596 self.log.debug("Processing data for device %s", device.id)
597 devchanged = False
598 maps = []
599 for plugin, results in collectorClient.getResults():
600 if plugin is None: continue
601 self.log.debug("Processing plugin %s on device %s ...",
602 plugin.name(), device.id)
603 if not results:
604 self.log.warn("The plugin %s returned no results.",
605 plugin.name())
606 continue
607
608 self.log.debug("Plugin %s results = %s", plugin.name(), results)
609 datamaps = []
610 try:
611 results = plugin.preprocess(results, self.log)
612 if results:
613 datamaps = plugin.process(device, results, self.log)
614 if datamaps:
615 pluginStats.setdefault(plugin.name(), plugin.weight)
616
617 except (SystemExit, KeyboardInterrupt), ex:
618 self.log.info( "Plugin %s terminated due to external"
619 " signal (%s)" % (plugin.name(), str(ex) )
620 )
621 continue
622
623 except Exception, ex:
624
625
626
627
628 import socket
629 component= os.path.splitext(
630 os.path.basename( sys.argv[0] )
631 )[0]
632 collector_host= socket.gethostname()
633 evt= { "eventClass":"/Status/Update",
634 "agent":collector_host, "device":device.id,
635 "severity":Error }
636
637 info= "Problem while executing plugin %s" %plugin.name()
638 self.log.error( info )
639 evt[ 'summary' ]= info
640
641 info= traceback.format_exc()
642 self.log.error( info )
643 evt[ 'message' ]= info
644 self.sendEvent( evt )
645 continue
646
647
648 if type(datamaps) not in (types.ListType, types.TupleType):
649 datamaps = [datamaps,]
650 if datamaps:
651 maps += [m for m in datamaps if m]
652 if maps:
653 deviceClass = Classifier.classifyDevice(pluginStats,
654 self.classCollectorPlugins)
655 yield self.config().callRemote(
656 'applyDataMaps', device.id,
657 maps, deviceClass)
658 if driver.next():
659 devchanged = True
660 if devchanged:
661 self.log.info("Changes in configuration applied")
662 else:
663 self.log.info("No change in configuration detected")
664 yield self.config().callRemote('setSnmpLastCollection',
665 device.id)
666 driver.next()
667 except Exception, ex:
668 self.log.exception(ex)
669 raise
670
671 def processClientFinished(result):
672 """
673 Called after the client collection finishes
674
675 @param result: object (unused)
676 @type result: object
677 """
678 if not result:
679 self.log.debug("Client %s finished" % device.id)
680 else:
681 self.log.error("Client %s finished with message: %s" %
682 (device.id, result))
683 try:
684 self.clients.remove(collectorClient)
685 self.finished.append(collectorClient)
686 except ValueError:
687 self.log.debug("Client %s not found in in the list"
688 " of active clients",
689 device.id)
690 d = drive(self.fillCollectionSlots)
691 d.addErrback(self.fillError)
692
693 d = drive(processClient)
694 d.addBoth(processClientFinished)
695
696
697
699 """
700 Twisted errback routine to log an error when
701 unable to collect some data
702
703 @param reason: error message
704 @type reason: string
705 """
706 self.log.error("Unable to fill collection slots: %s" % reason)
707
708
710 """
711 Return our cycle time (in seconds)
712
713 @return: cycle time
714 @rtype: integer
715 """
716 return self.modelerCycleInterval * 60
717
718
741
742
744 """
745 Check to see if there's anything to do.
746 If there isn't, report our statistics and exit.
747
748 @param unused: unused (unused)
749 @type unused: string
750 """
751 if self.clients: return
752 if self.devicegen: return
753
754 if self.start:
755 runTime = time.time() - self.start
756 self.start = None
757 self.log.info("Scan time: %0.2f seconds", runTime)
758 devices = len(self.finished)
759 timedOut = len([c for c in self.finished if c.timedOut])
760 self.sendEvents(
761 self.rrdStats.gauge('cycleTime', self.cycleTime(), runTime) +
762 self.rrdStats.gauge('devices', self.cycleTime(), devices) +
763 self.rrdStats.gauge('timedOut', self.cycleTime(), timedOut)
764 )
765 if not self.options.cycle:
766 self.stop()
767 self.finished = []
768
769
771 """
772 An iterator which either returns a device to collect or
773 calls checkStop()
774
775 @param driver: driver object
776 @type driver: driver object
777 """
778 count = len(self.clients)
779 while count < self.options.parallel and self.devicegen \
780 and not self.pendingNewClients:
781
782 self.pendingNewClients = True
783 try:
784 device = self.devicegen.next()
785 yield self.config().callRemote('getDeviceConfig', [device])
786
787 devices = driver.next()
788 if devices:
789 self.collectDevice(devices[0])
790 except StopIteration:
791 self.devicegen = None
792
793 self.pendingNewClients = False
794 break
795
796 update = len(self.clients)
797 if update != count and update != 1:
798 self.log.info('Running %d clients', update)
799 else:
800 self.log.debug('Running %d clients', update)
801 self.checkStop()
802
803
805 """
806 Build our list of command-line options
807 """
808 PBDaemon.buildOptions(self)
809 self.parser.add_option('--debug',
810 dest='debug', action="store_true", default=False,
811 help="Don't fork threads for processing")
812 self.parser.add_option('--nowmi',
813 dest='nowmi', action="store_true", default=False,
814 help="Do not execute WMI plugins")
815 self.parser.add_option('--parallel', dest='parallel',
816 type='int', default=defaultParallel,
817 help="Number of devices to collect from in parallel")
818 self.parser.add_option('--cycletime',
819 dest='cycletime',default=720,type='int',
820 help="Run collection every x minutes")
821 self.parser.add_option('--ignore',
822 dest='ignorePlugins',default="",
823 help="Modeler plugins to ignore. Takes a regular expression")
824 self.parser.add_option('--collect',
825 dest='collectPlugins',default="",
826 help="Modeler plugins to use. Takes a regular expression")
827 self.parser.add_option('-p', '--path', dest='path',
828 help="Start class path for collection ie /NetworkDevices")
829 self.parser.add_option('-d', '--device', dest='device',
830 help="Fully qualified device name ie www.confmon.com")
831 self.parser.add_option('-a', '--collage',
832 dest='collage', default=0, type='float',
833 help="Do not collect from devices whose collect date " +
834 "is within this many minutes")
835 self.parser.add_option('--writetries',
836 dest='writetries',default=2,type='int',
837 help="Number of times to try to write if a "
838 "read conflict is found")
839
840 self.parser.add_option("-F", "--force",
841 dest="force", action='store_true', default=True,
842 help="Force collection of config data (deprecated)")
843 self.parser.add_option('--portscantimeout', dest='portscantimeout',
844 type='int', default=defaultPortScanTimeout,
845 help="Time to wait for connection failures when port scanning")
846 self.parser.add_option('--now',
847 dest='now', action="store_true", default=False,
848 help="Start daemon now, do not sleep before starting")
849 TCbuildOptions(self.parser, self.usage)
850 addNTLMv2Option(self.parser)
851
852
853
855 """
856 Check what the user gave us vs what we'll accept
857 for command-line options
858 """
859 if not self.options.path and not self.options.device:
860 self.options.path = "/Devices"
861 if self.options.ignorePlugins and self.options.collectPlugins:
862 raise SystemExit( "Only one of --ignore or --collect"
863 " can be used at a time")
864 setNTLMv2Auth(self.options)
865
867 """
868 The guts of the timeoutClients method (minus the twisted reactor
869 stuff). Breaking this part out as a separate method facilitates unit
870 testing.
871 """
872 active = []
873 for client in self.clients:
874 if client.timeout < time.time():
875 self.log.warn("Client %s timeout", client.hostname)
876 self.finished.append(client)
877 client.timedOut = True
878 client.stop()
879 else:
880 active.append(client)
881 self.clients = active
882
883
884
898
899
900
902 """
903 Twisted main loop
904 """
905 reactor.startRunning()
906 while reactor.running:
907 try:
908 while reactor.running:
909 reactor.runUntilCurrent()
910 timeout = reactor.timeout()
911 reactor.doIteration(timeout)
912 except:
913 if reactor.running:
914 self.log.exception("Unexpected error in main loop.")
915
916
917
919 """
920 Get the list of devices for which we are collecting:
921 * if -d devicename was used, use the devicename
922 * if a class path flag was supplied, gather the devices
923 along that organizer
924 * otherwise get all of the devices associated with our collector
925
926 @return: list of devices
927 @rtype: list
928 """
929 if self.options.device:
930 self.log.info("Collecting for device %s", self.options.device)
931 return succeed([self.options.device])
932
933 self.log.info("Collecting for path %s", self.options.path)
934 return self.config().callRemote('getDeviceListByOrganizer',
935 self.options.path,
936 self.options.monitor)
937
938
def mainLoop(self, driver):
    """
    Generator that performs one modeling pass.

    Each value it yields is handed back to the driver; the generator then
    calls C{driver.next()} to pick up that step's result.

    @param driver: driver object
    @type driver: driver object
    @return: generator of the values yielded for the driver
    @rtype: generator
    """
    # In cycle mode, queue the following pass before doing any work.
    if self.options.cycle:
        driveLater(self.cycleTime(), self.mainLoop)

    # Clients still registered mean the previous pass has not finished.
    if self.clients:
        self.log.error("Modeling cycle taking too long")
        return

    self.start = time.time()
    self.log.debug("Starting collector loop...")

    # Fetch the device list and keep an iterator over it.
    yield self.getDeviceList()
    deviceNames = driver.next()
    self.devicegen = iter(deviceNames)

    # Start filling collection slots and wait for that step to complete.
    filling = drive(self.fillCollectionSlots)
    filling.addErrback(self.fillError)
    yield filling
    driver.next()
    self.log.debug("Collection slots filled")
965
966
967
def main(self, unused=None):
    """
    Reset the finished-client list and start one run of mainLoop.

    @param unused: ignored; lets this method be used as a callback
    @type unused: object
    @return: the object returned by drive(), with timeoutClients
             attached as a callback
    @rtype: Twisted deferred object
    """
    self.finished = []
    loopDeferred = drive(self.mainLoop)
    # Hand over to timeoutClients once the main loop has been driven.
    loopDeferred.addCallback(self.timeoutClients)
    return loopDeferred
981
982
983
985 """
986 Stub function
987
988 @param device: device name (unused)
989 @type device: string
990 @todo: implement
991 """
992
993 self.log.debug("Asynch deleteDevice %s" % device)
994
995
# Script entry point: build the modeler daemon, process its options, and run it.
if __name__ == '__main__':
    dc = ZenModeler()
    dc.processOptions()
    # Replace the reactor's run() with the modeler's own reactorLoop;
    # presumably dc.run() ends up calling reactor.run() -- TODO confirm.
    reactor.run = dc.reactorLoop
    dc.run()
1001