Package Products :: Package DataCollector :: Module zenmodeler
[hide private]
[frames | no frames]

Source Code for Module Products.DataCollector.zenmodeler

   1  ########################################################################## 
   2  # 
   3  # This program is part of Zenoss Core, an open source monitoring platform. 
   4  # Copyright (C) 2007, Zenoss Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify it 
   7  # under the terms of the GNU General Public License version 2 as published by 
   8  # the Free Software Foundation. 
   9  # 
  10  # For complete information please visit: http://www.zenoss.com/oss/ 
  11  # 
  12  ########################################################################### 
  13   
  14  __doc__= """Discover (aka model) a device and its components. 
  15  For instance, find out what Ethernet interfaces and hard disks a server 
  16  has available. 
  17  This information should change much less frequently than performance metrics. 
  18  """ 
  19   
  20  # IMPORTANT! The import of the pysamba.twisted.reactor module should come before 
  21  # any other libraries that might possibly use twisted. This will ensure that 
  22  # the proper WmiReactor is installed before anyone else grabs a reference to 
  23  # the wrong reactor. 
  24  import pysamba.twisted.reactor 
  25   
  26  import Globals 
  27  from Products.ZenWin.WMIClient import WMIClient 
  28  from Products.ZenWin.utils import addNTLMv2Option, setNTLMv2Auth 
  29  from Products.ZenHub.PBDaemon import FakeRemote, PBDaemon 
  30  from Products.ZenUtils.DaemonStats import DaemonStats 
  31  from Products.ZenUtils.Driver import drive, driveLater 
  32  from Products.ZenUtils.Utils import unused 
  33  from Products.ZenEvents.ZenEventClasses import Heartbeat, Error 
  34   
  35  from PythonClient   import PythonClient 
  36  from SshClient      import SshClient 
  37  from TelnetClient   import TelnetClient, buildOptions as TCbuildOptions 
  38  from SnmpClient     import SnmpClient 
  39  from PortscanClient import PortscanClient 
  40                            
  41  from Products.DataCollector import Classifier 
  42   
  43  from twisted.internet import reactor 
  44  from twisted.internet.defer import succeed 
  45   
  46  import time 
  47  import types 
  48  import re 
  49  import DateTime 
  50   
  51  import os 
  52  import os.path 
  53  import sys 
  54  import traceback 
  55  from random import randint 
  56          
  57  defaultPortScanTimeout = 5 
  58  defaultParallel = 1 
  59  defaultProtocol = "ssh" 
  60  defaultPort = 22 
  61   
  62  # needed for Twisted's PB (Perspective Broker) to work 
  63  from Products.DataCollector import DeviceProxy 
  64  from Products.DataCollector import Plugins 
  65  unused(DeviceProxy, Plugins) 
  66   
class ZenModeler(PBDaemon):
    """
    Daemon that attaches to zenhub, models (discovers) devices through
    the WMI, Python, command (ssh/telnet), SNMP and portscan collector
    clients, and sends the resulting datamaps back to zenhub.
    """

    name = 'zenmodeler'
    initialServices = PBDaemon.initialServices + ['ModelerService']

    generateEvents = True
    # Minutes between configuration refreshes (multiplied by 60 in
    # configure() before being handed to reactor.callLater).
    configCycleInterval = 360

    # Replaced by the result of 'getClassCollectorPlugins' in configure().
    classCollectorPlugins = ()

    def __init__(self, single=False ):
        """
        Initializer

        @param single: collect from a single device?
        @type single: boolean
        """
        PBDaemon.__init__(self)
        # FIXME: cleanup --force option #2660
        self.options.force = True
        self.start = None
        self.rrdStats = DaemonStats()
        self.single = single
        if self.options.device:
            self.single = True
        self.modelerCycleInterval = self.options.cycletime
        # --collage is given in minutes; 1440 minutes per day.
        # NOTE(review): presumably stored in days because checkCollection()
        # adds it to a DateTime value -- confirm.
        self.collage = float( self.options.collage ) / 1440.0
        self.pendingNewClients = False
        self.clients = []
        self.finished = []
        self.devicegen = None

        # Delay start for between 10 and 60 seconds when run as a daemon
        # (the minutes variant is kept below, commented out).
        self.started = False
        self.startDelay = 0
        if self.options.daemon:
            if self.options.now:
                self.log.debug("Run as a daemon, starting immediately.")
            else:
                # self.startDelay = randint(10, 60) * 60
                self.startDelay = randint(10, 60) * 1
                self.log.info("Run as a daemon, waiting %s seconds to start." %
                              self.startDelay)
        else:
            self.log.debug("Run in foreground, starting immediately.")


    def reportError(self, error):
        """
        Log errors that have occurred.  Registered as the errback of the
        configuration deferred in connected().

        @param error: the error to report
        @type error: object (formatted with %s)
        """
        self.log.error("Error occured: %s", error)


    def connected(self):
        """
        Called after connected to the zenhub service: fetch our
        configuration, then start the heartbeat (which starts modeling).
        """
        d = self.configure()
        d.addCallback(self.heartbeat)
        d.addErrback(self.reportError)


    def configure(self):
        """
        Get our configuration from zenhub and schedule the next refresh.

        @return: deferred produced by drive()
        @rtype: Twisted deferred object
        """
        # add in the code to fetch cycle time, etc.
        def inner(driver):
            """
            Generator function to gather our configuration

            @param driver: driver object
            @type driver: driver object
            """
            self.log.debug('fetching monitor properties')
            yield self.config().callRemote('propertyItems')
            items = dict(driver.next())
            # Only take the cycle interval from zenhub when the
            # --cycletime option is 0.
            if self.options.cycletime == 0:
                self.modelerCycleInterval = items.get('modelerCycleInterval',
                                                      self.modelerCycleInterval)
            self.configCycleInterval = items.get('configCycleInterval',
                                                 self.configCycleInterval)
            reactor.callLater(self.configCycleInterval * 60, self.configure)

            self.log.debug("Getting threshold classes...")
            yield self.config().callRemote('getThresholdClasses')
            self.remote_updateThresholdClasses(driver.next())

            self.log.debug("Fetching default RRDCreateCommand...")
            yield self.config().callRemote('getDefaultRRDCreateCommand')
            createCommand = driver.next()

            self.log.debug("Getting collector thresholds...")
            yield self.config().callRemote('getCollectorThresholds')
            self.rrdStats.config(self.options.monitor,
                                 self.name,
                                 driver.next(),
                                 createCommand)

            self.log.debug("Getting collector plugins for each DeviceClass")
            yield self.config().callRemote('getClassCollectorPlugins')
            self.classCollectorPlugins = driver.next()

        return drive(inner)


    def config(self):
        """
        Get the ModelerService, or a FakeRemote stand-in when the service
        is not registered.
        """
        return self.services.get('ModelerService', FakeRemote())


    def selectPlugins(self, device, transport):
        """
        Build a list of active plugins for a device, based on:

        * the --collect command-line option which is a regex
        * the --ignore command-line option which is a regex
        * transport which is a string describing the type of plugin

        Loaders that fail with a PluginImportError are reported through an
        event against the collector and dropped from device.plugins.

        @param device: device proxy to collect against (its .plugins and
                       .id attributes are used)
        @type device: object
        @param transport: plugin transport to select (callers in this class
                          pass 'wmi', 'python', 'command', 'snmp', 'portscan')
        @type transport: string
        @return: plugin instances matching the transport and the filters
        @rtype: list
        @todo: determine if an event for the collector AND the device should be sent
        """
        plugins = []
        valid_loaders = []
        for loader in device.plugins:
            try:
                plugin = loader.create()
                self.log.debug( "Loaded plugin %s" % plugin.name() )
                plugins.append( plugin )
                valid_loaders.append( loader )

            except (SystemExit, KeyboardInterrupt):
                # sys.exc_info() instead of "except X, e" so the clause
                # parses on every interpreter version.
                ex = sys.exc_info()[1]
                self.log.info( "Interrupted by external signal (%s)" % str(ex) )
                raise

            except Plugins.PluginImportError:
                import_error = sys.exc_info()[1]
                import socket
                component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) )
                collector_host = socket.gethostname()
                # NB: an import error affects all devices,
                #     so report the issue against the collector
                # TODO: determine if an event for the collector AND the device should be sent
                evt = { "eventClass":"/Status/Update", "component":component,
                        "agent":collector_host, "device":collector_host,
                        "severity":Error }

                info = "Problem loading plugin %s" % import_error.plugin
                self.log.error( info )
                evt[ 'summary' ] = info

                info = import_error.traceback
                self.log.error( info )
                evt[ 'message' ] = info

                info = ("Due to import errors, removing the %s plugin"
                        " from this collection cycle.") % import_error.plugin
                self.log.error( info )
                evt[ 'message' ] += "%s\n" % info
                self.sendEvent( evt )

        # Make sure that we don't generate messages for bad loaders again
        # NB: doesn't update the device's zProperties
        if len( device.plugins ) != len( valid_loaders ):
            device.plugins = valid_loaders

        # Create functions to search for what plugins we will and
        # won't supply to the device
        collectTest = lambda x: False
        ignoreTest = lambda x: False
        if self.options.collectPlugins:
            collectTest = re.compile(self.options.collectPlugins).search
        elif self.options.ignorePlugins:
            ignoreTest = re.compile(self.options.ignorePlugins).search

        result = []
        for plugin in plugins:
            if plugin.transport != transport:
                continue
            name = plugin.name()
            if ignoreTest(name):
                self.log.debug("Ignoring %s on %s because of --ignore flag",
                               name, device.id)
            elif collectTest(name):
                self.log.debug("Using %s on %s because of --collect flag",
                               name, device.id)
                result.append(plugin)
            elif not self.options.collectPlugins:
                # No --collect filter given: everything not ignored is used.
                self.log.debug("Using %s on %s", name, device.id)
                result.append(plugin)
        return result


    def collectDevice(self, device):
        """
        Collect data from a single device using every collection method.

        @param device: device proxy to collect against
        @type device: object
        """
        clientTimeout = getattr(device, 'zCollectorClientTimeout', 180)
        ip = device.manageIp
        # Absolute deadline (epoch seconds), compared against time.time()
        # in _timeoutClients().
        timeout = clientTimeout + time.time()
        self.wmiCollect(device, ip, timeout)
        self.pythonCollect(device, ip, timeout)
        self.cmdCollect(device, ip, timeout)
        self.snmpCollect(device, ip, timeout)
        self.portscanCollect(device, ip, timeout)


    def wmiCollect(self, device, ip, timeout):
        """
        Start the Windows Management Instrumentation (WMI) collector

        @param device: device proxy to collect against
        @type device: object
        @param ip: IP address of device to collect against
        @type ip: string
        @param timeout: absolute deadline (epoch seconds) for the client
        @type timeout: number
        """
        if self.options.nowmi:
            return

        client = None
        try:
            plugins = self.selectPlugins(device, 'wmi')
            if not plugins:
                self.log.info("No WMI plugins found for %s" % device.id)
                return
            if self.checkCollection(device):
                self.log.info('WMI collector method for device %s' % device.id)
                self.log.info("plugins: %s",
                              ", ".join([p.name() for p in plugins]))
                client = WMIClient(device, self, plugins)
            if not client or not plugins:
                self.log.warn("WMI collector creation failed")
                return
        except (SystemExit, KeyboardInterrupt):
            raise
        except Exception:
            self.log.exception("Error opening WMI collector")
        # Reached with client=None after an exception; addClient() then
        # logs that no collector could be created.
        self.addClient(client, timeout, 'WMI', device.id)


    def pythonCollect(self, device, ip, timeout):
        """
        Start local Python collection client.

        @param device: device proxy to collect against
        @type device: object
        @param ip: IP address of device to collect against
        @type ip: string
        @param timeout: absolute deadline (epoch seconds) for the client
        @type timeout: number
        """
        client = None
        try:
            plugins = self.selectPlugins(device, "python")
            if not plugins:
                self.log.info("No Python plugins found for %s" % device.id)
                return
            if self.checkCollection(device):
                self.log.info('Python collection device %s' % device.id)
                self.log.info("plugins: %s",
                              ", ".join([p.name() for p in plugins]))
                client = PythonClient(device, self, plugins)
            if not client or not plugins:
                self.log.warn("Python client creation failed")
                return
        except (SystemExit, KeyboardInterrupt):
            raise
        except:
            self.log.exception("Error opening pythonclient")
        self.addClient(client, timeout, 'python', device.id)


    def cmdCollect(self, device, ip, timeout):
        """
        Start shell command collection client (ssh or telnet, chosen by the
        device's zCommandProtocol).

        @param device: device proxy to collect against
        @type device: object
        @param ip: IP address of device to collect against
        @type ip: string
        @param timeout: absolute deadline (epoch seconds) for the client
        @type timeout: number
        """
        client = None
        clientType = 'snmp' # default to SNMP if we can't figure out a protocol

        hostname = device.id
        try:
            plugins = self.selectPlugins(device,"command")
            if not plugins:
                self.log.info("No command plugins found for %s" % hostname)
                return

            protocol = getattr(device, 'zCommandProtocol', defaultProtocol)
            commandPort = getattr(device, 'zCommandPort', defaultPort)

            if protocol == "ssh":
                client = SshClient(hostname, ip, commandPort,
                                   options=self.options,
                                   plugins=plugins, device=device,
                                   datacollector=self, isLoseConnection=True)
                clientType = 'ssh'
                self.log.info('Using SSH collection method for device %s'
                              % hostname)

            elif protocol == 'telnet':
                if commandPort == 22: commandPort = 23 #set default telnet
                client = TelnetClient(hostname, ip, commandPort,
                                      options=self.options,
                                      plugins=plugins, device=device,
                                      datacollector=self)
                clientType = 'telnet'
                self.log.info('Using telnet collection method for device %s'
                              % hostname)

            else:
                # NOTE(review): the message announces a fallback, but this
                # branch only raises an event and returns; no client is
                # started here.
                info = ("Unknown protocol %s for device %s -- "
                        "defaulting to %s collection method" %
                        (protocol, hostname, clientType ))
                self.log.warn( info )
                import socket
                collector_host = socket.gethostname()
                evt = { "eventClass":"/Status/Update", "agent":collector_host,
                        "device":hostname, "severity":Error }
                evt[ 'summary' ] = info
                self.sendEvent( evt )
                return

            if not client:
                self.log.warn("Shell command collector creation failed")
            else:
                self.log.info("plugins: %s",
                              ", ".join([p.name() for p in plugins]))
        except (SystemExit, KeyboardInterrupt):
            raise
        except:
            self.log.exception("Error opening command collector")
        self.addClient(client, timeout, clientType, device.id)


    def snmpCollect(self, device, ip, timeout):
        """
        Start SNMP collection client.

        @param device: device proxy to collect against
        @type device: object
        @param ip: IP address of device to collect against
        @type ip: string
        @param timeout: absolute deadline (epoch seconds) for the client
        @type timeout: number
        """
        client = None
        try:
            hostname = device.id
            # A device without the zSnmpMonitorIgnore attribute is treated
            # as ignored (the getattr default is True).
            if getattr( device, "zSnmpMonitorIgnore", True ):
                self.log.info("SNMP monitoring off for %s" % hostname)
                return

            if not ip:
                self.log.info("No manage IP for %s" % hostname)
                return

            plugins = self.selectPlugins(device,"snmp")
            if not plugins:
                self.log.info("No SNMP plugins found for %s" % hostname)
                return

            if self.checkCollection(device):
                self.log.info('SNMP collection device %s' % hostname)
                self.log.info("plugins: %s",
                              ", ".join([p.name() for p in plugins]))
                client = SnmpClient(device.id, ip, self.options,
                                    device, self, plugins)
            if not client or not plugins:
                self.log.warn("SNMP collector creation failed")
                return
        except (SystemExit, KeyboardInterrupt):
            raise
        except:
            self.log.exception("Error opening the SNMP collector")
        self.addClient(client, timeout, 'SNMP', device.id)


    ######## need to make async test for snmp work at some point -EAD #########
    # def checkSnmpConnection(self, device):
    #     """
    #     Check to see if our current community string is still valid
    #
    #     @param device: the device against which we will check
    #     @type device: a Device instance
    #     @return: result is None or a tuple containing
    #             (community, port, version, snmp name)
    #     @rtype: deferred: Twisted deferred
    #     """
    #     from pynetsnmp.twistedsnmp import AgentProxy
    #
    #     def inner(driver):
    #         self.log.debug("Checking SNMP community %s on %s",
    #                        device.zSnmpCommunity, device.id)
    #
    #         oid = ".1.3.6.1.2.1.1.5.0"
    #         proxy = AgentProxy(device.id,
    #                            device.zSnmpPort,
    #                            timeout=device.zSnmpTimeout,
    #                            community=device.zSnmpCommunity,
    #                            snmpVersion=device.zSnmpVer,
    #                            tries=2)
    #         proxy.open()
    #         yield proxy.get([oid])
    #         devname = driver.next().values()[0]
    #         if devname:
    #             yield succeed(True)
    #         yield succeed(False)
    #
    #     return drive(inner)


    def addClient(self, device, timeout, clientType, name):
        """
        If a collector client was created, record its deadline, register it
        as active and run it.  Otherwise log a warning.

        @param device: collector client to run, or None if creation failed
                       (despite the name, callers pass the client object)
        @type device: collector client object or None
        @param timeout: absolute deadline (epoch seconds) for the client
        @type timeout: number
        @param clientType: description of the collection method
        @type clientType: string
        @param name: id of the device being collected (used in the warning)
        @type name: string
        """
        if device:
            device.timeout = timeout
            device.timedOut = False
            self.clients.append(device)
            device.run()
        else:
            self.log.warn('Unable to create a %s collector for %s',
                          clientType, name)


    # XXX double-check this, once the implementation is in place
    def portscanCollect(self, device, ip, timeout):
        """
        Start portscan collection client.

        @param device: device proxy to collect against
        @type device: object
        @param ip: IP address of device to collect against
        @type ip: string
        @param timeout: absolute deadline (epoch seconds) for the client
        @type timeout: number
        """
        client = None
        try:
            hostname = device.id
            plugins = self.selectPlugins(device, "portscan")
            if not plugins:
                self.log.info("No portscan plugins found for %s" % hostname)
                return
            if self.checkCollection(device):
                self.log.info('Portscan collector method for device %s'
                              % hostname)
                self.log.info("plugins: %s",
                              ", ".join([p.name() for p in plugins]))
                client = PortscanClient(device.id, ip, self.options,
                                        device, self, plugins)
            if not client or not plugins:
                self.log.warn("Portscan collector creation failed")
                return
        except (SystemExit, KeyboardInterrupt):
            raise
        except:
            self.log.exception("Error opening portscan collector")
        self.addClient(client, timeout, 'portscan', device.id)


    def checkCollection(self, device):
        """
        Decide whether the device should be collected, based on how old
        the previously collected data is.

        @param device: device proxy to collect against
        @type device: object
        @return: False (skip) when the SNMP status number is > 0 and the
                 last collection time plus the collage has not yet passed;
                 True otherwise
        @rtype: boolean
        """
        age = device.getSnmpLastCollection() + self.collage
        if device.getSnmpStatusNumber() > 0 and age >= DateTime.DateTime():
            self.log.info("Skipped collection of %s" % device.id)
            return False
        return True


    def clientFinished(self, collectorClient):
        """
        Callback that processes the results gathered by a collector client:
        run each plugin's preprocess/process step, send the resulting
        datamaps to zenhub, then move the client to the finished list and
        try to start the next device.

        Nothing is returned; the work is driven asynchronously.

        @param collectorClient: collector instance
        @type collectorClient: collector class
        """
        device = collectorClient.device
        self.log.debug("Client for %s finished collecting", device.id)
        def processClient(driver):
            """
            Generator that turns the plugin results into datamaps and
            applies them through the ModelerService.

            @param driver: driver object
            @type driver: driver object
            """
            try:
                pluginStats = {}
                self.log.debug("Processing data for device %s", device.id)
                devchanged = False
                maps = []
                for plugin, results in collectorClient.getResults():
                    if plugin is None:
                        continue
                    self.log.debug("Processing plugin %s on device %s ...",
                                   plugin.name(), device.id)
                    if not results:
                        self.log.warn("The plugin %s returned no results.",
                                      plugin.name())
                        continue

                    datamaps = []
                    try:
                        results = plugin.preprocess(results, self.log)
                        if results:
                            datamaps = plugin.process(device, results, self.log)
                        if datamaps:
                            pluginStats.setdefault(plugin.name(), plugin.weight)

                    except (SystemExit, KeyboardInterrupt):
                        ex = sys.exc_info()[1]
                        self.log.info( "Plugin %s terminated due to external"
                                       " signal (%s)" % (plugin.name(), str(ex) )
                                       )
                        continue

                    except Exception:
                        # NB: don't discard the plugin, as it might be a
                        #     temporary issue
                        #     Also, report it against the device, rather than at
                        #     a collector as it might be just for this device.
                        import socket
                        collector_host = socket.gethostname()
                        evt = { "eventClass":"/Status/Update",
                                "agent":collector_host, "device":device.id,
                                "severity":Error }

                        info = "Problem while executing plugin %s" %plugin.name()
                        self.log.error( info )
                        evt[ 'summary' ] = info

                        info = traceback.format_exc()
                        self.log.error( info )
                        evt[ 'message' ] = info
                        self.sendEvent( evt )
                        continue

                    # allow multiple maps to be returned from one plugin
                    if not isinstance(datamaps, (list, tuple)):
                        datamaps = [datamaps,]
                    if datamaps:
                        maps += [m for m in datamaps if m]
                if maps:
                    deviceClass = Classifier.classifyDevice(pluginStats,
                                                self.classCollectorPlugins)
                    yield self.config().callRemote(
                                                'applyDataMaps', device.id,
                                                maps, deviceClass)
                    if driver.next():
                        devchanged = True
                if devchanged:
                    self.log.info("Changes in configuration applied")
                else:
                    self.log.info("No change in configuration detected")
                yield self.config().callRemote('setSnmpLastCollection',
                                               device.id)
                driver.next()
            except Exception:
                self.log.exception(sys.exc_info()[1])
                raise

        def processClientFinished(result):
            """
            Called after the client collection finishes

            @param result: falsy on success, otherwise whatever
                           processClient failed with (only logged)
            @type result: object
            """
            if not result:
                self.log.debug("Client %s finished" % device.id)
            else:
                self.log.error("Client %s finished with message: %s" %
                               (device.id, result))
            try:
                self.clients.remove(collectorClient)
                self.finished.append(collectorClient)
            except ValueError:
                # e.g. already moved to the finished list by
                # _timeoutClients()
                self.log.debug("Client %s not found in in the list"
                               " of active clients",
                               device.id)
            d = drive(self.fillCollectionSlots)
            d.addErrback(self.fillError)

        d = drive(processClient)
        d.addBoth(processClientFinished)


    def fillError(self, reason):
        """
        Twisted errback routine to log an error when
        unable to collect some data

        @param reason: the failure to report
        @type reason: object (formatted with %s)
        """
        self.log.error("Unable to fill collection slots: %s" % reason)


    def cycleTime(self):
        """
        Return our cycle time in seconds (modelerCycleInterval is kept in
        minutes).

        @return: cycle time in seconds
        @rtype: integer
        """
        return self.modelerCycleInterval * 60


    def heartbeat(self, ignored=None):
        """
        Periodic keep-alive: reschedules itself, sends a Heartbeat event
        when cycling, and kicks off modeling the first time it runs.

        @param ignored: object (unused)
        @type ignored: object
        """
        ARBITRARY_BEAT = 30
        reactor.callLater(ARBITRARY_BEAT, self.heartbeat)
        if self.options.cycle:
            evt = dict(eventClass=Heartbeat,
                       component='zenmodeler',
                       device=self.options.monitor,
                       timeout=3*ARBITRARY_BEAT)
            self.sendEvent(evt)
            self.niceDoggie(self.cycleTime())

        # We start modeling from here to accomodate the startup delay.
        if not self.started:
            self.started = True
            reactor.callLater(self.startDelay, self.main)


    def checkStop(self, unused = None):
        """
        Check to see if there's anything to do.
        If there isn't, report our statistics and, unless cycling, stop.

        @param unused: unused (lets this be used as a deferred callback)
        @type unused: object
        """
        if self.clients:
            return
        if self.devicegen:
            return

        if self.start:
            runTime = time.time() - self.start
            self.start = None
            self.log.info("Scan time: %0.2f seconds", runTime)
            devices = len(self.finished)
            timedOut = len([c for c in self.finished if c.timedOut])
            self.sendEvents(
                self.rrdStats.gauge('cycleTime', self.cycleTime(), runTime) +
                self.rrdStats.gauge('devices', self.cycleTime(), devices) +
                self.rrdStats.gauge('timedOut', self.cycleTime(), timedOut)
                )
            if not self.options.cycle:
                self.stop()
            self.finished = []


    def fillCollectionSlots(self, driver):
        """
        Generator (run through drive()) that starts collection of at most
        one more device when a slot is free, then calls checkStop().

        @param driver: driver object
        @type driver: driver object
        """
        count = len(self.clients)
        if (count < self.options.parallel and self.devicegen
                and not self.pendingNewClients):

            # Guard against re-entry while the remote call is in flight.
            self.pendingNewClients = True
            try:
                device = self.devicegen.next()
                yield self.config().callRemote('getDeviceConfig', [device])
                # just collect one device, and let the timer add more
                devices = driver.next()
                if devices:
                    self.collectDevice(devices[0])
            except StopIteration:
                self.devicegen = None
            except:
                # Release the guard on any other failure; otherwise no
                # further device would ever be scheduled.  (Written as
                # except/re-raise because older interpreters reject a
                # yield inside try/finally.)
                self.pendingNewClients = False
                raise

            self.pendingNewClients = False

        update = len(self.clients)
        if update != count and update != 1:
            self.log.info('Running %d clients', update)
        else:
            self.log.debug('Running %d clients', update)
        self.checkStop()


    def buildOptions(self):
        """
        Build our list of command-line options
        """
        PBDaemon.buildOptions(self)
        self.parser.add_option('--debug',
                dest='debug', action="store_true", default=False,
                help="Don't fork threads for processing")
        self.parser.add_option('--nowmi',
                dest='nowmi', action="store_true", default=False,
                help="Do not execute WMI plugins")
        self.parser.add_option('--parallel', dest='parallel',
                type='int', default=defaultParallel,
                help="Number of devices to collect from in parallel")
        self.parser.add_option('--cycletime',
                dest='cycletime',default=720,type='int',
                help="Run collection every x minutes")
        self.parser.add_option('--ignore',
                dest='ignorePlugins',default="",
                help="Modeler plugins to ignore. Takes a regular expression")
        self.parser.add_option('--collect',
                dest='collectPlugins',default="",
                help="Modeler plugins to use. Takes a regular expression")
        self.parser.add_option('-p', '--path', dest='path',
                help="Start class path for collection ie /NetworkDevices")
        self.parser.add_option('-d', '--device', dest='device',
                help="Fully qualified device name ie www.confmon.com")
        self.parser.add_option('-a', '--collage',
                dest='collage', default=0, type='float',
                help="Do not collect from devices whose collect date " +
                        "is within this many minutes")
        self.parser.add_option('--writetries',
                dest='writetries',default=2,type='int',
                help="Number of times to try to write if a "
                     "read conflict is found")
        # FIXME: cleanup --force option #2660
        self.parser.add_option("-F", "--force",
                dest="force", action='store_true', default=True,
                help="Force collection of config data (deprecated)")
        self.parser.add_option('--portscantimeout', dest='portscantimeout',
                type='int', default=defaultPortScanTimeout,
                help="Time to wait for connection failures when port scanning")
        self.parser.add_option('--now',
                dest='now', action="store_true", default=False,
                help="Start daemon now, do not sleep before starting")
        TCbuildOptions(self.parser, self.usage)
        addNTLMv2Option(self.parser)


    def processOptions(self):
        """
        Check what the user gave us vs what we'll accept
        for command-line options

        @raise SystemExit: when both --ignore and --collect are given
        """
        if not self.options.path and not self.options.device:
            self.options.path = "/Devices"
        if self.options.ignorePlugins and self.options.collectPlugins:
            raise SystemExit( "Only one of --ignore or --collect"
                             " can be used at a time")
        setNTLMv2Auth(self.options)

    def _timeoutClients(self):
        """
        The guts of the timeoutClients method (minus the twisted reactor
        stuff). Breaking this part out as a separate method facilitates unit
        testing.

        Clients whose deadline has passed are stopped, flagged timedOut
        and moved to the finished list; the rest stay active.
        """
        active = []
        for client in self.clients:
            if client.timeout < time.time():
                self.log.warn("Client %s timeout", client.hostname)
                self.finished.append(client)
                client.timedOut = True
                client.stop()
            else:
                active.append(client)
        self.clients = active


    def timeoutClients(self, unused=None):
        """
        Check to see which clients have timed out and which ones haven't.
        Stop processing anything that's timed out.  Reschedules itself to
        run again one second later.

        @param unused: unused (lets this be used as a deferred callback)
        @type unused: object
        """
        reactor.callLater(1, self.timeoutClients)
        self._timeoutClients()
        d = drive(self.fillCollectionSlots)
        d.addCallback(self.checkStop)
        d.addErrback(self.fillError)


    def reactorLoop(self):
        """
        Twisted main loop; installed in place of reactor.run by the
        script entry point.  Errors raised while the reactor is still
        running are logged and the loop is resumed.
        """
        reactor.startRunning()
        while reactor.running:
            try:
                while reactor.running:
                    reactor.runUntilCurrent()
                    timeout = reactor.timeout()
                    reactor.doIteration(timeout)
            except:
                if reactor.running:
                    self.log.exception("Unexpected error in main loop.")


    def getDeviceList(self):
        """
        Get the list of devices for which we are collecting:
         * if -d devicename was used, use the devicename
         * otherwise ask zenhub for the devices under the --path organizer
           (processOptions() defaults the path to /Devices) for our monitor

        @return: deferred that fires with the list of devices
        @rtype: Twisted deferred object
        """
        if self.options.device:
            self.log.info("Collecting for device %s", self.options.device)
            return succeed([self.options.device])

        self.log.info("Collecting for path %s", self.options.path)
        return self.config().callRemote('getDeviceListByOrganizer',
                                        self.options.path,
                                        self.options.monitor)


    def mainLoop(self, driver):
        """
        Main collection loop, a generator run through drive(): fetch the
        device list and start filling the collection slots.  When cycling,
        the next run is scheduled first.

        @param driver: driver object
        @type driver: driver object
        """
        if self.options.cycle:
            driveLater(self.cycleTime(), self.mainLoop)

        if self.clients:
            # The previous cycle still has active clients; skip this one.
            self.log.error("Modeling cycle taking too long")
            return

        self.start = time.time()

        self.log.debug("Starting collector loop...")
        yield self.getDeviceList()
        self.devicegen = iter(driver.next())
        d = drive(self.fillCollectionSlots)
        d.addErrback(self.fillError)
        yield d
        driver.next()
        self.log.debug("Collection slots filled")


    def main(self, unused=None):
        """
        Wrapper around the mainLoop; starts the timeout checker once the
        first pass has completed.

        @param unused: unused (lets this be used as a callback)
        @type unused: object
        @return: Twisted deferred object
        @rtype: Twisted deferred object
        """
        self.finished = []
        d = drive(self.mainLoop)
        d.addCallback(self.timeoutClients)
        return d


    def remote_deleteDevice(self, device):
        """
        Stub function: only logs the request.

        @param device: device name (unused)
        @type device: string
        @todo: implement
        """
        # we fetch the device list before every scan
        self.log.debug("Asynch deleteDevice %s" % device)
if __name__ == '__main__':
    # Script entry point: build the daemon, validate its command-line
    # options, and run it with our own reactor loop installed in place
    # of the stock reactor.run.
    modeler = ZenModeler()
    modeler.processOptions()
    reactor.run = modeler.reactorLoop
    modeler.run()