Package Products :: Package DataCollector :: Module zenmodeler
[hide private]
[frames] | no frames]

Source Code for Module Products.DataCollector.zenmodeler

   1  ########################################################################## 
   2  # 
   3  # This program is part of Zenoss Core, an open source monitoring platform. 
   4  # Copyright (C) 2007, Zenoss Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify it 
   7  # under the terms of the GNU General Public License version 2 as published by 
   8  # the Free Software Foundation. 
   9  # 
  10  # For complete information please visit: http://www.zenoss.com/oss/ 
  11  # 
  12  ########################################################################### 
  13   
  14  __doc__= """Discover (aka model) a device and its components. 
  15  For instance, find out what Ethernet interfaces and hard disks a server 
  16  has available. 
  17  This information should change much less frequently than performance metrics. 
  18  """ 
  19   
  20  # IMPORTANT! The import of the pysamba.twisted.reactor module should come before 
  21  # any other libraries that might possibly use twisted. This will ensure that 
  22  # the proper WmiReactor is installed before anyone else grabs a reference to 
  23  # the wrong reactor. 
  24  import pysamba.twisted.reactor 
  25   
  26  import Globals 
  27  from Products.ZenWin.WMIClient import WMIClient 
  28  from Products.ZenWin.utils import addNTLMv2Option, setNTLMv2Auth 
  29  from Products.ZenHub.PBDaemon import FakeRemote, PBDaemon 
  30  from Products.ZenUtils.DaemonStats import DaemonStats 
  31  from Products.ZenUtils.Driver import drive, driveLater 
  32  from Products.ZenUtils.Utils import unused 
  33  from Products.ZenEvents.ZenEventClasses import Heartbeat, Error 
  34   
  35  from PythonClient   import PythonClient 
  36  from SshClient      import SshClient 
  37  from TelnetClient   import TelnetClient, buildOptions as TCbuildOptions 
  38  from SnmpClient     import SnmpClient 
  39  from PortscanClient import PortscanClient 
  40                            
  41  from Products.DataCollector import Classifier 
  42   
  43  from twisted.internet import reactor 
  44  from twisted.internet.defer import succeed 
  45   
  46  import time 
  47  import types 
  48  import re 
  49  import DateTime 
  50   
  51  import os 
  52  import os.path 
  53  import sys 
  54  import traceback 
  55  from random import randint 
  56          
  57  defaultPortScanTimeout = 5 
  58  defaultParallel = 1 
  59  defaultProtocol = "ssh" 
  60  defaultPort = 22 
  61   
  62  # needed for Twisted's PB (Perspective Broker) to work 
  63  from Products.DataCollector import DeviceProxy 
  64  from Products.DataCollector import Plugins 
  65  unused(DeviceProxy, Plugins) 
  66   
67 -class ZenModeler(PBDaemon):
68 """ 69 Daemon class to attach to zenhub and pass along 70 device configuration information. 71 """ 72 73 name = 'zenmodeler' 74 initialServices = PBDaemon.initialServices + ['ModelerService'] 75 76 generateEvents = True 77 configCycleInterval = 360 78 79 classCollectorPlugins = () 80
81 - def __init__(self, single=False ):
82 """ 83 Initalizer 84 85 @param single: collect from a single device? 86 @type single: boolean 87 """ 88 PBDaemon.__init__(self) 89 # FIXME: cleanup --force option #2660 90 self.options.force = True 91 self.start = None 92 self.rrdStats = DaemonStats() 93 self.single = single 94 if self.options.device: 95 self.single = True 96 self.modelerCycleInterval = self.options.cycletime 97 self.collage = float( self.options.collage ) / 1440.0 98 self.pendingNewClients = False 99 self.clients = [] 100 self.finished = [] 101 self.devicegen = None 102 103 # Delay start for between 10 and 60 minutes when run as a daemon. 104 self.started = False 105 self.startDelay = 0 106 if self.options.daemon: 107 if self.options.now: 108 self.log.debug("Run as a daemon, starting immediately.") 109 else: 110 # self.startDelay = randint(10, 60) * 60 111 self.startDelay = randint(10, 60) * 1 112 self.log.info("Run as a daemon, waiting %s seconds to start." % 113 self.startDelay) 114 else: 115 self.log.debug("Run in foreground, starting immediately.")
116 117
118 - def reportError(self, error):
119 """ 120 Log errors that have occurred 121 122 @param error: error message 123 @type error: string 124 """ 125 self.log.error("Error occured: %s", error)
126 127
128 - def connected(self):
129 """ 130 Called after connected to the zenhub service 131 """ 132 d = self.configure() 133 d.addCallback(self.heartbeat) 134 d.addErrback(self.reportError)
135 136
137 - def configure(self):
138 """ 139 Get our configuration from zenhub 140 """ 141 # add in the code to fetch cycle time, etc. 142 def inner(driver): 143 """ 144 Generator function to gather our configuration 145 146 @param driver: driver object 147 @type driver: driver object 148 """ 149 self.log.debug('fetching monitor properties') 150 yield self.config().callRemote('propertyItems') 151 items = dict(driver.next()) 152 if self.options.cycletime == 0: 153 self.modelerCycleInterval = items.get('modelerCycleInterval', 154 self.modelerCycleInterval) 155 self.configCycleInterval = items.get('configCycleInterval', 156 self.configCycleInterval) 157 reactor.callLater(self.configCycleInterval * 60, self.configure) 158 159 self.log.debug("Getting threshold classes...") 160 yield self.config().callRemote('getThresholdClasses') 161 self.remote_updateThresholdClasses(driver.next()) 162 163 self.log.debug("Fetching default RRDCreateCommand...") 164 yield self.config().callRemote('getDefaultRRDCreateCommand') 165 createCommand = driver.next() 166 167 self.log.debug("Getting collector thresholds...") 168 yield self.config().callRemote('getCollectorThresholds') 169 self.rrdStats.config(self.options.monitor, 170 self.name, 171 driver.next(), 172 createCommand) 173 174 self.log.debug("Getting collector plugins for each DeviceClass") 175 yield self.config().callRemote('getClassCollectorPlugins') 176 self.classCollectorPlugins = driver.next()
177 178 return drive(inner)
179 180
181 - def config(self):
182 """ 183 Get the ModelerService 184 """ 185 return self.services.get('ModelerService', FakeRemote())
186 187
188 - def selectPlugins(self, device, transport):
189 """ 190 Build a list of active plugins for a device, based on: 191 192 * the --collect command-line option which is a regex 193 * the --ignore command-line option which is a regex 194 * transport which is a string describing the type of plugin 195 196 @param device: device to collect against 197 @type device: string 198 @param transport: python, ssh, snmp, telnet, cmd 199 @type transport: string 200 @return: results of the plugin 201 @type: string 202 @todo: determine if an event for the collector AND the device should be sent 203 """ 204 plugins = [] 205 valid_loaders = [] 206 for loader in device.plugins: 207 try: 208 plugin= loader.create() 209 self.log.debug( "Loaded plugin %s" % plugin.name() ) 210 plugins.append( plugin ) 211 valid_loaders.append( loader ) 212 213 except (SystemExit, KeyboardInterrupt), ex: 214 self.log.info( "Interrupted by external signal (%s)" % str(ex) ) 215 raise 216 217 except Plugins.PluginImportError, import_error: 218 import socket 219 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) ) 220 collector_host= socket.gethostname() 221 # NB: an import errror affects all devices, 222 # so report the issue against the collector 223 # TOOD: determine if an event for the collector AND the device should be sent 224 evt= { "eventClass":"/Status/Update", "component":component, 225 "agent":collector_host, "device":collector_host, 226 "severity":Error } 227 228 info= "Problem loading plugin %s" % import_error.plugin 229 self.log.error( info ) 230 evt[ 'summary' ]= info 231 232 info= import_error.traceback 233 self.log.error( info ) 234 evt[ 'message' ]= info 235 236 info= ("Due to import errors, removing the %s plugin" 237 " from this collection cycle.") % import_error.plugin 238 self.log.error( info ) 239 evt[ 'message' ] += "%s\n" % info 240 self.sendEvent( evt ) 241 242 # Make sure that we don't generate messages for bad loaders again 243 # NB: doesn't update the device's zProperties 244 if len( device.plugins ) != len( valid_loaders ): 245 device.plugins= valid_loaders 246 247 # Create functions to search for what plugins we will and 248 # won't supply to the device 249 collectTest = lambda x: False 250 ignoreTest = lambda x: False 251 if self.options.collectPlugins: 252 collectTest = re.compile(self.options.collectPlugins).search 253 elif self.options.ignorePlugins: 254 ignoreTest = re.compile(self.options.ignorePlugins).search 255 256 result = [] 257 for plugin in plugins: 258 if plugin.transport != transport: 259 continue 260 name = plugin.name() 261 if ignoreTest(name): 262 self.log.debug("Ignoring %s on %s because of --ignore flag", 263 name, device.id) 264 elif collectTest(name): 265 self.log.debug("Using %s on %s because of --collect flag", 266 name, device.id) 267 result.append(plugin) 268 elif not self.options.collectPlugins: 269 self.log.debug("Using %s on %s", name, device.id) 270 result.append(plugin) 271 return result
272 273 274
275 - def collectDevice(self, device):
276 """ 277 Collect data from a single device. 278 279 @param device: device to collect against 280 @type device: string 281 """ 282 clientTimeout = getattr(device, 'zCollectorClientTimeout', 180) 283 ip = device.manageIp 284 timeout = clientTimeout + time.time() 285 self.wmiCollect(device, ip, timeout) 286 self.pythonCollect(device, ip, timeout) 287 self.cmdCollect(device, ip, timeout) 288 self.snmpCollect(device, ip, timeout) 289 self.portscanCollect(device, ip, timeout)
290 291 292
293 - def wmiCollect(self, device, ip, timeout):
294 """ 295 Start the Windows Management Instrumentation (WMI) collector 296 297 @param device: device to collect against 298 @type device: string 299 @param ip: IP address of device to collect against 300 @type ip: string 301 @param timeout: timeout before failing the connection 302 @type timeout: integer 303 """ 304 if self.options.nowmi: 305 return 306 307 client = None 308 try: 309 plugins = self.selectPlugins(device, 'wmi') 310 if not plugins: 311 self.log.info("No WMI plugins found for %s" % device.id) 312 return 313 if self.checkCollection(device): 314 self.log.info('WMI collector method for device %s' % device.id) 315 self.log.info("plugins: %s", 316 ", ".join(map(lambda p: p.name(), plugins))) 317 client = WMIClient(device, self, plugins) 318 if not client or not plugins: 319 self.log.warn("WMI collector creation failed") 320 return 321 except (SystemExit, KeyboardInterrupt): 322 raise 323 except Exception: 324 self.log.exception("Error opening WMI collector") 325 self.addClient(client, timeout, 'WMI', device.id)
326 327 328
329 - def pythonCollect(self, device, ip, timeout):
330 """ 331 Start local Python collection client. 332 333 @param device: device to collect against 334 @type device: string 335 @param ip: IP address of device to collect against 336 @type ip: string 337 @param timeout: timeout before failing the connection 338 @type timeout: integer 339 """ 340 client = None 341 try: 342 plugins = self.selectPlugins(device, "python") 343 if not plugins: 344 self.log.info("No Python plugins found for %s" % device.id) 345 return 346 if self.checkCollection(device): 347 self.log.info('Python collection device %s' % device.id) 348 self.log.info("plugins: %s", 349 ", ".join(map(lambda p: p.name(), plugins))) 350 client = PythonClient(device, self, plugins) 351 if not client or not plugins: 352 self.log.warn("Python client creation failed") 353 return 354 except (SystemExit, KeyboardInterrupt): raise 355 except: 356 self.log.exception("Error opening pythonclient") 357 self.addClient(client, timeout, 'python', device.id)
358 359
360 - def cmdCollect(self, device, ip, timeout):
361 """ 362 Start shell command collection client. 363 364 @param device: device to collect against 365 @type device: string 366 @param ip: IP address of device to collect against 367 @type ip: string 368 @param timeout: timeout before failing the connection 369 @type timeout: integer 370 """ 371 client = None 372 clientType = 'snmp' # default to SNMP if we can't figure out a protocol 373 374 hostname = device.id 375 try: 376 plugins = self.selectPlugins(device,"command") 377 if not plugins: 378 self.log.info("No command plugins found for %s" % hostname) 379 return 380 381 protocol = getattr(device, 'zCommandProtocol', defaultProtocol) 382 commandPort = getattr(device, 'zCommandPort', defaultPort) 383 384 if protocol == "ssh": 385 client = SshClient(hostname, ip, commandPort, 386 options=self.options, 387 plugins=plugins, device=device, 388 datacollector=self, isLoseConnection=True) 389 clientType = 'ssh' 390 self.log.info('Using SSH collection method for device %s' 391 % hostname) 392 393 elif protocol == 'telnet': 394 if commandPort == 22: commandPort = 23 #set default telnet 395 client = TelnetClient(hostname, ip, commandPort, 396 options=self.options, 397 plugins=plugins, device=device, 398 datacollector=self) 399 clientType = 'telnet' 400 self.log.info('Using telnet collection method for device %s' 401 % hostname) 402 403 else: 404 info = ("Unknown protocol %s for device %s -- " 405 "defaulting to %s collection method" % 406 (protocol, hostname, clientType )) 407 self.log.warn( info ) 408 import socket 409 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) ) 410 collector_host= socket.gethostname() 411 evt= { "eventClass":"/Status/Update", "agent":collector_host, 412 "device":hostname, "severity":Error } 413 evt[ 'summary' ]= info 414 self.sendEvent( evt ) 415 return 416 417 if not client: 418 self.log.warn("Shell command collector creation failed") 419 else: 420 self.log.info("plugins: %s", 421 ", ".join(map(lambda p: p.name(), plugins))) 422 except (SystemExit, KeyboardInterrupt): raise 423 except: 424 self.log.exception("Error opening command collector") 425 self.addClient(client, timeout, clientType, device.id)
426 427 428
429 - def snmpCollect(self, device, ip, timeout):
430 """ 431 Start SNMP collection client. 432 433 @param device: device to collect against 434 @type device: string 435 @param ip: IP address of device to collect against 436 @type ip: string 437 @param timeout: timeout before failing the connection 438 @type timeout: integer 439 """ 440 client = None 441 try: 442 hostname = device.id 443 if getattr( device, "zSnmpMonitorIgnore", True ): 444 self.log.info("SNMP monitoring off for %s" % hostname) 445 return 446 447 if not ip: 448 self.log.info("No manage IP for %s" % hostname) 449 return 450 451 plugins = [] 452 plugins = self.selectPlugins(device,"snmp") 453 if not plugins: 454 self.log.info("No SNMP plugins found for %s" % hostname) 455 return 456 457 if self.checkCollection(device): 458 self.log.info('SNMP collection device %s' % hostname) 459 self.log.info("plugins: %s", 460 ", ".join(map(lambda p: p.name(), plugins))) 461 client = SnmpClient(device.id, ip, self.options, 462 device, self, plugins) 463 if not client or not plugins: 464 self.log.warn("SNMP collector creation failed") 465 return 466 except (SystemExit, KeyboardInterrupt): raise 467 except: 468 self.log.exception("Error opening the SNMP collector") 469 self.addClient(client, timeout, 'SNMP', device.id)
470 471 472 ######## need to make async test for snmp work at some point -EAD ######### 473 # def checkSnmpConnection(self, device): 474 # """ 475 # Check to see if our current community string is still valid 476 # 477 # @param device: the device against which we will check 478 # @type device: a Device instance 479 # @return: result is None or a tuple containing 480 # (community, port, version, snmp name) 481 # @rtype: deferred: Twisted deferred 482 # """ 483 # from pynetsnmp.twistedsnmp import AgentProxy 484 # 485 # def inner(driver): 486 # self.log.debug("Checking SNMP community %s on %s", 487 # device.zSnmpCommunity, device.id) 488 # 489 # oid = ".1.3.6.1.2.1.1.5.0" 490 # proxy = AgentProxy(device.id, 491 # device.zSnmpPort, 492 # timeout=device.zSnmpTimeout, 493 # community=device.zSnmpCommunity, 494 # snmpVersion=device.zSnmpVer, 495 # tries=2) 496 # proxy.open() 497 # yield proxy.get([oid]) 498 # devname = driver.next().values()[0] 499 # if devname: 500 # yield succeed(True) 501 # yield succeed(False) 502 # 503 # return drive(inner) 504 505
506 - def addClient(self, device, timeout, clientType, name):
507 """ 508 If device is not None, schedule the device to be collected. 509 Otherwise log an error. 510 511 @param device: device to collect against 512 @type device: string 513 @param timeout: timeout before failing the connection 514 @type timeout: integer 515 @param clientType: description of the plugin type 516 @type clientType: string 517 @param name: plugin name 518 @type name: string 519 """ 520 if device: 521 device.timeout = timeout 522 device.timedOut = False 523 self.clients.append(device) 524 device.run() 525 else: 526 self.log.warn('Unable to create a %s collector for %s', 527 clientType, name)
528 529 530 # XXX double-check this, once the implementation is in place
531 - def portscanCollect(self, device, ip, timeout):
532 """ 533 Start portscan collection client. 534 535 @param device: device to collect against 536 @type device: string 537 @param ip: IP address of device to collect against 538 @type ip: string 539 @param timeout: timeout before failing the connection 540 @type timeout: integer 541 """ 542 client = None 543 try: 544 hostname = device.id 545 plugins = self.selectPlugins(device, "portscan") 546 if not plugins: 547 self.log.info("No portscan plugins found for %s" % hostname) 548 return 549 if self.checkCollection(device): 550 self.log.info('Portscan collector method for device %s' 551 % hostname) 552 self.log.info("plugins: %s", 553 ", ".join(map(lambda p: p.name(), plugins))) 554 client = PortscanClient(device.id, ip, self.options, 555 device, self, plugins) 556 if not client or not plugins: 557 self.log.warn("Portscan collector creation failed") 558 return 559 except (SystemExit, KeyboardInterrupt): raise 560 except: 561 self.log.exception("Error opening portscan collector") 562 self.addClient(client, timeout, 'portscan', device.id)
563 564
565 - def checkCollection(self, device):
566 """ 567 See how old the data is that we've collected 568 569 @param device: device to collect against 570 @type device: string 571 @return: is the SNMP status number > 0 and is the last collection time + collage older than now? 572 @type: boolean 573 """ 574 age = device.getSnmpLastCollection() + self.collage 575 if device.getSnmpStatusNumber() > 0 and age >= DateTime.DateTime(): 576 self.log.info("Skipped collection of %s" % device.id) 577 return False 578 return True
579 580
581 - def clientFinished(self, collectorClient):
582 """ 583 Callback that processes the return values from a device. 584 Python iterable. 585 586 @param collectorClient: collector instance 587 @type collectorClient: collector class 588 @return: Twisted deferred object 589 @type: Twisted deferred object 590 """ 591 device = collectorClient.device 592 self.log.debug("Client for %s finished collecting", device.id) 593 def processClient(driver): 594 try: 595 pluginStats = {} 596 self.log.debug("Processing data for device %s", device.id) 597 devchanged = False 598 maps = [] 599 for plugin, results in collectorClient.getResults(): 600 if plugin is None: continue 601 self.log.debug("Processing plugin %s on device %s ...", 602 plugin.name(), device.id) 603 if not results: 604 self.log.warn("The plugin %s returned no results.", 605 plugin.name()) 606 continue 607 608 self.log.debug("Plugin %s results = %s", plugin.name(), results) 609 datamaps = [] 610 try: 611 results = plugin.preprocess(results, self.log) 612 if results: 613 datamaps = plugin.process(device, results, self.log) 614 if datamaps: 615 pluginStats.setdefault(plugin.name(), plugin.weight) 616 617 except (SystemExit, KeyboardInterrupt), ex: 618 self.log.info( "Plugin %s terminated due to external" 619 " signal (%s)" % (plugin.name(), str(ex) ) 620 ) 621 continue 622 623 except Exception, ex: 624 # NB: don't discard the plugin, as it might be a 625 # temporary issue 626 # Also, report it against the device, rather than at 627 # a collector as it might be just for this device. 628 import socket 629 component= os.path.splitext( 630 os.path.basename( sys.argv[0] ) 631 )[0] 632 collector_host= socket.gethostname() 633 evt= { "eventClass":"/Status/Update", 634 "agent":collector_host, "device":device.id, 635 "severity":Error } 636 637 info= "Problem while executing plugin %s" %plugin.name() 638 self.log.error( info ) 639 evt[ 'summary' ]= info 640 641 info= traceback.format_exc() 642 self.log.error( info ) 643 evt[ 'message' ]= info 644 self.sendEvent( evt ) 645 continue 646 647 # allow multiple maps to be returned from one plugin 648 if type(datamaps) not in (types.ListType, types.TupleType): 649 datamaps = [datamaps,] 650 if datamaps: 651 maps += [m for m in datamaps if m] 652 if maps: 653 deviceClass = Classifier.classifyDevice(pluginStats, 654 self.classCollectorPlugins) 655 yield self.config().callRemote( 656 'applyDataMaps', device.id, 657 maps, deviceClass) 658 if driver.next(): 659 devchanged = True 660 if devchanged: 661 self.log.info("Changes in configuration applied") 662 else: 663 self.log.info("No change in configuration detected") 664 yield self.config().callRemote('setSnmpLastCollection', 665 device.id) 666 driver.next() 667 except Exception, ex: 668 self.log.exception(ex) 669 raise
670 671 def processClientFinished(result): 672 """ 673 Called after the client collection finishes 674 675 @param result: object (unused) 676 @type result: object 677 """ 678 if not result: 679 self.log.debug("Client %s finished" % device.id) 680 else: 681 self.log.error("Client %s finished with message: %s" % 682 (device.id, result)) 683 try: 684 self.clients.remove(collectorClient) 685 self.finished.append(collectorClient) 686 except ValueError: 687 self.log.debug("Client %s not found in in the list" 688 " of active clients", 689 device.id) 690 d = drive(self.fillCollectionSlots) 691 d.addErrback(self.fillError) 692 693 d = drive(processClient) 694 d.addBoth(processClientFinished) 695 696 697
698 - def fillError(self, reason):
699 """ 700 Twisted errback routine to log an error when 701 unable to collect some data 702 703 @param reason: error message 704 @type reason: string 705 """ 706 self.log.error("Unable to fill collection slots: %s" % reason)
707 708
709 - def cycleTime(self):
710 """ 711 Return our cycle time (in minutes) 712 713 @return: cycle time 714 @rtype: integer 715 """ 716 return self.modelerCycleInterval * 60
717 718
719 - def heartbeat(self, ignored=None):
720 """ 721 Twisted keep-alive mechanism to ensure that 722 we're still connected to zenhub 723 724 @param ignored: object (unused) 725 @type ignored: object 726 """ 727 ARBITRARY_BEAT = 30 728 reactor.callLater(ARBITRARY_BEAT, self.heartbeat) 729 if self.options.cycle: 730 evt = dict(eventClass=Heartbeat, 731 component='zenmodeler', 732 device=self.options.monitor, 733 timeout=3*ARBITRARY_BEAT) 734 self.sendEvent(evt) 735 self.niceDoggie(self.cycleTime()) 736 737 # We start modeling from here to accomodate the startup delay. 738 if not self.started: 739 self.started = True 740 reactor.callLater(self.startDelay, self.main)
741 742
743 - def checkStop(self, unused = None):
744 """ 745 Check to see if there's anything to do. 746 If there isn't, report our statistics and exit. 747 748 @param unused: unused (unused) 749 @type unused: string 750 """ 751 if self.clients: return 752 if self.devicegen: return 753 754 if self.start: 755 runTime = time.time() - self.start 756 self.start = None 757 self.log.info("Scan time: %0.2f seconds", runTime) 758 devices = len(self.finished) 759 timedOut = len([c for c in self.finished if c.timedOut]) 760 self.sendEvents( 761 self.rrdStats.gauge('cycleTime', self.cycleTime(), runTime) + 762 self.rrdStats.gauge('devices', self.cycleTime(), devices) + 763 self.rrdStats.gauge('timedOut', self.cycleTime(), timedOut) 764 ) 765 if not self.options.cycle: 766 self.stop() 767 self.finished = []
768 769
770 - def fillCollectionSlots(self, driver):
771 """ 772 An iterator which either returns a device to collect or 773 calls checkStop() 774 775 @param driver: driver object 776 @type driver: driver object 777 """ 778 count = len(self.clients) 779 while count < self.options.parallel and self.devicegen \ 780 and not self.pendingNewClients: 781 782 self.pendingNewClients = True 783 try: 784 device = self.devicegen.next() 785 yield self.config().callRemote('getDeviceConfig', [device]) 786 # just collect one device, and let the timer add more 787 devices = driver.next() 788 if devices: 789 self.collectDevice(devices[0]) 790 except StopIteration: 791 self.devicegen = None 792 793 self.pendingNewClients = False 794 break 795 796 update = len(self.clients) 797 if update != count and update != 1: 798 self.log.info('Running %d clients', update) 799 else: 800 self.log.debug('Running %d clients', update) 801 self.checkStop()
802 803
804 - def buildOptions(self):
805 """ 806 Build our list of command-line options 807 """ 808 PBDaemon.buildOptions(self) 809 self.parser.add_option('--debug', 810 dest='debug', action="store_true", default=False, 811 help="Don't fork threads for processing") 812 self.parser.add_option('--nowmi', 813 dest='nowmi', action="store_true", default=False, 814 help="Do not execute WMI plugins") 815 self.parser.add_option('--parallel', dest='parallel', 816 type='int', default=defaultParallel, 817 help="Number of devices to collect from in parallel") 818 self.parser.add_option('--cycletime', 819 dest='cycletime',default=720,type='int', 820 help="Run collection every x minutes") 821 self.parser.add_option('--ignore', 822 dest='ignorePlugins',default="", 823 help="Modeler plugins to ignore. Takes a regular expression") 824 self.parser.add_option('--collect', 825 dest='collectPlugins',default="", 826 help="Modeler plugins to use. Takes a regular expression") 827 self.parser.add_option('-p', '--path', dest='path', 828 help="Start class path for collection ie /NetworkDevices") 829 self.parser.add_option('-d', '--device', dest='device', 830 help="Fully qualified device name ie www.confmon.com") 831 self.parser.add_option('-a', '--collage', 832 dest='collage', default=0, type='float', 833 help="Do not collect from devices whose collect date " + 834 "is within this many minutes") 835 self.parser.add_option('--writetries', 836 dest='writetries',default=2,type='int', 837 help="Number of times to try to write if a " 838 "read conflict is found") 839 # FIXME: cleanup --force option #2660 840 self.parser.add_option("-F", "--force", 841 dest="force", action='store_true', default=True, 842 help="Force collection of config data (deprecated)") 843 self.parser.add_option('--portscantimeout', dest='portscantimeout', 844 type='int', default=defaultPortScanTimeout, 845 help="Time to wait for connection failures when port scanning") 846 self.parser.add_option('--now', 847 dest='now', action="store_true", default=False, 848 help="Start daemon now, do not sleep before starting") 849 TCbuildOptions(self.parser, self.usage) 850 addNTLMv2Option(self.parser)
851 852 853
854 - def processOptions(self):
855 """ 856 Check what the user gave us vs what we'll accept 857 for command-line options 858 """ 859 if not self.options.path and not self.options.device: 860 self.options.path = "/Devices" 861 if self.options.ignorePlugins and self.options.collectPlugins: 862 raise SystemExit( "Only one of --ignore or --collect" 863 " can be used at a time") 864 setNTLMv2Auth(self.options)
865
866 - def _timeoutClients(self):
867 """ 868 The guts of the timeoutClients method (minus the twisted reactor 869 stuff). Breaking this part out as a separate method facilitates unit 870 testing. 871 """ 872 active = [] 873 for client in self.clients: 874 if client.timeout < time.time(): 875 self.log.warn("Client %s timeout", client.hostname) 876 self.finished.append(client) 877 client.timedOut = True 878 client.stop() 879 else: 880 active.append(client) 881 self.clients = active
882 883 884
885 - def timeoutClients(self, unused=None):
886 """ 887 Check to see which clients have timed out and which ones haven't. 888 Stop processing anything that's timed out. 889 890 @param unused: unused (unused) 891 @type unused: string 892 """ 893 reactor.callLater(1, self.timeoutClients) 894 self._timeoutClients() 895 d = drive(self.fillCollectionSlots) 896 d.addCallback(self.checkStop) 897 d.addErrback(self.fillError)
898 899 900
901 - def reactorLoop(self):
902 """ 903 Twisted main loop 904 """ 905 reactor.startRunning() 906 while reactor.running: 907 try: 908 while reactor.running: 909 reactor.runUntilCurrent() 910 timeout = reactor.timeout() 911 reactor.doIteration(timeout) 912 except: 913 if reactor.running: 914 self.log.exception("Unexpected error in main loop.")
915 916 917
918 - def getDeviceList(self):
919 """ 920 Get the list of devices for which we are collecting: 921 * if -d devicename was used, use the devicename 922 * if a class path flag was supplied, gather the devices 923 along that organizer 924 * otherwise get all of the devices associated with our collector 925 926 @return: list of devices 927 @rtype: list 928 """ 929 if self.options.device: 930 self.log.info("Collecting for device %s", self.options.device) 931 return succeed([self.options.device]) 932 933 self.log.info("Collecting for path %s", self.options.path) 934 return self.config().callRemote('getDeviceListByOrganizer', 935 self.options.path, 936 self.options.monitor)
937 938
939 - def mainLoop(self, driver):
940 """ 941 Main collection loop, a Python iterable 942 943 @param driver: driver object 944 @type driver: driver object 945 @return: Twisted deferred object 946 @rtype: Twisted deferred object 947 """ 948 if self.options.cycle: 949 driveLater(self.cycleTime(), self.mainLoop) 950 951 if self.clients: 952 self.log.error("Modeling cycle taking too long") 953 return 954 955 self.start = time.time() 956 957 self.log.debug("Starting collector loop...") 958 yield self.getDeviceList() 959 self.devicegen = iter(driver.next()) 960 d = drive(self.fillCollectionSlots) 961 d.addErrback(self.fillError) 962 yield d 963 driver.next() 964 self.log.debug("Collection slots filled")
965 966 967
968 - def main(self, unused=None):
969 """ 970 Wrapper around the mainLoop 971 972 @param unused: unused (unused) 973 @type unused: string 974 @return: Twisted deferred object 975 @rtype: Twisted deferred object 976 """ 977 self.finished = [] 978 d = drive(self.mainLoop) 979 d.addCallback(self.timeoutClients) 980 return d
981 982 983
984 - def remote_deleteDevice(self, device):
985 """ 986 Stub function 987 988 @param device: device name (unused) 989 @type device: string 990 @todo: implement 991 """ 992 # we fetch the device list before every scan 993 self.log.debug("Asynch deleteDevice %s" % device)
994 995 996 if __name__ == '__main__': 997 dc = ZenModeler() 998 dc.processOptions() 999 reactor.run = dc.reactorLoop 1000 dc.run() 1001