1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__= """Discover (aka model) a device and its components.
15 For instance, find out what Ethernet interfaces and hard disks a server
16 has available.
17 This information should change much less frequently than performance metrics.
18 """
19
20 import Globals
21 from Products.ZenWin.WMIClient import WMIClient
22 from Products.ZenHub.PBDaemon import FakeRemote, PBDaemon
23 from Products.ZenUtils.DaemonStats import DaemonStats
24 from Products.ZenUtils.Driver import drive, driveLater
25 from Products.ZenUtils.Utils import unused
26 from Products.ZenEvents.ZenEventClasses import Heartbeat, Error
27
28 from PythonClient import PythonClient
29 from SshClient import SshClient
30 from TelnetClient import TelnetClient, buildOptions as TCbuildOptions
31 from SnmpClient import SnmpClient
32 from PortscanClient import PortscanClient
33
34 from Products.DataCollector import Classifier
35
36 from twisted.internet import reactor
37 from twisted.internet.defer import succeed
38
39 import time
40 import types
41 import re
42 import DateTime
43
44 import os
45 import os.path
46 import sys
47 import traceback
48
# Fallback values used when neither the command line nor the device
# configuration supplies a setting.
defaultPortScanTimeout = 5   # default for the --portscantimeout option
defaultParallel = 1          # default for the --parallel option
defaultProtocol = "ssh"      # used when a device has no zCommandProtocol
defaultPort = 22             # used when a device has no zCommandPort
# Seconds to sleep before the first run when started as a daemon
# without --now.
defaultStartSleep = 60 * 10
54
55
56 from Products.DataCollector import DeviceProxy
57 from Products.DataCollector import Plugins
58 unused(DeviceProxy, Plugins)
59
61 """
62 Daemon class to attach to zenhub and pass along
63 device configuration information.
64 """
65
66 name = 'zenmodeler'
67 initialServices = PBDaemon.initialServices + ['ModelerService']
68
69 generateEvents = True
70 configCycleInterval = 360
71
72 classCollectorPlugins = ()
73
75 """
76 Initializer
77
78 @param single: collect from a single device?
79 @type single: boolean
80 """
81 PBDaemon.__init__(self)
82
83 self.options.force = True
84 if self.options.daemon:
85 if self.options.now:
86 self.log.debug("Run as a daemon, starting immediately.")
87 else:
88 self.log.info("Run as a daemon, waiting %s sec to start." %
89 defaultStartSleep)
90 time.sleep(defaultStartSleep)
91 self.log.debug("Run as a daemon, slept %s sec, starting now." %
92 defaultStartSleep)
93 else:
94 self.log.debug("Run in foreground, starting immediately.")
95
96 self.start = None
97 self.rrdStats = DaemonStats()
98 self.single = single
99 if self.options.device:
100 self.single = True
101 self.modelerCycleInterval = self.options.cycletime
102 self.collage = float( self.options.collage ) / 1440.0
103 self.pendingNewClients = False
104 self.clients = []
105 self.finished = []
106 self.devicegen = None
107
108
109
111 """
112 Log errors that have occurred
113
114 @param error: error message
115 @type error: string
116 """
117 self.log.error("Error occured: %s", error)
118
119
121 """
122 Called after connected to the zenhub service
123 """
124 d = self.configure()
125 d.addCallback(self.heartbeat)
126 d.addErrback(self.reportError)
127 d.addCallback(self.main)
128
129
170
171 return drive(inner)
172
173
179
180
182 """
183 Build a list of active plugins for a device, based on:
184
185 * the --collect command-line option which is a regex
186 * the --ignore command-line option which is a regex
187 * transport which is a string describing the type of plugin
188
189 @param device: device to collect against
190 @type device: string
191 @param transport: python, ssh, snmp, telnet, cmd
192 @type transport: string
193 @return: the loaded plugins that use the given transport and pass the --collect / --ignore filters
194 @rtype: list
195 @todo: determine if an event for the collector AND the device should be sent
196 """
197 plugins = []
198 valid_loaders = []
199 for loader in device.plugins:
200 try:
201 plugin= loader.create()
202 self.log.debug( "Loaded plugin %s" % plugin.name() )
203 plugins.append( plugin )
204 valid_loaders.append( loader )
205
206 except (SystemExit, KeyboardInterrupt), ex:
207 self.log.info( "Interrupted by external signal (%s)" % str(ex) )
208 raise
209
210 except Plugins.pluginImportError, import_error:
211 import socket
212 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) )
213 collector_host= socket.gethostname()
214
215
216
217 evt= { "eventClass":"/Status/Update", "component":component,
218 "agent":collector_host, "device":collector_host,
219 "severity":Error }
220
221 info= "Problem loading plugin %s" % import_error.plugin
222 self.log.error( info )
223 evt[ 'summary' ]= info
224
225 info= import_error.traceback
226 self.log.error( info )
227 evt[ 'message' ]= info
228
229 info= ("Due to import errors, removing the %s plugin"
230 " from this collection cycle.") % import_error.plugin
231 self.log.error( info )
232 evt[ 'message' ] += "%s\n" % info
233 self.sendEvent( evt )
234
235
236
237 if len( device.plugins ) != len( valid_loaders ):
238 device.plugins= valid_loaders
239
240
241
242 collectTest = lambda x: False
243 ignoreTest = lambda x: False
244 if self.options.collectPlugins:
245 collectTest = re.compile(self.options.collectPlugins).search
246 elif self.options.ignorePlugins:
247 ignoreTest = re.compile(self.options.ignorePlugins).search
248
249 result = []
250 for plugin in plugins:
251 if plugin.transport != transport:
252 continue
253 name = plugin.name()
254 if ignoreTest(name):
255 self.log.debug("Ignoring %s on %s because of --ignore flag",
256 name, device.id)
257 elif collectTest(name):
258 self.log.debug("Using %s on %s because of --collect flag",
259 name, device.id)
260 result.append(plugin)
261 elif not self.options.collectPlugins:
262 self.log.debug("Using %s on %s", name, device.id)
263 result.append(plugin)
264 return result
265
266
267
269 """
270 Collect data from a single device.
271
272 @param device: device to collect against
273 @type device: string
274 """
275 clientTimeout = getattr(device, 'zCollectorClientTimeout', 180)
276 ip = device.manageIp
277 timeout = clientTimeout + time.time()
278 self.wmiCollect(device, ip, timeout)
279 self.pythonCollect(device, ip, timeout)
280 self.cmdCollect(device, ip, timeout)
281 self.snmpCollect(device, ip, timeout)
282 self.portscanCollect(device, ip, timeout)
283
284
285
287 """
288 Start the Windows Management Instrumentation (WMI) collector
289
290 @param device: device to collect against
291 @type device: string
292 @param ip: IP address of device to collect against
293 @type ip: string
294 @param timeout: timeout before failing the connection
295 @type timeout: integer
296 """
297 if self.options.nowmi:
298 return
299
300 client = None
301 try:
302 plugins = self.selectPlugins(device, 'wmi')
303 if not plugins:
304 self.log.info("No WMI plugins found for %s" % device.id)
305 return
306 if self.checkCollection(device):
307 self.log.info('WMI collector method for device %s' % device.id)
308 self.log.info("plugins: %s",
309 ", ".join(map(lambda p: p.name(), plugins)))
310 client = WMIClient(device, self, plugins)
311 if not client or not plugins:
312 self.log.warn("WMI collector creation failed")
313 return
314 except (SystemExit, KeyboardInterrupt):
315 raise
316 except Exception:
317 self.log.exception("Error opening WMI collector")
318 self.addClient(client, timeout, 'WMI', device.id)
319
320
321
323 """
324 Start local Python collection client.
325
326 @param device: device to collect against
327 @type device: string
328 @param ip: IP address of device to collect against
329 @type ip: string
330 @param timeout: timeout before failing the connection
331 @type timeout: integer
332 """
333 client = None
334 try:
335 plugins = self.selectPlugins(device, "python")
336 if not plugins:
337 self.log.info("No Python plugins found for %s" % device.id)
338 return
339 if self.checkCollection(device):
340 self.log.info('Python collection device %s' % device.id)
341 self.log.info("plugins: %s",
342 ", ".join(map(lambda p: p.name(), plugins)))
343 client = PythonClient(device, self, plugins)
344 if not client or not plugins:
345 self.log.warn("Python client creation failed")
346 return
347 except (SystemExit, KeyboardInterrupt): raise
348 except:
349 self.log.exception("Error opening pythonclient")
350 self.addClient(client, timeout, 'python', device.id)
351
352
354 """
355 Start shell command collection client.
356
357 @param device: device to collect against
358 @type device: string
359 @param ip: IP address of device to collect against
360 @type ip: string
361 @param timeout: timeout before failing the connection
362 @type timeout: integer
363 """
364 client = None
365 clientType = 'snmp'
366
367 hostname = device.id
368 try:
369 plugins = self.selectPlugins(device,"command")
370 if not plugins:
371 self.log.info("No command plugins found for %s" % hostname)
372 return
373
374 protocol = getattr(device, 'zCommandProtocol', defaultProtocol)
375 commandPort = getattr(device, 'zCommandPort', defaultPort)
376
377 if protocol == "ssh":
378 client = SshClient(hostname, ip, commandPort,
379 options=self.options,
380 plugins=plugins, device=device,
381 datacollector=self)
382 clientType = 'ssh'
383 self.log.info('Using SSH collection method for device %s'
384 % hostname)
385
386 elif protocol == 'telnet':
387 if commandPort == 22: commandPort = 23
388 client = TelnetClient(hostname, ip, commandPort,
389 options=self.options,
390 plugins=plugins, device=device,
391 datacollector=self)
392 clientType = 'telnet'
393 self.log.info('Using telnet collection method for device %s'
394 % hostname)
395
396 else:
397 info = ("Unknown protocol %s for device %s -- "
398 "defaulting to %s collection method" %
399 (protocol, hostname, clientType ))
400 self.log.warn( info )
401 import socket
402 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) )
403 collector_host= socket.gethostname()
404 evt= { "eventClass":"/Status/Update", "agent":collector_host,
405 "device":hostname, "severity":Error }
406 evt[ 'summary' ]= info
407 self.sendEvent( evt )
408 return
409
410 if not client:
411 self.log.warn("Shell command collector creation failed")
412 else:
413 self.log.info("plugins: %s",
414 ", ".join(map(lambda p: p.name(), plugins)))
415 except (SystemExit, KeyboardInterrupt): raise
416 except:
417 self.log.exception("Error opening command collector")
418 self.addClient(client, timeout, clientType, device.id)
419
420
421
423 """
424 Start SNMP collection client.
425
426 @param device: device to collect against
427 @type device: string
428 @param ip: IP address of device to collect against
429 @type ip: string
430 @param timeout: timeout before failing the connection
431 @type timeout: integer
432 """
433 client = None
434 try:
435 hostname = device.id
436 if getattr( device, "zSnmpMonitorIgnore", True ):
437 self.log.info("SNMP monitoring off for %s" % hostname)
438 return
439
440 plugins = []
441 plugins = self.selectPlugins(device,"snmp")
442 if not plugins:
443 self.log.info("No SNMP plugins found for %s" % hostname)
444 return
445
446 if self.checkCollection(device):
447 self.log.info('SNMP collection device %s' % hostname)
448 self.log.info("plugins: %s",
449 ", ".join(map(lambda p: p.name(), plugins)))
450 client = SnmpClient(device.id, ip, self.options,
451 device, self, plugins)
452 if not client or not plugins:
453 self.log.warn("SNMP collector creation failed")
454 return
455 except (SystemExit, KeyboardInterrupt): raise
456 except:
457 self.log.exception("Error opening the SNMP collector")
458 self.addClient(client, timeout, 'SNMP', device.id)
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
def addClient(self, device, timeout, clientType, name):
    """
    Start a collector client and track it as active, or log a warning
    when no client could be created.

    @param device: the collector client to start (despite the parameter
        name); any false value, such as None, means that creating the
        client failed
    @type device: collector client object or None
    @param timeout: value stored on the client as its timeout; the
        callers in this file pass an absolute time.time()-based deadline
    @type timeout: number
    @param clientType: description of the collection method, used only
        in the warning message
    @type clientType: string
    @param name: name reported in the warning message (callers in this
        file pass the device id)
    @type name: string
    """
    if not device:
        self.log.warn('Unable to create a %s collector for %s',
                      clientType, name)
        return

    client = device
    client.timeout = timeout
    client.timedOut = False
    # Register the client as active before starting it.
    self.clients.append(client)
    client.run()
518
519
520
522 """
523 Start portscan collection client.
524
525 @param device: device to collect against
526 @type device: string
527 @param ip: IP address of device to collect against
528 @type ip: string
529 @param timeout: timeout before failing the connection
530 @type timeout: integer
531 """
532 client = None
533 try:
534 hostname = device.id
535 plugins = self.selectPlugins(device, "portscan")
536 if not plugins:
537 self.log.info("No portscan plugins found for %s" % hostname)
538 return
539 if self.checkCollection(device):
540 self.log.info('Portscan collector method for device %s'
541 % hostname)
542 self.log.info("plugins: %s",
543 ", ".join(map(lambda p: p.name(), plugins)))
544 client = PortscanClient(device.id, ip, self.options,
545 device, self, plugins)
546 if not client or not plugins:
547 self.log.warn("Portscan collector creation failed")
548 return
549 except (SystemExit, KeyboardInterrupt): raise
550 except:
551 self.log.exception("Error opening portscan collector")
552 self.addClient(client, timeout, 'portscan', device.id)
553
554
556 """
557 See how old the data is that we've collected
558
559 @param device: device to collect against
560 @type device: string
561 @return: False (collection is skipped) when the SNMP status number is > 0 and the last collection time + collage has not yet passed; True otherwise
562 @rtype: boolean
563 """
564 age = device.getSnmpLastCollection() + self.collage
565 if device.getSnmpStatusNumber() > 0 and age >= DateTime.DateTime():
566 self.log.info("Skipped collection of %s" % device.id)
567 return False
568 return True
569
570
572 """
573 Callback that processes the return values from a device.
574 Python iterable.
575
576 @param collectorClient: collector instance
577 @type collectorClient: collector class
578 @return: Twisted deferred object
579 @type: Twisted deferred object
580 """
581 device = collectorClient.device
582 self.log.debug("Client for %s finished collecting", device.id)
583 def processClient(driver):
584 try:
585 pluginStats = {}
586 self.log.debug("Processing data for device %s", device.id)
587 devchanged = False
588 maps = []
589 for plugin, results in collectorClient.getResults():
590 if plugin is None: continue
591 self.log.debug("Processing plugin %s on device %s ...",
592 plugin.name(), device.id)
593 if not results:
594 self.log.warn("The plugin %s returned no results.",
595 plugin.name())
596 continue
597
598 datamaps = []
599 try:
600 results = plugin.preprocess(results, self.log)
601 if results:
602 datamaps = plugin.process(device, results, self.log)
603 if datamaps:
604 pluginStats.setdefault(plugin.name(), plugin.weight)
605
606 except (SystemExit, KeyboardInterrupt), ex:
607 self.log.info( "Plugin %s terminated due to external"
608 " signal (%s)" % (plugin.name(), str(ex) )
609 )
610 continue
611
612 except Exception, ex:
613
614
615
616
617 import socket
618 component= os.path.splitext(
619 os.path.basename( sys.argv[0] )
620 )[0]
621 collector_host= socket.gethostname()
622 evt= { "eventClass":"/Status/Update",
623 "agent":collector_host, "device":device.id,
624 "severity":Error }
625
626 info= "Problem while executing plugin %s" %plugin.name()
627 self.log.error( info )
628 evt[ 'summary' ]= info
629
630 info= traceback.format_exc()
631 self.log.error( info )
632 evt[ 'message' ]= info
633 self.sendEvent( evt )
634 continue
635
636
637 if type(datamaps) not in (types.ListType, types.TupleType):
638 datamaps = [datamaps,]
639 if datamaps:
640 maps += [m for m in datamaps if m]
641 if maps:
642 deviceClass = Classifier.classifyDevice(pluginStats,
643 self.classCollectorPlugins)
644 yield self.config().callRemote(
645 'applyDataMaps', device.id,
646 maps, deviceClass)
647 if driver.next():
648 devchanged = True
649 if devchanged:
650 self.log.info("Changes in configuration applied")
651 else:
652 self.log.info("No change in configuration detected")
653 yield self.config().callRemote('setSnmpLastCollection',
654 device.id)
655 driver.next()
656 except Exception, ex:
657 self.log.exception(ex)
658 raise
659
660 def processClientFinished(result):
661 """
662 Called after the client collection finishes
663
664 @param result: outcome of processing the client data; a non-empty value is logged as an error
665 @type result: object
666 """
667 if not result:
668 self.log.debug("Client %s finished" % device.id)
669 else:
670 self.log.error("Client %s finished with message: %s" %
671 (device.id, result))
672 try:
673 self.clients.remove(collectorClient)
674 self.finished.append(collectorClient)
675 except ValueError:
676 self.log.debug("Client %s not found in in the list"
677 " of active clients",
678 device.id)
679 d = drive(self.fillCollectionSlots)
680 d.addErrback(self.fillError)
681
682 d = drive(processClient)
683 d.addBoth(processClientFinished)
684
685
686
688 """
689 Twisted errback routine to log an error when
690 unable to collect some data
691
692 @param reason: error message
693 @type reason: string
694 """
695 self.log.error("Unable to fill collection slots: %s" % reason)
696
697
699 """
700 Return our cycle time (in seconds; the --cycletime option is given in minutes)
701
702 @return: cycle time
703 @rtype: integer
704 """
705 return self.modelerCycleInterval * 60
706
707
725
726
728 """
729 Check to see if there's anything to do.
730 If there isn't, report our statistics and exit.
731
732 @param unused: unused (unused)
733 @type unused: string
734 """
735 if self.clients: return
736 if self.devicegen: return
737
738 if self.start:
739 runTime = time.time() - self.start
740 self.start = None
741 self.log.info("Scan time: %0.2f seconds", runTime)
742 devices = len(self.finished)
743 timedOut = len([c for c in self.finished if c.timedOut])
744 self.sendEvents(
745 self.rrdStats.gauge('cycleTime', self.cycleTime(), runTime) +
746 self.rrdStats.gauge('devices', self.cycleTime(), devices) +
747 self.rrdStats.gauge('timedOut', self.cycleTime(), timedOut)
748 )
749 if not self.options.cycle:
750 self.stop()
751 self.finished = []
752
753
755 """
756 An iterator which either returns a device to collect or
757 calls checkStop()
758
759 @param driver: driver object
760 @type driver: driver object
761 """
762 count = len(self.clients)
763 while count < self.options.parallel and self.devicegen \
764 and not self.pendingNewClients:
765
766 self.pendingNewClients = True
767 try:
768 device = self.devicegen.next()
769 yield self.config().callRemote('getDeviceConfig', [device])
770
771 devices = driver.next()
772 if devices:
773 self.collectDevice(devices[0])
774 except StopIteration:
775 self.devicegen = None
776
777 self.pendingNewClients = False
778 break
779
780 update = len(self.clients)
781 if update != count and update != 1:
782 self.log.info('Running %d clients', update)
783 else:
784 self.log.debug('Running %d clients', update)
785 self.checkStop()
786
787
789 """
790 Build our list of command-line options
791 """
792 PBDaemon.buildOptions(self)
793 self.parser.add_option('--debug',
794 dest='debug', action="store_true", default=False,
795 help="Don't fork threads for processing")
796 self.parser.add_option('--nowmi',
797 dest='nowmi', action="store_true", default=False,
798 help="Do not execute WMI plugins")
799 self.parser.add_option('--parallel', dest='parallel',
800 type='int', default=defaultParallel,
801 help="Number of devices to collect from in parallel")
802 self.parser.add_option('--cycletime',
803 dest='cycletime',default=0,type='int',
804 help="Run collection every x minutes")
805 self.parser.add_option('--ignore',
806 dest='ignorePlugins',default="",
807 help="Modeler plugins to ignore. Takes a regular expression")
808 self.parser.add_option('--collect',
809 dest='collectPlugins',default="",
810 help="Modeler plugins to use. Takes a regular expression")
811 self.parser.add_option('-p', '--path', dest='path',
812 help="Start class path for collection ie /NetworkDevices")
813 self.parser.add_option('-d', '--device', dest='device',
814 help="Fully qualified device name ie www.confmon.com")
815 self.parser.add_option('-a', '--collage',
816 dest='collage', default=0, type='float',
817 help="Do not collect from devices whose collect date " +
818 "is within this many minutes")
819 self.parser.add_option('--writetries',
820 dest='writetries',default=2,type='int',
821 help="Number of times to try to write if a "
822 "read conflict is found")
823
824 self.parser.add_option("-F", "--force",
825 dest="force", action='store_true', default=True,
826 help="Force collection of config data (deprecated)")
827 self.parser.add_option('--portscantimeout', dest='portscantimeout',
828 type='int', default=defaultPortScanTimeout,
829 help="Time to wait for connection failures when port scanning")
830 self.parser.add_option('--now',
831 dest='now', action="store_true", default=False,
832 help="Start daemon now, do not sleep before starting")
833 TCbuildOptions(self.parser, self.usage)
834
835
836
838 """
839 Check what the user gave us vs what we'll accept
840 for command-line options
841 """
842 if not self.options.path and not self.options.device:
843 self.options.path = "/Devices"
844 if self.options.ignorePlugins and self.options.collectPlugins:
845 raise SystemExit( "Only one of --ignore or --collect"
846 " can be used at a time")
847
848
850 """
851 The guts of the timeoutClients method (minus the twisted reactor
852 stuff). Breaking this part out as a separate method facilitates unit
853 testing.
854 """
855 active = []
856 for client in self.clients:
857 if client.timeout < time.time():
858 self.log.warn("Client %s timeout", client.hostname)
859 self.finished.append(client)
860 client.timedOut = True
861 client.stop()
862 else:
863 active.append(client)
864 self.clients = active
865
866
867
881
882
883
885 """
886 Twisted main loop
887 """
888 reactor.startRunning()
889 while reactor.running:
890 try:
891 while reactor.running:
892 reactor.runUntilCurrent()
893 timeout = reactor.timeout()
894 reactor.doIteration(timeout)
895 except:
896 if reactor.running:
897 self.log.exception("Unexpected error in main loop.")
898
899
900
902 """
903 Get the list of devices for which we are collecting:
904 * if -d devicename was used, use the devicename
905 * if a class path flag was supplied, gather the devices
906 along that organizer
907 * otherwise get all of the devices associated with our collector
908
909 @return: list of devices
910 @rtype: list
911 """
912 if self.options.device:
913 self.log.info("Collecting for device %s", self.options.device)
914 return succeed([self.options.device])
915
916 self.log.info("Collecting for path %s", self.options.path)
917 return self.config().callRemote('getDeviceListByOrganizer',
918 self.options.path,
919 self.options.monitor)
920
921
def mainLoop(self, driver):
    """
    One pass of the collection cycle, written as a generator to be
    run by drive() / driveLater().

    When the cycle option is set, the next pass is scheduled first, so
    it is queued even if this pass ends early.  If clients are still
    active, an error is logged and the pass ends without doing any work.
    Otherwise the device list is fetched, stored as an iterator in
    self.devicegen, and the collection slots are filled.

    @param driver: driver object used to fetch the result of each
        yielded value
    @type driver: driver object
    @return: generator yielding the values the driver waits on
    @rtype: generator
    """
    if self.options.cycle:
        driveLater(self.cycleTime(), self.mainLoop)

    if self.clients:
        self.log.error("Modeling cycle taking too long")
        return

    # Remember when this pass began.
    self.start = time.time()
    self.log.debug("Starting collector loop...")

    yield self.getDeviceList()
    deviceList = driver.next()
    self.devicegen = iter(deviceList)

    slotFiller = drive(self.fillCollectionSlots)
    slotFiller.addErrback(self.fillError)
    yield slotFiller
    driver.next()
    self.log.debug("Collection slots filled")
948
949
950
def main(self, unused=None):
    """
    Start a collection pass: clear the list of finished clients, run
    mainLoop through drive(), and chain timeoutClients onto the result.

    @param unused: ignored; accepted so this method can be used as a
        callback
    @type unused: object
    @return: the object returned by drive(), with self.timeoutClients
        added as a callback
    @rtype: Twisted deferred object
    """
    self.finished = []
    loopDeferred = drive(self.mainLoop)
    loopDeferred.addCallback(self.timeoutClients)
    return loopDeferred
964
965
966
968 """
969 Stub function
970
971 @param device: device name (unused)
972 @type device: string
973 @todo: implement
974 """
975
976 self.log.debug("Asynch deleteDevice %s" % device)
977
978
if __name__ == '__main__':
    # Script entry point: build the daemon and process its options.
    modeler = ZenModeler()
    modeler.processOptions()
    # Install the daemon's own main loop in place of reactor.run
    # (presumably picked up when run() starts the reactor -- confirm).
    reactor.run = modeler.reactorLoop
    modeler.run()
984