Package DataCollector :: Module zenmodeler
[hide private]
[frames] | no frames]

Source Code for Module DataCollector.zenmodeler

  1  ########################################################################## 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__= """Discover (aka model) a device and its components. 
 15  For instance, find out what Ethernet interfaces and hard disks a server 
 16  has available. 
 17  This information should change much less frequently than performance metrics. 
 18  """ 
 19   
 20  import Globals 
 21  from Products.ZenWin.WMIClient import WMIClient 
 22  from Products.ZenHub.PBDaemon import FakeRemote, PBDaemon 
 23  from Products.ZenUtils.DaemonStats import DaemonStats 
 24  from Products.ZenUtils.Driver import drive, driveLater 
 25  from Products.ZenUtils.Utils import unused 
 26  from Products.ZenEvents.ZenEventClasses import Heartbeat, Error 
 27   
 28  from PythonClient   import PythonClient 
 29  from SshClient      import SshClient 
 30  from TelnetClient   import TelnetClient, buildOptions as TCbuildOptions 
 31  from SnmpClient     import SnmpClient 
 32  from PortscanClient import PortscanClient 
 33                            
 34  from Products.DataCollector import Classifier 
 35   
 36  from twisted.internet import reactor 
 37  from twisted.internet.defer import succeed 
 38   
 39  import time 
 40  import types 
 41  import re 
 42  import DateTime 
 43   
 44  import os 
 45  import os.path 
 46  import sys 
 47  import traceback   
 48          
 49  defaultPortScanTimeout = 5 
 50  defaultParallel = 1 
 51  defaultProtocol = "ssh" 
 52  defaultPort = 22 
 53  defaultStartSleep = 10 * 60 
 54   
 55  # needed for Twisted's PB (Perspective Broker) to work 
 56  from Products.DataCollector import DeviceProxy 
 57  from Products.DataCollector import Plugins 
 58  unused(DeviceProxy, Plugins) 
 59   
60 -class ZenModeler(PBDaemon):
61 """ 62 Daemon class to attach to zenhub and pass along 63 device configuration information. 64 """ 65 66 name = 'zenmodeler' 67 initialServices = PBDaemon.initialServices + ['ModelerService'] 68 69 generateEvents = True 70 configCycleInterval = 360 71 72 classCollectorPlugins = () 73
74 - def __init__(self, single=False ):
75 """ 76 Initalizer 77 78 @param single: collect from a single device? 79 @type single: boolean 80 """ 81 PBDaemon.__init__(self) 82 # FIXME: cleanup --force option #2660 83 self.options.force = True 84 if self.options.daemon: 85 if self.options.now: 86 self.log.debug("Run as a daemon, starting immediately.") 87 else: 88 self.log.info("Run as a daemon, waiting %s sec to start." % 89 defaultStartSleep) 90 time.sleep(defaultStartSleep) 91 self.log.debug("Run as a daemon, slept %s sec, starting now." % 92 defaultStartSleep) 93 else: 94 self.log.debug("Run in foreground, starting immediately.") 95 96 self.start = None 97 self.rrdStats = DaemonStats() 98 self.single = single 99 if self.options.device: 100 self.single = True 101 self.modelerCycleInterval = self.options.cycletime 102 self.collage = float( self.options.collage ) / 1440.0 103 self.pendingNewClients = False 104 self.clients = [] 105 self.finished = [] 106 self.devicegen = None
107 108 109
110 - def reportError(self, error):
111 """ 112 Log errors that have occurred 113 114 @param error: error message 115 @type error: string 116 """ 117 self.log.error("Error occured: %s", error)
118 119
120 - def connected(self):
121 """ 122 Called after connected to the zenhub service 123 """ 124 d = self.configure() 125 d.addCallback(self.heartbeat) 126 d.addErrback(self.reportError) 127 d.addCallback(self.main)
128 129
130 - def configure(self):
131 """ 132 Get our configuration from zenhub 133 """ 134 # add in the code to fetch cycle time, etc. 135 def inner(driver): 136 """ 137 Generator function to gather our configuration 138 139 @param driver: driver object 140 @type driver: driver object 141 """ 142 self.log.debug('fetching monitor properties') 143 yield self.config().callRemote('propertyItems') 144 items = dict(driver.next()) 145 if self.options.cycletime == 0: 146 self.modelerCycleInterval = items.get('modelerCycleInterval', 147 self.modelerCycleInterval) 148 self.configCycleInterval = items.get('configCycleInterval', 149 self.configCycleInterval) 150 reactor.callLater(self.configCycleInterval * 60, self.configure) 151 152 self.log.debug("Getting threshold classes...") 153 yield self.config().callRemote('getThresholdClasses') 154 self.remote_updateThresholdClasses(driver.next()) 155 156 self.log.debug("Fetching default RRDCreateCommand...") 157 yield self.config().callRemote('getDefaultRRDCreateCommand') 158 createCommand = driver.next() 159 160 self.log.debug("Getting collector thresholds...") 161 yield self.config().callRemote('getCollectorThresholds') 162 self.rrdStats.config(self.options.monitor, 163 self.name, 164 driver.next(), 165 createCommand) 166 167 self.log.debug("Getting collector plugins for each DeviceClass") 168 yield self.config().callRemote('getClassCollectorPlugins') 169 self.classCollectorPlugins = driver.next()
170 171 return drive(inner)
172 173
174 - def config(self):
175 """ 176 Get the ModelerService 177 """ 178 return self.services.get('ModelerService', FakeRemote())
179 180
181 - def selectPlugins(self, device, transport):
182 """ 183 Build a list of active plugins for a device, based on: 184 185 * the --collect command-line option which is a regex 186 * the --ignore command-line option which is a regex 187 * transport which is a string describing the type of plugin 188 189 @param device: device to collect against 190 @type device: string 191 @param transport: python, ssh, snmp, telnet, cmd 192 @type transport: string 193 @return: results of the plugin 194 @type: string 195 @todo: determine if an event for the collector AND the device should be sent 196 """ 197 plugins = [] 198 valid_loaders = [] 199 for loader in device.plugins: 200 try: 201 plugin= loader.create() 202 self.log.debug( "Loaded plugin %s" % plugin.name() ) 203 plugins.append( plugin ) 204 valid_loaders.append( loader ) 205 206 except (SystemExit, KeyboardInterrupt), ex: 207 self.log.info( "Interrupted by external signal (%s)" % str(ex) ) 208 raise 209 210 except Plugins.pluginImportError, import_error: 211 import socket 212 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) ) 213 collector_host= socket.gethostname() 214 # NB: an import errror affects all devices, 215 # so report the issue against the collector 216 # TOOD: determine if an event for the collector AND the device should be sent 217 evt= { "eventClass":"/Status/Update", "component":component, 218 "agent":collector_host, "device":collector_host, 219 "severity":Error } 220 221 info= "Problem loading plugin %s" % import_error.plugin 222 self.log.error( info ) 223 evt[ 'summary' ]= info 224 225 info= import_error.traceback 226 self.log.error( info ) 227 evt[ 'message' ]= info 228 229 info= ("Due to import errors, removing the %s plugin" 230 " from this collection cycle.") % import_error.plugin 231 self.log.error( info ) 232 evt[ 'message' ] += "%s\n" % info 233 self.sendEvent( evt ) 234 235 # Make sure that we don't generate messages for bad loaders again 236 # NB: doesn't update the device's zProperties 237 if len( device.plugins ) != len( valid_loaders ): 238 device.plugins= valid_loaders 239 240 # Create functions to search for what plugins we will and 241 # won't supply to the device 242 collectTest = lambda x: False 243 ignoreTest = lambda x: False 244 if self.options.collectPlugins: 245 collectTest = re.compile(self.options.collectPlugins).search 246 elif self.options.ignorePlugins: 247 ignoreTest = re.compile(self.options.ignorePlugins).search 248 249 result = [] 250 for plugin in plugins: 251 if plugin.transport != transport: 252 continue 253 name = plugin.name() 254 if ignoreTest(name): 255 self.log.debug("Ignoring %s on %s because of --ignore flag", 256 name, device.id) 257 elif collectTest(name): 258 self.log.debug("Using %s on %s because of --collect flag", 259 name, device.id) 260 result.append(plugin) 261 elif not self.options.collectPlugins: 262 self.log.debug("Using %s on %s", name, device.id) 263 result.append(plugin) 264 return result
265 266 267
268 - def collectDevice(self, device):
269 """ 270 Collect data from a single device. 271 272 @param device: device to collect against 273 @type device: string 274 """ 275 clientTimeout = getattr(device, 'zCollectorClientTimeout', 180) 276 ip = device.manageIp 277 timeout = clientTimeout + time.time() 278 self.wmiCollect(device, ip, timeout) 279 self.pythonCollect(device, ip, timeout) 280 self.cmdCollect(device, ip, timeout) 281 self.snmpCollect(device, ip, timeout) 282 self.portscanCollect(device, ip, timeout)
283 284 285
286 - def wmiCollect(self, device, ip, timeout):
287 """ 288 Start the Windows Management Instrumentation (WMI) collector 289 290 @param device: device to collect against 291 @type device: string 292 @param ip: IP address of device to collect against 293 @type ip: string 294 @param timeout: timeout before failing the connection 295 @type timeout: integer 296 """ 297 if self.options.nowmi: 298 return 299 300 client = None 301 try: 302 plugins = self.selectPlugins(device, 'wmi') 303 if not plugins: 304 self.log.info("No WMI plugins found for %s" % device.id) 305 return 306 if self.checkCollection(device): 307 self.log.info('WMI collector method for device %s' % device.id) 308 self.log.info("plugins: %s", 309 ", ".join(map(lambda p: p.name(), plugins))) 310 client = WMIClient(device, self, plugins) 311 if not client or not plugins: 312 self.log.warn("WMI collector creation failed") 313 return 314 except (SystemExit, KeyboardInterrupt): 315 raise 316 except Exception: 317 self.log.exception("Error opening WMI collector") 318 self.addClient(client, timeout, 'WMI', device.id)
319 320 321
322 - def pythonCollect(self, device, ip, timeout):
323 """ 324 Start local Python collection client. 325 326 @param device: device to collect against 327 @type device: string 328 @param ip: IP address of device to collect against 329 @type ip: string 330 @param timeout: timeout before failing the connection 331 @type timeout: integer 332 """ 333 client = None 334 try: 335 plugins = self.selectPlugins(device, "python") 336 if not plugins: 337 self.log.info("No Python plugins found for %s" % device.id) 338 return 339 if self.checkCollection(device): 340 self.log.info('Python collection device %s' % device.id) 341 self.log.info("plugins: %s", 342 ", ".join(map(lambda p: p.name(), plugins))) 343 client = PythonClient(device, self, plugins) 344 if not client or not plugins: 345 self.log.warn("Python client creation failed") 346 return 347 except (SystemExit, KeyboardInterrupt): raise 348 except: 349 self.log.exception("Error opening pythonclient") 350 self.addClient(client, timeout, 'python', device.id)
351 352
353 - def cmdCollect(self, device, ip, timeout):
354 """ 355 Start shell command collection client. 356 357 @param device: device to collect against 358 @type device: string 359 @param ip: IP address of device to collect against 360 @type ip: string 361 @param timeout: timeout before failing the connection 362 @type timeout: integer 363 """ 364 client = None 365 clientType = 'snmp' # default to SNMP if we can't figure out a protocol 366 367 hostname = device.id 368 try: 369 plugins = self.selectPlugins(device,"command") 370 if not plugins: 371 self.log.info("No command plugins found for %s" % hostname) 372 return 373 374 protocol = getattr(device, 'zCommandProtocol', defaultProtocol) 375 commandPort = getattr(device, 'zCommandPort', defaultPort) 376 377 if protocol == "ssh": 378 client = SshClient(hostname, ip, commandPort, 379 options=self.options, 380 plugins=plugins, device=device, 381 datacollector=self) 382 clientType = 'ssh' 383 self.log.info('Using SSH collection method for device %s' 384 % hostname) 385 386 elif protocol == 'telnet': 387 if commandPort == 22: commandPort = 23 #set default telnet 388 client = TelnetClient(hostname, ip, commandPort, 389 options=self.options, 390 plugins=plugins, device=device, 391 datacollector=self) 392 clientType = 'telnet' 393 self.log.info('Using telnet collection method for device %s' 394 % hostname) 395 396 else: 397 info = ("Unknown protocol %s for device %s -- " 398 "defaulting to %s collection method" % 399 (protocol, hostname, clientType )) 400 self.log.warn( info ) 401 import socket 402 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) ) 403 collector_host= socket.gethostname() 404 evt= { "eventClass":"/Status/Update", "agent":collector_host, 405 "device":hostname, "severity":Error } 406 evt[ 'summary' ]= info 407 self.sendEvent( evt ) 408 return 409 410 if not client: 411 self.log.warn("Shell command collector creation failed") 412 else: 413 self.log.info("plugins: %s", 414 ", ".join(map(lambda p: p.name(), plugins))) 415 except (SystemExit, KeyboardInterrupt): raise 416 except: 417 self.log.exception("Error opening command collector") 418 self.addClient(client, timeout, clientType, device.id)
419 420 421
422 - def snmpCollect(self, device, ip, timeout):
423 """ 424 Start SNMP collection client. 425 426 @param device: device to collect against 427 @type device: string 428 @param ip: IP address of device to collect against 429 @type ip: string 430 @param timeout: timeout before failing the connection 431 @type timeout: integer 432 """ 433 client = None 434 try: 435 hostname = device.id 436 if getattr( device, "zSnmpMonitorIgnore", True ): 437 self.log.info("SNMP monitoring off for %s" % hostname) 438 return 439 440 plugins = [] 441 plugins = self.selectPlugins(device,"snmp") 442 if not plugins: 443 self.log.info("No SNMP plugins found for %s" % hostname) 444 return 445 446 if self.checkCollection(device): 447 self.log.info('SNMP collection device %s' % hostname) 448 self.log.info("plugins: %s", 449 ", ".join(map(lambda p: p.name(), plugins))) 450 client = SnmpClient(device.id, ip, self.options, 451 device, self, plugins) 452 if not client or not plugins: 453 self.log.warn("SNMP collector creation failed") 454 return 455 except (SystemExit, KeyboardInterrupt): raise 456 except: 457 self.log.exception("Error opening the SNMP collector") 458 self.addClient(client, timeout, 'SNMP', device.id)
459 460 461 ######## need to make async test for snmp work at some point -EAD ######### 462 # def checkSnmpConnection(self, device): 463 # """ 464 # Check to see if our current community string is still valid 465 # 466 # @param device: the device against which we will check 467 # @type device: a Device instance 468 # @return: result is None or a tuple containing 469 # (community, port, version, snmp name) 470 # @rtype: deferred: Twisted deferred 471 # """ 472 # from pynetsnmp.twistedsnmp import AgentProxy 473 # 474 # def inner(driver): 475 # import pdb; pdb.set_trace() 476 # self.log.debug("Checking SNMP community %s on %s", 477 # device.zSnmpCommunity, device.id) 478 # 479 # oid = ".1.3.6.1.2.1.1.5.0" 480 # proxy = AgentProxy(device.id, 481 # device.zSnmpPort, 482 # timeout=device.zSnmpTimeout, 483 # community=device.zSnmpCommunity, 484 # snmpVersion=device.zSnmpVer, 485 # tries=2) 486 # proxy.open() 487 # yield proxy.get([oid]) 488 # devname = driver.next().values()[0] 489 # if devname: 490 # yield succeed(True) 491 # yield succeed(False) 492 # 493 # return drive(inner) 494 495
496 - def addClient(self, device, timeout, clientType, name):
497 """ 498 If device is not None, schedule the device to be collected. 499 Otherwise log an error. 500 501 @param device: device to collect against 502 @type device: string 503 @param timeout: timeout before failing the connection 504 @type timeout: integer 505 @param clientType: description of the plugin type 506 @type clientType: string 507 @param name: plugin name 508 @type name: string 509 """ 510 if device: 511 device.timeout = timeout 512 device.timedOut = False 513 self.clients.append(device) 514 device.run() 515 else: 516 self.log.warn('Unable to create a %s collector for %s', 517 clientType, name)
518 519 520 # XXX double-check this, once the implementation is in place
521 - def portscanCollect(self, device, ip, timeout):
522 """ 523 Start portscan collection client. 524 525 @param device: device to collect against 526 @type device: string 527 @param ip: IP address of device to collect against 528 @type ip: string 529 @param timeout: timeout before failing the connection 530 @type timeout: integer 531 """ 532 client = None 533 try: 534 hostname = device.id 535 plugins = self.selectPlugins(device, "portscan") 536 if not plugins: 537 self.log.info("No portscan plugins found for %s" % hostname) 538 return 539 if self.checkCollection(device): 540 self.log.info('Portscan collector method for device %s' 541 % hostname) 542 self.log.info("plugins: %s", 543 ", ".join(map(lambda p: p.name(), plugins))) 544 client = PortscanClient(device.id, ip, self.options, 545 device, self, plugins) 546 if not client or not plugins: 547 self.log.warn("Portscan collector creation failed") 548 return 549 except (SystemExit, KeyboardInterrupt): raise 550 except: 551 self.log.exception("Error opening portscan collector") 552 self.addClient(client, timeout, 'portscan', device.id)
553 554
555 - def checkCollection(self, device):
556 """ 557 See how old the data is that we've collected 558 559 @param device: device to collect against 560 @type device: string 561 @return: is the SNMP status number > 0 and is the last collection time + collage older than now? 562 @type: boolean 563 """ 564 age = device.getSnmpLastCollection() + self.collage 565 if device.getSnmpStatusNumber() > 0 and age >= DateTime.DateTime(): 566 self.log.info("Skipped collection of %s" % device.id) 567 return False 568 return True
569 570
571 - def clientFinished(self, collectorClient):
572 """ 573 Callback that processes the return values from a device. 574 Python iterable. 575 576 @param collectorClient: collector instance 577 @type collectorClient: collector class 578 @return: Twisted deferred object 579 @type: Twisted deferred object 580 """ 581 device = collectorClient.device 582 self.log.debug("Client for %s finished collecting", device.id) 583 def processClient(driver): 584 try: 585 pluginStats = {} 586 self.log.debug("Processing data for device %s", device.id) 587 devchanged = False 588 maps = [] 589 for plugin, results in collectorClient.getResults(): 590 if plugin is None: continue 591 self.log.debug("Processing plugin %s on device %s ...", 592 plugin.name(), device.id) 593 if not results: 594 self.log.warn("The plugin %s returned no results.", 595 plugin.name()) 596 continue 597 598 datamaps = [] 599 try: 600 results = plugin.preprocess(results, self.log) 601 if results: 602 datamaps = plugin.process(device, results, self.log) 603 if datamaps: 604 pluginStats.setdefault(plugin.name(), plugin.weight) 605 606 except (SystemExit, KeyboardInterrupt), ex: 607 self.log.info( "Plugin %s terminated due to external" 608 " signal (%s)" % (plugin.name(), str(ex) ) 609 ) 610 continue 611 612 except Exception, ex: 613 # NB: don't discard the plugin, as it might be a 614 # temporary issue 615 # Also, report it against the device, rather than at 616 # a collector as it might be just for this device. 617 import socket 618 component= os.path.splitext( 619 os.path.basename( sys.argv[0] ) 620 )[0] 621 collector_host= socket.gethostname() 622 evt= { "eventClass":"/Status/Update", 623 "agent":collector_host, "device":device.id, 624 "severity":Error } 625 626 info= "Problem while executing plugin %s" %plugin.name() 627 self.log.error( info ) 628 evt[ 'summary' ]= info 629 630 info= traceback.format_exc() 631 self.log.error( info ) 632 evt[ 'message' ]= info 633 self.sendEvent( evt ) 634 continue 635 636 # allow multiple maps to be returned from one plugin 637 if type(datamaps) not in (types.ListType, types.TupleType): 638 datamaps = [datamaps,] 639 if datamaps: 640 maps += [m for m in datamaps if m] 641 if maps: 642 deviceClass = Classifier.classifyDevice(pluginStats, 643 self.classCollectorPlugins) 644 yield self.config().callRemote( 645 'applyDataMaps', device.id, 646 maps, deviceClass) 647 if driver.next(): 648 devchanged = True 649 if devchanged: 650 self.log.info("Changes in configuration applied") 651 else: 652 self.log.info("No change in configuration detected") 653 yield self.config().callRemote('setSnmpLastCollection', 654 device.id) 655 driver.next() 656 except Exception, ex: 657 self.log.exception(ex) 658 raise
659 660 def processClientFinished(result): 661 """ 662 Called after the client collection finishes 663 664 @param result: object (unused) 665 @type result: object 666 """ 667 if not result: 668 self.log.debug("Client %s finished" % device.id) 669 else: 670 self.log.error("Client %s finished with message: %s" % 671 (device.id, result)) 672 try: 673 self.clients.remove(collectorClient) 674 self.finished.append(collectorClient) 675 except ValueError: 676 self.log.debug("Client %s not found in in the list" 677 " of active clients", 678 device.id) 679 d = drive(self.fillCollectionSlots) 680 d.addErrback(self.fillError) 681 682 d = drive(processClient) 683 d.addBoth(processClientFinished) 684 685 686
687 - def fillError(self, reason):
688 """ 689 Twisted errback routine to log an error when 690 unable to collect some data 691 692 @param reason: error message 693 @type reason: string 694 """ 695 self.log.error("Unable to fill collection slots: %s" % reason)
696 697
698 - def cycleTime(self):
699 """ 700 Return our cycle time (in minutes) 701 702 @return: cycle time 703 @rtype: integer 704 """ 705 return self.modelerCycleInterval * 60
706 707
708 - def heartbeat(self, ignored=None):
709 """ 710 Twisted keep-alive mechanism to ensure that 711 we're still connected to zenhub 712 713 @param ignored: object (unused) 714 @type ignored: object 715 """ 716 ARBITRARY_BEAT = 30 717 reactor.callLater(ARBITRARY_BEAT, self.heartbeat) 718 if self.options.cycle: 719 evt = dict(eventClass=Heartbeat, 720 component='zenmodeler', 721 device=self.options.monitor, 722 timeout=3*ARBITRARY_BEAT) 723 self.sendEvent(evt) 724 self.niceDoggie(self.cycleTime())
725 726
727 - def checkStop(self, unused = None):
728 """ 729 Check to see if there's anything to do. 730 If there isn't, report our statistics and exit. 731 732 @param unused: unused (unused) 733 @type unused: string 734 """ 735 if self.clients: return 736 if self.devicegen: return 737 738 if self.start: 739 runTime = time.time() - self.start 740 self.start = None 741 self.log.info("Scan time: %0.2f seconds", runTime) 742 devices = len(self.finished) 743 timedOut = len([c for c in self.finished if c.timedOut]) 744 self.sendEvents( 745 self.rrdStats.gauge('cycleTime', self.cycleTime(), runTime) + 746 self.rrdStats.gauge('devices', self.cycleTime(), devices) + 747 self.rrdStats.gauge('timedOut', self.cycleTime(), timedOut) 748 ) 749 if not self.options.cycle: 750 self.stop() 751 self.finished = []
752 753
754 - def fillCollectionSlots(self, driver):
755 """ 756 An iterator which either returns a device to collect or 757 calls checkStop() 758 759 @param driver: driver object 760 @type driver: driver object 761 """ 762 count = len(self.clients) 763 while count < self.options.parallel and self.devicegen \ 764 and not self.pendingNewClients: 765 766 self.pendingNewClients = True 767 try: 768 device = self.devicegen.next() 769 yield self.config().callRemote('getDeviceConfig', [device]) 770 # just collect one device, and let the timer add more 771 devices = driver.next() 772 if devices: 773 self.collectDevice(devices[0]) 774 except StopIteration: 775 self.devicegen = None 776 777 self.pendingNewClients = False 778 break 779 780 update = len(self.clients) 781 if update != count and update != 1: 782 self.log.info('Running %d clients', update) 783 else: 784 self.log.debug('Running %d clients', update) 785 self.checkStop()
786 787
788 - def buildOptions(self):
789 """ 790 Build our list of command-line options 791 """ 792 PBDaemon.buildOptions(self) 793 self.parser.add_option('--debug', 794 dest='debug', action="store_true", default=False, 795 help="Don't fork threads for processing") 796 self.parser.add_option('--nowmi', 797 dest='nowmi', action="store_true", default=False, 798 help="Do not execute WMI plugins") 799 self.parser.add_option('--parallel', dest='parallel', 800 type='int', default=defaultParallel, 801 help="Number of devices to collect from in parallel") 802 self.parser.add_option('--cycletime', 803 dest='cycletime',default=0,type='int', 804 help="Run collection every x minutes") 805 self.parser.add_option('--ignore', 806 dest='ignorePlugins',default="", 807 help="Modeler plugins to ignore. Takes a regular expression") 808 self.parser.add_option('--collect', 809 dest='collectPlugins',default="", 810 help="Modeler plugins to use. Takes a regular expression") 811 self.parser.add_option('-p', '--path', dest='path', 812 help="Start class path for collection ie /NetworkDevices") 813 self.parser.add_option('-d', '--device', dest='device', 814 help="Fully qualified device name ie www.confmon.com") 815 self.parser.add_option('-a', '--collage', 816 dest='collage', default=0, type='float', 817 help="Do not collect from devices whose collect date " + 818 "is within this many minutes") 819 self.parser.add_option('--writetries', 820 dest='writetries',default=2,type='int', 821 help="Number of times to try to write if a " 822 "read conflict is found") 823 # FIXME: cleanup --force option #2660 824 self.parser.add_option("-F", "--force", 825 dest="force", action='store_true', default=True, 826 help="Force collection of config data (deprecated)") 827 self.parser.add_option('--portscantimeout', dest='portscantimeout', 828 type='int', default=defaultPortScanTimeout, 829 help="Time to wait for connection failures when port scanning") 830 self.parser.add_option('--now', 831 dest='now', action="store_true", default=False, 832 help="Start daemon now, do not sleep before starting") 833 TCbuildOptions(self.parser, self.usage)
834 835 836
837 - def processOptions(self):
838 """ 839 Check what the user gave us vs what we'll accept 840 for command-line options 841 """ 842 if not self.options.path and not self.options.device: 843 self.options.path = "/Devices" 844 if self.options.ignorePlugins and self.options.collectPlugins: 845 raise SystemExit( "Only one of --ignore or --collect" 846 " can be used at a time")
847 848
849 - def _timeoutClients(self):
850 """ 851 The guts of the timeoutClients method (minus the twisted reactor 852 stuff). Breaking this part out as a separate method facilitates unit 853 testing. 854 """ 855 active = [] 856 for client in self.clients: 857 if client.timeout < time.time(): 858 self.log.warn("Client %s timeout", client.hostname) 859 self.finished.append(client) 860 client.timedOut = True 861 client.stop() 862 else: 863 active.append(client) 864 self.clients = active
865 866 867
868 - def timeoutClients(self, unused=None):
869 """ 870 Check to see which clients have timed out and which ones haven't. 871 Stop processing anything that's timed out. 872 873 @param unused: unused (unused) 874 @type unused: string 875 """ 876 reactor.callLater(1, self.timeoutClients) 877 self._timeoutClients() 878 d = drive(self.fillCollectionSlots) 879 d.addCallback(self.checkStop) 880 d.addErrback(self.fillError)
881 882 883
884 - def reactorLoop(self):
885 """ 886 Twisted main loop 887 """ 888 reactor.startRunning() 889 while reactor.running: 890 try: 891 while reactor.running: 892 reactor.runUntilCurrent() 893 timeout = reactor.timeout() 894 reactor.doIteration(timeout) 895 except: 896 if reactor.running: 897 self.log.exception("Unexpected error in main loop.")
898 899 900
901 - def getDeviceList(self):
902 """ 903 Get the list of devices for which we are collecting: 904 * if -d devicename was used, use the devicename 905 * if a class path flag was supplied, gather the devices 906 along that organizer 907 * otherwise get all of the devices associated with our collector 908 909 @return: list of devices 910 @rtype: list 911 """ 912 if self.options.device: 913 self.log.info("Collecting for device %s", self.options.device) 914 return succeed([self.options.device]) 915 916 self.log.info("Collecting for path %s", self.options.path) 917 return self.config().callRemote('getDeviceListByOrganizer', 918 self.options.path, 919 self.options.monitor)
920 921
922 - def mainLoop(self, driver):
923 """ 924 Main collection loop, a Python iterable 925 926 @param driver: driver object 927 @type driver: driver object 928 @return: Twisted deferred object 929 @rtype: Twisted deferred object 930 """ 931 if self.options.cycle: 932 driveLater(self.cycleTime(), self.mainLoop) 933 934 if self.clients: 935 self.log.error("Modeling cycle taking too long") 936 return 937 938 self.start = time.time() 939 940 self.log.debug("Starting collector loop...") 941 yield self.getDeviceList() 942 self.devicegen = iter(driver.next()) 943 d = drive(self.fillCollectionSlots) 944 d.addErrback(self.fillError) 945 yield d 946 driver.next() 947 self.log.debug("Collection slots filled")
948 949 950
951 - def main(self, unused=None):
952 """ 953 Wrapper around the mainLoop 954 955 @param unused: unused (unused) 956 @type unused: string 957 @return: Twisted deferred object 958 @rtype: Twisted deferred object 959 """ 960 self.finished = [] 961 d = drive(self.mainLoop) 962 d.addCallback(self.timeoutClients) 963 return d
964 965 966
967 - def remote_deleteDevice(self, device):
968 """ 969 Stub function 970 971 @param device: device name (unused) 972 @type device: string 973 @todo: implement 974 """ 975 # we fetch the device list before every scan 976 self.log.debug("Asynch deleteDevice %s" % device)
977 978 979 if __name__ == '__main__': 980 dc = ZenModeler() 981 dc.processOptions() 982 reactor.run = dc.reactorLoop 983 dc.run() 984