1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__= """Discover (aka model) a device and its components.
15 For instance, find out what Ethernet interfaces and hard disks a server
16 has available.
17 This information should change much less frequently than performance metrics.
18 """
19
20 import Globals
21 from Products.ZenWin.WMIClient import WMIClient
22 from Products.ZenHub.PBDaemon import FakeRemote, PBDaemon
23 from Products.ZenUtils.DaemonStats import DaemonStats
24 from Products.ZenUtils.Driver import drive, driveLater
25 from Products.ZenUtils.Utils import unused
26 from Products.ZenEvents.ZenEventClasses import Heartbeat, Error
27
28 from PythonClient import PythonClient
29 from SshClient import SshClient
30 from TelnetClient import TelnetClient, buildOptions as TCbuildOptions
31 from SnmpClient import SnmpClient
32 from PortscanClient import PortscanClient
33
34 from Products.DataCollector import Classifier
35
36 from twisted.internet import reactor
37 from twisted.internet.defer import succeed
38
39 import time
40 import types
41 import re
42 import DateTime
43
44 import os
45 import os.path
46 import sys
47 import traceback
48
49 defaultPortScanTimeout = 5  # default value of the --portscantimeout option
50 defaultParallel = 1  # default value of --parallel (devices collected from in parallel)
51 defaultProtocol = "ssh"  # used when a device has no zCommandProtocol attribute
52 defaultPort = 22  # used when a device has no zCommandPort attribute
53 defaultStartSleep = 10 * 60  # seconds a daemon sleeps before starting unless --now is given
54
55
56 from Products.DataCollector import DeviceProxy
57 from Products.DataCollector import Plugins
58 unused(DeviceProxy, Plugins)
59
61 """
62 Daemon class to attach to zenhub and pass along
63 device configuration information.
64 """
65
66 name = 'zenmodeler'
67 initialServices = PBDaemon.initialServices + ['ModelerService']
68
69 generateEvents = True
70 configCycleInterval = 360
71
72 classCollectorPlugins = ()
73
75 """
76 Initalizer
77
78 @param single: collect from a single device?
79 @type single: boolean
80 """
81 PBDaemon.__init__(self)
82
83 self.options.force = True
84 if self.options.daemon:
85 if self.options.now:
86 self.log.debug("Run as a daemon, starting immediately.")
87 else:
88 self.log.info("Run as a daemon, waiting %s sec to start." %
89 defaultStartSleep)
90 time.sleep(defaultStartSleep)
91 self.log.debug("Run as a daemon, slept %s sec, starting now." %
92 defaultStartSleep)
93 else:
94 self.log.debug("Run in foreground, starting immediately.")
95
96 self.start = None
97 self.rrdStats = DaemonStats()
98 self.single = single
99 if self.options.device:
100 self.single = True
101 self.modelerCycleInterval = self.options.cycletime
102 self.collage = float( self.options.collage ) / 1440.0
103 self.pendingNewClients = False
104 self.clients = []
105 self.finished = []
106 self.devicegen = None
107
108
109
111 """
112 Log errors that have occurred
113
114 @param error: error message
115 @type error: string
116 """
117 self.log.error("Error occured: %s", error)
118
119
121 """
122 Called after connected to the zenhub service
123 """
124 d = self.configure()
125 d.addCallback(self.heartbeat)
126 d.addErrback(self.reportError)
127 d.addCallback(self.main)
128
129
170
171 return drive(inner)
172
173
179
180
182 """
183 Build a list of active plugins for a device, based on:
184
185 * the --collect command-line option which is a regex
186 * the --ignore command-line option which is a regex
187 * transport which is a string describing the type of plugin
188
189 @param device: device to collect against
190 @type device: string
191 @param transport: python, ssh, snmp, telnet, cmd
192 @type transport: string
193 @return: results of the plugin
194 @type: string
195 @todo: determine if an event for the collector AND the device should be sent
196 """
197 plugins = []
198 valid_loaders = []
199 for loader in device.plugins:
200 try:
201 plugin= loader.create()
202 self.log.debug( "Loaded plugin %s" % plugin.name() )
203 plugins.append( plugin )
204 valid_loaders.append( loader )
205
206 except (SystemExit, KeyboardInterrupt), ex:
207 self.log.info( "Interrupted by external signal (%s)" % str(ex) )
208 raise
209
210 except Plugins.PluginImportError, import_error:
211 import socket
212 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) )
213 collector_host= socket.gethostname()
214
215
216
217 evt= { "eventClass":"/Status/Update", "component":component,
218 "agent":collector_host, "device":collector_host,
219 "severity":Error }
220
221 info= "Problem loading plugin %s" % import_error.plugin
222 self.log.error( info )
223 evt[ 'summary' ]= info
224
225 info= import_error.traceback
226 self.log.error( info )
227 evt[ 'message' ]= info
228
229 info= ("Due to import errors, removing the %s plugin"
230 " from this collection cycle.") % import_error.plugin
231 self.log.error( info )
232 evt[ 'message' ] += "%s\n" % info
233 self.sendEvent( evt )
234
235
236
237 if len( device.plugins ) != len( valid_loaders ):
238 device.plugins= valid_loaders
239
240
241
242 collectTest = lambda x: False
243 ignoreTest = lambda x: False
244 if self.options.collectPlugins:
245 collectTest = re.compile(self.options.collectPlugins).search
246 elif self.options.ignorePlugins:
247 ignoreTest = re.compile(self.options.ignorePlugins).search
248
249 result = []
250 for plugin in plugins:
251 if plugin.transport != transport:
252 continue
253 name = plugin.name()
254 if ignoreTest(name):
255 self.log.debug("Ignoring %s on %s because of --ignore flag",
256 name, device.id)
257 elif collectTest(name):
258 self.log.debug("Using %s on %s because of --collect flag",
259 name, device.id)
260 result.append(plugin)
261 elif not self.options.collectPlugins:
262 self.log.debug("Using %s on %s", name, device.id)
263 result.append(plugin)
264 return result
265
266
267
269 """
270 Collect data from a single device.
271
272 @param device: device to collect against
273 @type device: string
274 """
275 clientTimeout = getattr(device, 'zCollectorClientTimeout', 180)
276 ip = device.manageIp
277 timeout = clientTimeout + time.time()
278 self.wmiCollect(device, ip, timeout)
279 self.pythonCollect(device, ip, timeout)
280 self.cmdCollect(device, ip, timeout)
281 self.snmpCollect(device, ip, timeout)
282 self.portscanCollect(device, ip, timeout)
283
284
285
287 """
288 Start the Windows Management Instrumentation (WMI) collector
289
290 @param device: device to collect against
291 @type device: string
292 @param ip: IP address of device to collect against
293 @type ip: string
294 @param timeout: timeout before failing the connection
295 @type timeout: integer
296 """
297 if self.options.nowmi:
298 return
299
300 client = None
301 try:
302 plugins = self.selectPlugins(device, 'wmi')
303 if not plugins:
304 self.log.info("No WMI plugins found for %s" % device.id)
305 return
306 if self.checkCollection(device):
307 self.log.info('WMI collector method for device %s' % device.id)
308 self.log.info("plugins: %s",
309 ", ".join(map(lambda p: p.name(), plugins)))
310 client = WMIClient(device, self, plugins)
311 if not client or not plugins:
312 self.log.warn("WMI collector creation failed")
313 return
314 except (SystemExit, KeyboardInterrupt):
315 raise
316 except Exception:
317 self.log.exception("Error opening WMI collector")
318 self.addClient(client, timeout, 'WMI', device.id)
319
320
321
323 """
324 Start local Python collection client.
325
326 @param device: device to collect against
327 @type device: string
328 @param ip: IP address of device to collect against
329 @type ip: string
330 @param timeout: timeout before failing the connection
331 @type timeout: integer
332 """
333 client = None
334 try:
335 plugins = self.selectPlugins(device, "python")
336 if not plugins:
337 self.log.info("No Python plugins found for %s" % device.id)
338 return
339 if self.checkCollection(device):
340 self.log.info('Python collection device %s' % device.id)
341 self.log.info("plugins: %s",
342 ", ".join(map(lambda p: p.name(), plugins)))
343 client = PythonClient(device, self, plugins)
344 if not client or not plugins:
345 self.log.warn("Python client creation failed")
346 return
347 except (SystemExit, KeyboardInterrupt): raise
348 except:
349 self.log.exception("Error opening pythonclient")
350 self.addClient(client, timeout, 'python', device.id)
351
352
354 """
355 Start shell command collection client.
356
357 @param device: device to collect against
358 @type device: string
359 @param ip: IP address of device to collect against
360 @type ip: string
361 @param timeout: timeout before failing the connection
362 @type timeout: integer
363 """
364 client = None
365 clientType = 'snmp'
366
367 hostname = device.id
368 try:
369 plugins = self.selectPlugins(device,"command")
370 if not plugins:
371 self.log.info("No command plugins found for %s" % hostname)
372 return
373
374 protocol = getattr(device, 'zCommandProtocol', defaultProtocol)
375 commandPort = getattr(device, 'zCommandPort', defaultPort)
376
377 if protocol == "ssh":
378 client = SshClient(hostname, ip, commandPort,
379 options=self.options,
380 plugins=plugins, device=device,
381 datacollector=self, isLoseConnection=True)
382 clientType = 'ssh'
383 self.log.info('Using SSH collection method for device %s'
384 % hostname)
385
386 elif protocol == 'telnet':
387 if commandPort == 22: commandPort = 23
388 client = TelnetClient(hostname, ip, commandPort,
389 options=self.options,
390 plugins=plugins, device=device,
391 datacollector=self)
392 clientType = 'telnet'
393 self.log.info('Using telnet collection method for device %s'
394 % hostname)
395
396 else:
397 info = ("Unknown protocol %s for device %s -- "
398 "defaulting to %s collection method" %
399 (protocol, hostname, clientType ))
400 self.log.warn( info )
401 import socket
402 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) )
403 collector_host= socket.gethostname()
404 evt= { "eventClass":"/Status/Update", "agent":collector_host,
405 "device":hostname, "severity":Error }
406 evt[ 'summary' ]= info
407 self.sendEvent( evt )
408 return
409
410 if not client:
411 self.log.warn("Shell command collector creation failed")
412 else:
413 self.log.info("plugins: %s",
414 ", ".join(map(lambda p: p.name(), plugins)))
415 except (SystemExit, KeyboardInterrupt): raise
416 except:
417 self.log.exception("Error opening command collector")
418 self.addClient(client, timeout, clientType, device.id)
419
420
421
423 """
424 Start SNMP collection client.
425
426 @param device: device to collect against
427 @type device: string
428 @param ip: IP address of device to collect against
429 @type ip: string
430 @param timeout: timeout before failing the connection
431 @type timeout: integer
432 """
433 client = None
434 try:
435 hostname = device.id
436 if getattr( device, "zSnmpMonitorIgnore", True ):
437 self.log.info("SNMP monitoring off for %s" % hostname)
438 return
439
440 if not ip:
441 self.log.info("No manage IP for %s" % hostname)
442 return
443
444 plugins = []
445 plugins = self.selectPlugins(device,"snmp")
446 if not plugins:
447 self.log.info("No SNMP plugins found for %s" % hostname)
448 return
449
450 if self.checkCollection(device):
451 self.log.info('SNMP collection device %s' % hostname)
452 self.log.info("plugins: %s",
453 ", ".join(map(lambda p: p.name(), plugins)))
454 client = SnmpClient(device.id, ip, self.options,
455 device, self, plugins)
456 if not client or not plugins:
457 self.log.warn("SNMP collector creation failed")
458 return
459 except (SystemExit, KeyboardInterrupt): raise
460 except:
461 self.log.exception("Error opening the SNMP collector")
462 self.addClient(client, timeout, 'SNMP', device.id)
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
def addClient(self, device, timeout, clientType, name):
    """
    Register a collector client and start it, or warn when there is
    no client to start.

    Despite its name, the 'device' argument is the collector client
    object built by one of the *Collect methods, not a device.

    @param device: collector client to schedule; a false value (such
                   as None) means the client could not be created
    @type device: collector client object or None
    @param timeout: deadline stored on the client; callers in this file
                    pass an absolute time (time.time() plus an allowance)
    @type timeout: number
    @param clientType: description of the collector type, used only in
                       the warning message
    @type clientType: string
    @param name: name used in the warning message (callers in this
                 file pass the device id)
    @type name: string
    """
    # Guard clause: nothing to schedule, so just report it.
    if not device:
        self.log.warn('Unable to create a %s collector for %s',
                      clientType, name)
        return

    collector = device
    # Stamp the bookkeeping attributes before the client is tracked/run.
    collector.timeout = timeout
    collector.timedOut = False
    self.clients.append(collector)
    collector.run()
523
524
526 """
527 Start portscan collection client.
528
529 @param device: device to collect against
530 @type device: string
531 @param ip: IP address of device to collect against
532 @type ip: string
533 @param timeout: timeout before failing the connection
534 @type timeout: integer
535 """
536 client = None
537 try:
538 hostname = device.id
539 plugins = self.selectPlugins(device, "portscan")
540 if not plugins:
541 self.log.info("No portscan plugins found for %s" % hostname)
542 return
543 if self.checkCollection(device):
544 self.log.info('Portscan collector method for device %s'
545 % hostname)
546 self.log.info("plugins: %s",
547 ", ".join(map(lambda p: p.name(), plugins)))
548 client = PortscanClient(device.id, ip, self.options,
549 device, self, plugins)
550 if not client or not plugins:
551 self.log.warn("Portscan collector creation failed")
552 return
553 except (SystemExit, KeyboardInterrupt): raise
554 except:
555 self.log.exception("Error opening portscan collector")
556 self.addClient(client, timeout, 'portscan', device.id)
557
558
560 """
561 See how old the data is that we've collected
562
563 @param device: device to collect against
564 @type device: string
565 @return: is the SNMP status number > 0 and is the last collection time + collage older than now?
566 @type: boolean
567 """
568 age = device.getSnmpLastCollection() + self.collage
569 if device.getSnmpStatusNumber() > 0 and age >= DateTime.DateTime():
570 self.log.info("Skipped collection of %s" % device.id)
571 return False
572 return True
573
574
576 """
577 Callback that processes the return values from a device.
578 Python iterable.
579
580 @param collectorClient: collector instance
581 @type collectorClient: collector class
582 @return: Twisted deferred object
583 @type: Twisted deferred object
584 """
585 device = collectorClient.device
586 self.log.debug("Client for %s finished collecting", device.id)
587 def processClient(driver):
588 try:
589 pluginStats = {}
590 self.log.debug("Processing data for device %s", device.id)
591 devchanged = False
592 maps = []
593 for plugin, results in collectorClient.getResults():
594 if plugin is None: continue
595 self.log.debug("Processing plugin %s on device %s ...",
596 plugin.name(), device.id)
597 if not results:
598 self.log.warn("The plugin %s returned no results.",
599 plugin.name())
600 continue
601
602 datamaps = []
603 try:
604 results = plugin.preprocess(results, self.log)
605 if results:
606 datamaps = plugin.process(device, results, self.log)
607 if datamaps:
608 pluginStats.setdefault(plugin.name(), plugin.weight)
609
610 except (SystemExit, KeyboardInterrupt), ex:
611 self.log.info( "Plugin %s terminated due to external"
612 " signal (%s)" % (plugin.name(), str(ex) )
613 )
614 continue
615
616 except Exception, ex:
617
618
619
620
621 import socket
622 component= os.path.splitext(
623 os.path.basename( sys.argv[0] )
624 )[0]
625 collector_host= socket.gethostname()
626 evt= { "eventClass":"/Status/Update",
627 "agent":collector_host, "device":device.id,
628 "severity":Error }
629
630 info= "Problem while executing plugin %s" %plugin.name()
631 self.log.error( info )
632 evt[ 'summary' ]= info
633
634 info= traceback.format_exc()
635 self.log.error( info )
636 evt[ 'message' ]= info
637 self.sendEvent( evt )
638 continue
639
640
641 if type(datamaps) not in (types.ListType, types.TupleType):
642 datamaps = [datamaps,]
643 if datamaps:
644 maps += [m for m in datamaps if m]
645 if maps:
646 deviceClass = Classifier.classifyDevice(pluginStats,
647 self.classCollectorPlugins)
648 yield self.config().callRemote(
649 'applyDataMaps', device.id,
650 maps, deviceClass)
651 if driver.next():
652 devchanged = True
653 if devchanged:
654 self.log.info("Changes in configuration applied")
655 else:
656 self.log.info("No change in configuration detected")
657 yield self.config().callRemote('setSnmpLastCollection',
658 device.id)
659 driver.next()
660 except Exception, ex:
661 self.log.exception(ex)
662 raise
663
664 def processClientFinished(result):
665 """
666 Called after the client collection finishes
667
668 @param result: object (unused)
669 @type result: object
670 """
671 if not result:
672 self.log.debug("Client %s finished" % device.id)
673 else:
674 self.log.error("Client %s finished with message: %s" %
675 (device.id, result))
676 try:
677 self.clients.remove(collectorClient)
678 self.finished.append(collectorClient)
679 except ValueError:
680 self.log.debug("Client %s not found in in the list"
681 " of active clients",
682 device.id)
683 d = drive(self.fillCollectionSlots)
684 d.addErrback(self.fillError)
685
686 d = drive(processClient)
687 d.addBoth(processClientFinished)
688
689
690
692 """
693 Twisted errback routine to log an error when
694 unable to collect some data
695
696 @param reason: error message
697 @type reason: string
698 """
699 self.log.error("Unable to fill collection slots: %s" % reason)
700
701
703 """
704 Return our cycle time (in minutes)
705
706 @return: cycle time
707 @rtype: integer
708 """
709 return self.modelerCycleInterval * 60
710
711
729
730
732 """
733 Check to see if there's anything to do.
734 If there isn't, report our statistics and exit.
735
736 @param unused: unused (unused)
737 @type unused: string
738 """
739 if self.clients: return
740 if self.devicegen: return
741
742 if self.start:
743 runTime = time.time() - self.start
744 self.start = None
745 self.log.info("Scan time: %0.2f seconds", runTime)
746 devices = len(self.finished)
747 timedOut = len([c for c in self.finished if c.timedOut])
748 self.sendEvents(
749 self.rrdStats.gauge('cycleTime', self.cycleTime(), runTime) +
750 self.rrdStats.gauge('devices', self.cycleTime(), devices) +
751 self.rrdStats.gauge('timedOut', self.cycleTime(), timedOut)
752 )
753 if not self.options.cycle:
754 self.stop()
755 self.finished = []
756
757
759 """
760 An iterator which either returns a device to collect or
761 calls checkStop()
762
763 @param driver: driver object
764 @type driver: driver object
765 """
766 count = len(self.clients)
767 while count < self.options.parallel and self.devicegen \
768 and not self.pendingNewClients:
769
770 self.pendingNewClients = True
771 try:
772 device = self.devicegen.next()
773 yield self.config().callRemote('getDeviceConfig', [device])
774
775 devices = driver.next()
776 if devices:
777 self.collectDevice(devices[0])
778 except StopIteration:
779 self.devicegen = None
780
781 self.pendingNewClients = False
782 break
783
784 update = len(self.clients)
785 if update != count and update != 1:
786 self.log.info('Running %d clients', update)
787 else:
788 self.log.debug('Running %d clients', update)
789 self.checkStop()
790
791
793 """
794 Build our list of command-line options
795 """
796 PBDaemon.buildOptions(self)
797 self.parser.add_option('--debug',
798 dest='debug', action="store_true", default=False,
799 help="Don't fork threads for processing")
800 self.parser.add_option('--nowmi',
801 dest='nowmi', action="store_true", default=False,
802 help="Do not execute WMI plugins")
803 self.parser.add_option('--parallel', dest='parallel',
804 type='int', default=defaultParallel,
805 help="Number of devices to collect from in parallel")
806 self.parser.add_option('--cycletime',
807 dest='cycletime',default=720,type='int',
808 help="Run collection every x minutes")
809 self.parser.add_option('--ignore',
810 dest='ignorePlugins',default="",
811 help="Modeler plugins to ignore. Takes a regular expression")
812 self.parser.add_option('--collect',
813 dest='collectPlugins',default="",
814 help="Modeler plugins to use. Takes a regular expression")
815 self.parser.add_option('-p', '--path', dest='path',
816 help="Start class path for collection ie /NetworkDevices")
817 self.parser.add_option('-d', '--device', dest='device',
818 help="Fully qualified device name ie www.confmon.com")
819 self.parser.add_option('-a', '--collage',
820 dest='collage', default=0, type='float',
821 help="Do not collect from devices whose collect date " +
822 "is within this many minutes")
823 self.parser.add_option('--writetries',
824 dest='writetries',default=2,type='int',
825 help="Number of times to try to write if a "
826 "read conflict is found")
827
828 self.parser.add_option("-F", "--force",
829 dest="force", action='store_true', default=True,
830 help="Force collection of config data (deprecated)")
831 self.parser.add_option('--portscantimeout', dest='portscantimeout',
832 type='int', default=defaultPortScanTimeout,
833 help="Time to wait for connection failures when port scanning")
834 self.parser.add_option('--now',
835 dest='now', action="store_true", default=False,
836 help="Start daemon now, do not sleep before starting")
837 TCbuildOptions(self.parser, self.usage)
838
839
840
842 """
843 Check what the user gave us vs what we'll accept
844 for command-line options
845 """
846 if not self.options.path and not self.options.device:
847 self.options.path = "/Devices"
848 if self.options.ignorePlugins and self.options.collectPlugins:
849 raise SystemExit( "Only one of --ignore or --collect"
850 " can be used at a time")
851
852
854 """
855 The guts of the timeoutClients method (minus the twisted reactor
856 stuff). Breaking this part out as a separate method facilitates unit
857 testing.
858 """
859 active = []
860 for client in self.clients:
861 if client.timeout < time.time():
862 self.log.warn("Client %s timeout", client.hostname)
863 self.finished.append(client)
864 client.timedOut = True
865 client.stop()
866 else:
867 active.append(client)
868 self.clients = active
869
870
871
885
886
887
889 """
890 Twisted main loop
891 """
892 reactor.startRunning()
893 while reactor.running:
894 try:
895 while reactor.running:
896 reactor.runUntilCurrent()
897 timeout = reactor.timeout()
898 reactor.doIteration(timeout)
899 except:
900 if reactor.running:
901 self.log.exception("Unexpected error in main loop.")
902
903
904
906 """
907 Get the list of devices for which we are collecting:
908 * if -d devicename was used, use the devicename
909 * if a class path flag was supplied, gather the devices
910 along that organizer
911 * otherwise get all of the devices associated with our collector
912
913 @return: list of devices
914 @rtype: list
915 """
916 if self.options.device:
917 self.log.info("Collecting for device %s", self.options.device)
918 return succeed([self.options.device])
919
920 self.log.info("Collecting for path %s", self.options.path)
921 return self.config().callRemote('getDeviceListByOrganizer',
922 self.options.path,
923 self.options.monitor)
924
925
def mainLoop(self, driver):
    """
    One pass of the collection cycle, written as a generator that is
    run through drive() / driveLater().

    Each 'yield' hands a deferred to the driver; the following
    driver.next() call is used here to obtain that deferred's result
    (presumably re-raising on failure -- confirm against
    Products.ZenUtils.Driver).

    @param driver: driver object running this generator
    @type driver: driver object
    @return: generator yielding Twisted deferred objects
    @rtype: generator
    """
    # When cycling, the next pass is scheduled first, so it is queued
    # even when this pass bails out early below.
    if self.options.cycle:
        driveLater(self.cycleTime(), self.mainLoop)

    # Clients left over from the previous pass mean it has not finished.
    if self.clients:
        self.log.error("Modeling cycle taking too long")
        return

    self.start = time.time()
    self.log.debug("Starting collector loop...")

    # Fetch the device list and keep an iterator over it on the daemon.
    yield self.getDeviceList()
    deviceList = driver.next()
    self.devicegen = iter(deviceList)

    # Start filling the collection slots and wait for that to complete.
    fillSlots = drive(self.fillCollectionSlots)
    fillSlots.addErrback(self.fillError)
    yield fillSlots
    driver.next()

    self.log.debug("Collection slots filled")
953
954
def main(self, unused=None):
    """
    Kick off one run of mainLoop.

    Clears the list of finished clients, runs mainLoop through drive()
    and chains timeoutClients onto the resulting deferred.

    @param unused: ignored; accepted so this can be used as a callback
    @type unused: object
    @return: the deferred returned by drive()
    @rtype: Twisted deferred object
    """
    self.finished = []
    loopDeferred = drive(self.mainLoop)
    loopDeferred.addCallback(self.timeoutClients)
    return loopDeferred
968
969
970
972 """
973 Stub function
974
975 @param device: device name (unused)
976 @type device: string
977 @todo: implement
978 """
979
980 self.log.debug("Asynch deleteDevice %s" % device)
981
982
983 if __name__ == '__main__':
984 dc = ZenModeler()
985 dc.processOptions()
986 reactor.run = dc.reactorLoop  # rebind reactor.run to the daemon's reactorLoop before dc.run() is called
987 dc.run()
988