1
2
3
4
5
6
7
8
9
10
11 __doc__= """Discover (aka model) a device and its components.
12 For instance, find out what Ethernet interfaces and hard disks a server
13 has available.
14 This information should change much less frequently than performance metrics.
15 """
16 import Globals
17
18
19
20
21
22 try:
23 import pysamba.twisted.reactor
24 from ZenPacks.zenoss.WindowsMonitor.WMIClient import WMIClient
25 from ZenPacks.zenoss.WindowsMonitor.utils import addNTLMv2Option, setNTLMv2Auth
26 USE_WMI = True
27 except ImportError:
28 USE_WMI = False
29
30 from Products.ZenHub.PBDaemon import FakeRemote, PBDaemon
31 from Products.ZenUtils.DaemonStats import DaemonStats
32 from Products.ZenUtils.Driver import drive, driveLater
33 from Products.ZenUtils.Utils import unused, atomicWrite, zenPath
34 from Products.ZenEvents.ZenEventClasses import Heartbeat, Error
35 from twisted.python.failure import Failure
36
37 from PythonClient import PythonClient
38 from SshClient import SshClient
39 from TelnetClient import TelnetClient, buildOptions as TCbuildOptions
40 from SnmpClient import SnmpClient
41 from PortscanClient import PortscanClient
42
43 from Products.DataCollector import Classifier
44
45 from twisted.internet import reactor
46 from twisted.internet.defer import succeed
47
48 import collections
49 import cPickle as pickle
50 import time
51 import re
52 import DateTime
53
54 import os
55 import os.path
56 import sys
57 import traceback
58 from random import randint
59 from itertools import chain
60
61 defaultPortScanTimeout = 5
62 defaultParallel = 1
63 defaultProtocol = "ssh"
64 defaultPort = 22
65
66
67 from Products.DataCollector import DeviceProxy
68 from Products.DataCollector import Plugins
69 unused(DeviceProxy, Plugins)
72 """
73 Daemon class to attach to zenhub and pass along
74 device configuration information.
75 """
76
77 name = 'zenmodeler'
78 initialServices = PBDaemon.initialServices + ['ModelerService']
79
80 generateEvents = True
81 configCycleInterval = 360
82
83 classCollectorPlugins = ()
84
86 """
87 Initalizer
88
89 @param single: collect from a single device?
90 @type single: boolean
91 """
92 PBDaemon.__init__(self)
93
94 self.options.force = True
95 self.start = None
96 self.rrdStats = DaemonStats()
97 self.single = single
98 if self.options.device:
99 self.single = True
100 self.modelerCycleInterval = self.options.cycletime
101 self.collage = float( self.options.collage ) / 1440.0
102 self.pendingNewClients = False
103 self.clients = []
104 self.finished = []
105 self.devicegen = None
106 self.counters = collections.Counter()
107
108
109 self.started = False
110 self.startDelay = 0
111 if self.options.daemon:
112 if self.options.now:
113 self.log.debug("Run as a daemon, starting immediately.")
114 else:
115
116 self.startDelay = randint(10, 60) * 1
117 self.log.info("Run as a daemon, waiting %s seconds to start." %
118 self.startDelay)
119 else:
120 self.log.debug("Run in foreground, starting immediately.")
121
122
123 self.loadCounters()
124
def reportError(self, error):
    """
    Log an error that reached a Twisted errback.

    @param error: the failure or message handed to the errback
    @type error: object
    """
    self.log.error("Error occured: %s", error)
133
134
136 """
137 Called after connected to the zenhub service
138 """
139 d = self.configure()
140 d.addCallback(self.heartbeat)
141 d.addErrback(self.reportError)
142
143
184
185 return drive(inner)
186
187
193
194
196 """
197 Build a list of active plugins for a device, based on:
198
199 * the --collect command-line option which is a regex
200 * the --ignore command-line option which is a regex
201 * transport which is a string describing the type of plugin
202
203 @param device: device to collect against
204 @type device: string
205 @param transport: python, ssh, snmp, telnet, cmd
206 @type transport: string
207 @return: results of the plugin
208 @type: string
209 @todo: determine if an event for the collector AND the device should be sent
210 """
211 plugins = []
212 valid_loaders = []
213 for loader in device.plugins:
214 try:
215 plugin= loader.create()
216 self.log.debug( "Loaded plugin %s" % plugin.name() )
217 plugins.append( plugin )
218 valid_loaders.append( loader )
219
220 except (SystemExit, KeyboardInterrupt), ex:
221 self.log.info( "Interrupted by external signal (%s)" % str(ex) )
222 raise
223
224 except Plugins.PluginImportError, import_error:
225 import socket
226 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) )
227 collector_host= socket.gethostname()
228
229
230
231 evt= { "eventClass":"/Status/Update", "component":component,
232 "agent":collector_host, "device":collector_host,
233 "severity":Error }
234
235 info= "Problem loading plugin %s" % import_error.plugin
236 self.log.error( info )
237 evt[ 'summary' ]= info
238
239 info= import_error.traceback
240 self.log.error( info )
241 evt[ 'message' ]= info
242
243 info= ("Due to import errors, removing the %s plugin"
244 " from this collection cycle.") % import_error.plugin
245 self.log.error( info )
246 evt[ 'message' ] += "%s\n" % info
247 self.sendEvent( evt )
248
249
250
251 if len( device.plugins ) != len( valid_loaders ):
252 device.plugins= valid_loaders
253
254
255
256 collectTest = lambda x: False
257 ignoreTest = lambda x: False
258 if self.options.collectPlugins:
259 collectTest = re.compile(self.options.collectPlugins).search
260 elif self.options.ignorePlugins:
261 ignoreTest = re.compile(self.options.ignorePlugins).search
262
263 result = []
264 for plugin in plugins:
265 if plugin.transport != transport:
266 continue
267 name = plugin.name()
268 if ignoreTest(name):
269 self.log.debug("Ignoring %s on %s because of --ignore flag",
270 name, device.id)
271 elif collectTest(name):
272 self.log.debug("Using %s on %s because of --collect flag",
273 name, device.id)
274 result.append(plugin)
275 elif not self.options.collectPlugins:
276 self.log.debug("Using %s on %s", name, device.id)
277 result.append(plugin)
278 return result
279
280
281
283 """
284 Collect data from a single device.
285
286 @param device: device to collect against
287 @type device: string
288 """
289 clientTimeout = getattr(device, 'zCollectorClientTimeout', 180)
290 ip = device.manageIp
291 timeout = clientTimeout + time.time()
292 if USE_WMI:
293 self.wmiCollect(device, ip, timeout)
294 else:
295 self.log.info("skipping WMI-based collection, PySamba zenpack not installed")
296 self.pythonCollect(device, ip, timeout)
297 self.cmdCollect(device, ip, timeout)
298 self.snmpCollect(device, ip, timeout)
299 self.portscanCollect(device, ip, timeout)
300
301
302
304 """
305 Start the Windows Management Instrumentation (WMI) collector
306
307 @param device: device to collect against
308 @type device: string
309 @param ip: IP address of device to collect against
310 @type ip: string
311 @param timeout: timeout before failing the connection
312 @type timeout: integer
313 """
314 if self.options.nowmi:
315 return
316
317 client = None
318 try:
319 plugins = self.selectPlugins(device, 'wmi')
320 if not plugins:
321 self.log.info("No WMI plugins found for %s" % device.id)
322 return
323 if self.checkCollection(device):
324 self.log.info('WMI collector method for device %s' % device.id)
325 self.log.info("plugins: %s",
326 ", ".join(map(lambda p: p.name(), plugins)))
327 client = WMIClient(device, self, plugins)
328 if not client or not plugins:
329 self.log.warn("WMI collector creation failed")
330 return
331 except (SystemExit, KeyboardInterrupt):
332 raise
333 except Exception:
334 self.log.exception("Error opening WMI collector")
335 self.addClient(client, timeout, 'WMI', device.id)
336
337
338
340 """
341 Start local Python collection client.
342
343 @param device: device to collect against
344 @type device: string
345 @param ip: IP address of device to collect against
346 @type ip: string
347 @param timeout: timeout before failing the connection
348 @type timeout: integer
349 """
350 client = None
351 try:
352 plugins = self.selectPlugins(device, "python")
353 if not plugins:
354 self.log.info("No Python plugins found for %s" % device.id)
355 return
356 if self.checkCollection(device):
357 self.log.info('Python collection device %s' % device.id)
358 self.log.info("plugins: %s",
359 ", ".join(map(lambda p: p.name(), plugins)))
360 client = PythonClient(device, self, plugins)
361 if not client or not plugins:
362 self.log.warn("Python client creation failed")
363 return
364 except (SystemExit, KeyboardInterrupt): raise
365 except:
366 self.log.exception("Error opening pythonclient")
367 self.addClient(client, timeout, 'python', device.id)
368
369
371 """
372 Start shell command collection client.
373
374 @param device: device to collect against
375 @type device: string
376 @param ip: IP address of device to collect against
377 @type ip: string
378 @param timeout: timeout before failing the connection
379 @type timeout: integer
380 """
381 client = None
382 clientType = 'snmp'
383
384 hostname = device.id
385 try:
386 plugins = self.selectPlugins(device,"command")
387 if not plugins:
388 self.log.info("No command plugins found for %s" % hostname)
389 return
390
391 protocol = getattr(device, 'zCommandProtocol', defaultProtocol)
392 commandPort = getattr(device, 'zCommandPort', defaultPort)
393
394 if protocol == "ssh":
395 client = SshClient(hostname, ip, commandPort,
396 options=self.options,
397 plugins=plugins, device=device,
398 datacollector=self, isLoseConnection=True)
399 clientType = 'ssh'
400 self.log.info('Using SSH collection method for device %s'
401 % hostname)
402
403 elif protocol == 'telnet':
404 if commandPort == 22: commandPort = 23
405 client = TelnetClient(hostname, ip, commandPort,
406 options=self.options,
407 plugins=plugins, device=device,
408 datacollector=self)
409 clientType = 'telnet'
410 self.log.info('Using telnet collection method for device %s'
411 % hostname)
412
413 else:
414 info = ("Unknown protocol %s for device %s -- "
415 "defaulting to %s collection method" %
416 (protocol, hostname, clientType ))
417 self.log.warn( info )
418 import socket
419 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) )
420 collector_host= socket.gethostname()
421 evt= { "eventClass":"/Status/Update", "agent":collector_host,
422 "device":hostname, "severity":Error }
423 evt[ 'summary' ]= info
424 self.sendEvent( evt )
425 return
426
427 if not client:
428 self.log.warn("Shell command collector creation failed")
429 else:
430 self.log.info("plugins: %s",
431 ", ".join(map(lambda p: p.name(), plugins)))
432 except (SystemExit, KeyboardInterrupt): raise
433 except:
434 self.log.exception("Error opening command collector")
435 self.addClient(client, timeout, clientType, device.id)
436
437
438
440 """
441 Start SNMP collection client.
442
443 @param device: device to collect against
444 @type device: string
445 @param ip: IP address of device to collect against
446 @type ip: string
447 @param timeout: timeout before failing the connection
448 @type timeout: integer
449 """
450 client = None
451 try:
452 hostname = device.id
453 if getattr( device, "zSnmpMonitorIgnore", True ):
454 self.log.info("SNMP monitoring off for %s" % hostname)
455 return
456
457 if not ip:
458 self.log.info("No manage IP for %s" % hostname)
459 return
460
461 plugins = []
462 plugins = self.selectPlugins(device,"snmp")
463 if not plugins:
464 self.log.info("No SNMP plugins found for %s" % hostname)
465 return
466
467 if self.checkCollection(device):
468 self.log.info('SNMP collection device %s' % hostname)
469 self.log.info("plugins: %s",
470 ", ".join(map(lambda p: p.name(), plugins)))
471 client = SnmpClient(device.id, ip, self.options,
472 device, self, plugins)
473 if not client or not plugins:
474 self.log.warn("SNMP collector creation failed")
475 return
476 except (SystemExit, KeyboardInterrupt): raise
477 except:
478 self.log.exception("Error opening the SNMP collector")
479 self.addClient(client, timeout, 'SNMP', device.id)
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
def addClient(self, device, timeout, clientType, name):
    """
    Start a collector client, or log a warning when none was created.

    Despite its name, the C{device} argument is the collector client
    object (callers pass the client they just built, or None when
    creation failed); C{name} is the id of the device being modeled.

    @param device: collector client to run, or None
    @type device: object
    @param timeout: absolute time (as returned by time.time()) after
                    which the client is considered timed out
    @type timeout: number
    @param clientType: description of the collector type, used only
                       in the warning message
    @type clientType: string
    @param name: id of the device, used only in the warning message
    @type name: string
    """
    if not device:
        self.log.warn('Unable to create a %s collector for %s',
                      clientType, name)
        return

    # Stamp the client before it is tracked so that timeout handling
    # always finds both attributes on every entry of self.clients.
    device.timeout = timeout
    device.timedOut = False
    self.clients.append(device)
    device.run()
538
539
540
542 """
543 Start portscan collection client.
544
545 @param device: device to collect against
546 @type device: string
547 @param ip: IP address of device to collect against
548 @type ip: string
549 @param timeout: timeout before failing the connection
550 @type timeout: integer
551 """
552 client = None
553 try:
554 hostname = device.id
555 plugins = self.selectPlugins(device, "portscan")
556 if not plugins:
557 self.log.info("No portscan plugins found for %s" % hostname)
558 return
559 if self.checkCollection(device):
560 self.log.info('Portscan collector method for device %s'
561 % hostname)
562 self.log.info("plugins: %s",
563 ", ".join(map(lambda p: p.name(), plugins)))
564 client = PortscanClient(device.id, ip, self.options,
565 device, self, plugins)
566 if not client or not plugins:
567 self.log.warn("Portscan collector creation failed")
568 return
569 except (SystemExit, KeyboardInterrupt): raise
570 except:
571 self.log.exception("Error opening portscan collector")
572 self.addClient(client, timeout, 'portscan', device.id)
573
574
def checkCollection(self, device):
    """
    Decide whether a device should be collected now.

    Collection is skipped only when the device reports an SNMP status
    number greater than zero AND its last collection time plus
    self.collage has not yet passed.

    @param device: device to check; must provide
                   getSnmpLastCollection(), getSnmpStatusNumber() and id
    @type device: object
    @return: False if collection should be skipped, True otherwise
    @rtype: boolean
    """
    age = device.getSnmpLastCollection() + self.collage
    if device.getSnmpStatusNumber() > 0 and age >= DateTime.DateTime():
        self.log.info("Skipped collection of %s", device.id)
        return False
    return True
589
591 """
592 Callback that processes the return values from a device.
593 Python iterable.
594 @param collectorClient: collector instance
595 @type collectorClient: collector class
596 @return: Twisted deferred object
597 @type: Twisted deferred object
598 """
599 device = collectorClient.device
600 self.log.debug("Client for %s finished collecting", device.id)
601 def processClient(driver):
602 try:
603 if (isinstance(collectorClient, SnmpClient)
604 and collectorClient.connInfo.changed == True):
605 self.log.info(
606 "SNMP connection info for %s changed. Updating...",
607 device.id)
608 yield self.config().callRemote('setSnmpConnectionInfo',
609 device.id,
610 collectorClient.connInfo.zSnmpVer,
611 collectorClient.connInfo.zSnmpPort,
612 collectorClient.connInfo.zSnmpCommunity
613 )
614 driver.next()
615
616 pluginStats = {}
617 self.log.debug("Processing data for device %s", device.id)
618 devchanged = False
619 maps = []
620 for plugin, results in collectorClient.getResults():
621 if plugin is None: continue
622 self.log.debug("Processing plugin %s on device %s ...",
623 plugin.name(), device.id)
624 if not results:
625 self.log.warn("The plugin %s returned no results.",
626 plugin.name())
627 continue
628 self.log.debug("Plugin %s results = %s", plugin.name(), results)
629 datamaps = []
630 try:
631 results = plugin.preprocess(results, self.log)
632 if results:
633 datamaps = plugin.process(device, results, self.log)
634 if datamaps:
635 pluginStats.setdefault(plugin.name(), plugin.weight)
636
637 except (SystemExit, KeyboardInterrupt), ex:
638 self.log.info( "Plugin %s terminated due to external"
639 " signal (%s)" % (plugin.name(), str(ex) )
640 )
641 continue
642
643 except Exception, ex:
644
645
646
647
648 import socket
649 component= os.path.splitext(
650 os.path.basename( sys.argv[0] )
651 )[0]
652 collector_host= socket.gethostname()
653 evt= { "eventClass":"/Status/Update",
654 "agent":collector_host, "device":device.id,
655 "severity":Error }
656
657 info= "Problem while executing plugin %s" %plugin.name()
658 self.log.error( info )
659 evt[ 'summary' ]= info
660
661 info= traceback.format_exc()
662 self.log.error( info )
663 evt[ 'message' ]= info
664 self.sendEvent( evt )
665 continue
666
667
668 if not isinstance(datamaps, (list, tuple)):
669 datamaps = [datamaps,]
670 if datamaps:
671 maps += [m for m in datamaps if m]
672 if maps:
673 deviceClass = Classifier.classifyDevice(pluginStats,
674 self.classCollectorPlugins)
675 yield self.config().callRemote(
676 'applyDataMaps', device.id,
677 maps, deviceClass, True)
678
679 if driver.next():
680 devchanged = True
681 if devchanged:
682 self.log.info("Changes in configuration applied")
683 else:
684 self.log.info("No change in configuration detected")
685
686 except Exception, ex:
687 self.log.exception(ex)
688 raise
689
690 def processClientFinished(result):
691 """
692 Called after the client collection finishes
693
694 @param result: object (unused)
695 @type result: object
696 """
697 self.counters['modeledDevicesCount'] += 1
698
699 if result and isinstance(result, (basestring, Failure)):
700 self.log.error("Client %s finished with message: %s" %
701 (device.id, result))
702 else:
703 self.log.debug("Client %s finished" % device.id)
704
705 try:
706 self.clients.remove(collectorClient)
707 self.finished.append(collectorClient)
708 except ValueError:
709 self.log.debug("Client %s not found in in the list"
710 " of active clients",
711 device.id)
712 d = drive(self.fillCollectionSlots)
713 d.addErrback(self.fillError)
714
715 d = drive(processClient)
716 d.addBoth(processClientFinished)
717
718
719
def fillError(self, reason):
    """
    Twisted errback that logs a failure to fill the collection slots.

    @param reason: the failure or message handed to the errback
    @type reason: object
    """
    # Pass the reason as a logging argument rather than formatting it
    # eagerly with %, which would raise if the reason were a tuple.
    self.log.error("Unable to fill collection slots: %s", reason)
729
730
def cycleTime(self):
    """
    Return the modeling cycle time in seconds.

    self.modelerCycleInterval is multiplied by 60 here.
    NOTE(review): the interval is presumably in minutes (it is taken
    from the --cycletime option, whose help text says minutes), which
    makes the result seconds -- confirm against the caller's units.

    @return: cycle time in seconds
    @rtype: integer
    """
    return self.modelerCycleInterval * 60
739
740
742 """
743 Twisted keep-alive mechanism to ensure that
744 we're still connected to zenhub
745
746 @param ignored: object (unused)
747 @type ignored: object
748 """
749 ARBITRARY_BEAT = 30
750 reactor.callLater(ARBITRARY_BEAT, self.heartbeat)
751 if self.options.cycle:
752 evt = dict(eventClass=Heartbeat,
753 component='zenmodeler',
754 device=self.options.monitor,
755 timeout=3*ARBITRARY_BEAT)
756 self.sendEvent(evt)
757 self.niceDoggie(self.cycleTime())
758
759
760 if not self.started:
761 self.started = True
762 reactor.callLater(self.startDelay, self.main)
763
764
765 self.rrdStats.derive('modeledDevices', ARBITRARY_BEAT, self.counters['modeledDevicesCount'])
766
767
768 self.rrdStats.gauge('modeledDevicesCount', ARBITRARY_BEAT, self.counters['modeledDevicesCount'])
769
770
771 self.saveCounters()
772
775
782
def loadCounters(self):
    """
    Restore self.counters from the counters file.

    Any failure (missing file, unreadable or corrupt pickle) leaves
    self.counters as a fresh, empty collections.Counter.
    """
    try:
        # Use a context manager so the file handle is always closed,
        # and binary mode so the pickle bytes are read untranslated.
        with open(self._getCountersFile(), 'rb') as countersFile:
            # NOTE(review): pickle.load must only be used on trusted
            # files; this assumes the counters file is written solely
            # by this daemon.
            self.counters = pickle.load(countersFile)
    except Exception:
        self.counters = collections.Counter()
788
@property
def _devicegen_has_items(self):
    """
    Check whether self.devicegen (an iterator) can still produce at
    least one value.

    Peeking consumes the first value, so the iterator is rebuilt with
    that value chained back onto the front before returning.

    @return: True if at least one item remains, False otherwise
    @rtype: boolean
    """
    if self.devicegen is None:
        return False
    try:
        first = next(self.devicegen)
    except StopIteration:
        return False
    # Put the peeked value back so callers see an unchanged sequence.
    self.devicegen = chain([first], self.devicegen)
    return True
804
805
807 """
808 Check to see if there's anything to do.
809 If there isn't, report our statistics and exit.
810
811 @param unused: unused (unused)
812 @type unused: string
813 """
814 if self.pendingNewClients or self.clients: return
815 if self._devicegen_has_items: return
816
817 if self.start:
818 runTime = time.time() - self.start
819 self.start = None
820 self.log.info("Scan time: %0.2f seconds", runTime)
821 devices = len(self.finished)
822 timedOut = len([c for c in self.finished if c.timedOut])
823 self.sendEvents(
824 self.rrdStats.gauge('cycleTime', self.cycleTime(), runTime) +
825 self.rrdStats.gauge('devices', self.cycleTime(), devices) +
826 self.rrdStats.gauge('timedOut', self.cycleTime(), timedOut)
827 )
828 if not self.options.cycle:
829 self.stop()
830 self.finished = []
831
833 """
834 An iterator which either returns a device to collect or
835 calls checkStop()
836 @param driver: driver object
837 @type driver: driver object
838 """
839 count = len(self.clients)
840 while count < self.options.parallel and self._devicegen_has_items \
841 and not self.pendingNewClients:
842 self.pendingNewClients = True
843 try:
844 device = self.devicegen.next()
845 yield self.config().callRemote('getDeviceConfig', [device],
846 self.options.checkStatus)
847
848 devices = driver.next()
849 if devices:
850 d = devices[0]
851 if d.skipModelMsg:
852 self.log.info(d.skipModelMsg)
853 else:
854 self.collectDevice(d)
855 else:
856 self.log.info("Device %s not returned is it down?", device)
857 except StopIteration:
858 self.devicegen = None
859 finally:
860 self.pendingNewClients = False
861 break
862 update = len(self.clients)
863 if update != count and update != 1:
864 self.log.info('Running %d clients', update)
865 else:
866 self.log.debug('Running %d clients', update)
867 self.checkStop()
868
870 """
871 Build our list of command-line options
872 """
873 PBDaemon.buildOptions(self)
874 self.parser.add_option('--debug',
875 dest='debug', action="store_true", default=False,
876 help="Don't fork threads for processing")
877 self.parser.add_option('--nowmi',
878 dest='nowmi', action="store_true", default=not USE_WMI,
879 help="Do not execute WMI plugins")
880 self.parser.add_option('--parallel', dest='parallel',
881 type='int', default=defaultParallel,
882 help="Number of devices to collect from in parallel")
883 self.parser.add_option('--cycletime',
884 dest='cycletime',default=720,type='int',
885 help="Run collection every x minutes")
886 self.parser.add_option('--ignore',
887 dest='ignorePlugins',default="",
888 help="Modeler plugins to ignore. Takes a regular expression")
889 self.parser.add_option('--collect',
890 dest='collectPlugins',default="",
891 help="Modeler plugins to use. Takes a regular expression")
892 self.parser.add_option('-p', '--path', dest='path',
893 help="Start class path for collection ie /NetworkDevices")
894 self.parser.add_option('-d', '--device', dest='device',
895 help="Fully qualified device name ie www.confmon.com")
896 self.parser.add_option('-a', '--collage',
897 dest='collage', default=0, type='float',
898 help="Do not collect from devices whose collect date " +
899 "is within this many minutes")
900 self.parser.add_option('--writetries',
901 dest='writetries',default=2,type='int',
902 help="Number of times to try to write if a "
903 "read conflict is found")
904
905 self.parser.add_option("-F", "--force",
906 dest="force", action='store_true', default=True,
907 help="Force collection of config data (deprecated)")
908 self.parser.add_option('--portscantimeout', dest='portscantimeout',
909 type='int', default=defaultPortScanTimeout,
910 help="Time to wait for connection failures when port scanning")
911 self.parser.add_option('--now',
912 dest='now', action="store_true", default=False,
913 help="Start daemon now, do not sleep before starting")
914 self.parser.add_option('--communities',
915 dest='discoverCommunity', action="store_true", default=False,
916 help="If an snmp connection fails try and rediscover it's connection info")
917 self.parser.add_option('--checkstatus',
918 dest='checkStatus', action="store_true", default=False,
919 help="Don't model if the device is ping or snmp down")
920 TCbuildOptions(self.parser, self.usage)
921 if USE_WMI:
922 addNTLMv2Option(self.parser)
923
925 """
926 Check what the user gave us vs what we'll accept
927 for command-line options
928 """
929 if not self.options.path and not self.options.device:
930 self.options.path = "/Devices"
931 if self.options.ignorePlugins and self.options.collectPlugins:
932 raise SystemExit( "Only one of --ignore or --collect"
933 " can be used at a time")
934 if USE_WMI:
935 setNTLMv2Auth(self.options)
936
938 """
939 The guts of the timeoutClients method (minus the twisted reactor
940 stuff). Breaking this part out as a separate method facilitates unit
941 testing.
942 """
943 active = []
944 for client in self.clients:
945 if client.timeout < time.time():
946 self.log.warn("Client %s timeout", client.hostname)
947 self.finished.append(client)
948 client.timedOut = True
949 try:
950 client.stop()
951 except AssertionError, ex:
952 pass
953 else:
954 active.append(client)
955 self.clients = active
956
957
958
972
973
974
976 """
977 Twisted main loop
978 """
979 reactor.startRunning()
980 while reactor.running:
981 try:
982 while reactor.running:
983 reactor.runUntilCurrent()
984 timeout = reactor.timeout()
985 reactor.doIteration(timeout)
986 except Exception:
987 if reactor.running:
988 self.log.exception("Unexpected error in main loop.")
989
990
991
993 """
994 Get the list of devices for which we are collecting:
995 * if -d devicename was used, use the devicename
996 * if a class path flag was supplied, gather the devices
997 along that organizer
998 * otherwise get all of the devices associated with our collector
999
1000 @return: list of devices
1001 @rtype: list
1002 """
1003 if self.options.device:
1004 self.log.info("Collecting for device %s", self.options.device)
1005 return succeed([self.options.device])
1006
1007 self.log.info("Collecting for path %s", self.options.path)
1008
1009 d = self.config().callRemote('getDeviceListByOrganizer',
1010 self.options.path,
1011 self.options.monitor)
1012 def handle(results):
1013 self.log.critical("%s is not a valid organizer." % (self.options.path))
1014 reactor.running = False
1015 sys.exit(1)
1016
1017
1018 d.addErrback(handle)
1019 return d
1020
def mainLoop(self, driver):
    """
    Main collection loop, written as a generator to be run by drive().

    Each yielded value is handed to the driver; driver.next() is then
    called to obtain the result of the previous yield.

    @param driver: driver object running this generator
    @type driver: driver object
    @return: generator yielding the objects to wait on
    @rtype: generator
    """
    if self.options.cycle:
        # Schedule the next cycle first, so it is armed even when this
        # pass bails out early below.
        driveLater(self.cycleTime(), self.mainLoop)

    if self.clients:
        # Clients from the previous pass are still running.
        self.log.error("Modeling cycle taking too long")
        return

    self.start = time.time()

    self.log.debug("Starting collector loop...")
    yield self.getDeviceList()
    self.devicegen = iter(driver.next())
    d = drive(self.fillCollectionSlots)
    d.addErrback(self.fillError)
    yield d
    driver.next()
    self.log.debug("Collection slots filled")
1047
1048
1049
def main(self, unused=None):
    """
    Start one modeling pass by driving mainLoop.

    Resets the list of finished clients, runs mainLoop through
    drive(), and chains timeoutClients onto the resulting deferred.

    @param unused: ignored; accepted so this can be used as a callback
    @type unused: object
    @return: the deferred returned by drive()
    @rtype: Twisted deferred object
    """
    self.finished = []
    d = drive(self.mainLoop)
    d.addCallback(self.timeoutClients)
    return d
1063
1064
1065
1067 """
1068 Stub function
1069
1070 @param device: device name (unused)
1071 @type device: string
1072 @todo: implement
1073 """
1074
1075 self.log.debug("Asynch deleteDevice %s" % device)
1076
1077
if __name__ == '__main__':
    dc = ZenModeler()
    dc.processOptions()
    # Swap in the daemon's own reactor loop in place of reactor.run.
    reactor.run = dc.reactorLoop
    try:
        dc.run()
    finally:
        # Persist the counters even when run() exits with an exception.
        dc.saveCounters()
1086