Package DataCollector :: Module zenmodeler
[hide private]
[frames] | no frames]

Source Code for Module DataCollector.zenmodeler

  1  ########################################################################## 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__= """Discover (aka model) a device and its components. 
 15  For instance, find out what Ethernet interfaces and hard disks a server 
 16  has available. 
 17  This information should change much less frequently than performance metrics. 
 18  """ 
 19   
 20  import Globals 
 21  from Products.ZenWin.WMIClient import WMIClient 
 22  from Products.ZenHub.PBDaemon import FakeRemote, PBDaemon 
 23  from Products.ZenUtils.DaemonStats import DaemonStats 
 24  from Products.ZenUtils.Driver import drive, driveLater 
 25  from Products.ZenUtils.Utils import unused 
 26  from Products.ZenEvents.ZenEventClasses import Heartbeat, Error 
 27   
 28  from PythonClient   import PythonClient 
 29  from SshClient      import SshClient 
 30  from TelnetClient   import TelnetClient, buildOptions as TCbuildOptions 
 31  from SnmpClient     import SnmpClient 
 32  from PortscanClient import PortscanClient 
 33                            
 34  from Products.DataCollector import Classifier 
 35   
 36  from twisted.internet import reactor 
 37  from twisted.internet.defer import succeed 
 38   
 39  import time 
 40  import types 
 41  import re 
 42  import DateTime 
 43   
 44  import os 
 45  import os.path 
 46  import sys 
 47  import traceback   
 48          
 49  defaultPortScanTimeout = 5 
 50  defaultParallel = 1 
 51  defaultProtocol = "ssh" 
 52  defaultPort = 22 
 53  defaultStartSleep = 10 * 60 
 54   
 55  # needed for Twisted's PB (Perspective Broker) to work 
 56  from Products.DataCollector import DeviceProxy 
 57  from Products.DataCollector import Plugins 
 58  unused(DeviceProxy, Plugins) 
 59   
60 -class ZenModeler(PBDaemon):
61 """ 62 Daemon class to attach to zenhub and pass along 63 device configuration information. 64 """ 65 66 name = 'zenmodeler' 67 initialServices = PBDaemon.initialServices + ['ModelerService'] 68 69 generateEvents = True 70 configCycleInterval = 360 71 72 classCollectorPlugins = () 73
74 - def __init__(self, single=False ):
75 """ 76 Initalizer 77 78 @param single: collect from a single device? 79 @type single: boolean 80 """ 81 PBDaemon.__init__(self) 82 # FIXME: cleanup --force option #2660 83 self.options.force = True 84 if self.options.daemon: 85 if self.options.now: 86 self.log.debug("Run as a daemon, starting immediately.") 87 else: 88 self.log.info("Run as a daemon, waiting %s sec to start." % 89 defaultStartSleep) 90 time.sleep(defaultStartSleep) 91 self.log.debug("Run as a daemon, slept %s sec, starting now." % 92 defaultStartSleep) 93 else: 94 self.log.debug("Run in foreground, starting immediately.") 95 96 self.start = None 97 self.rrdStats = DaemonStats() 98 self.single = single 99 if self.options.device: 100 self.single = True 101 self.modelerCycleInterval = self.options.cycletime 102 self.collage = float( self.options.collage ) / 1440.0 103 self.pendingNewClients = False 104 self.clients = [] 105 self.finished = [] 106 self.devicegen = None
107 108 109
110 - def reportError(self, error):
111 """ 112 Log errors that have occurred 113 114 @param error: error message 115 @type error: string 116 """ 117 self.log.error("Error occured: %s", error)
118 119
120 - def connected(self):
121 """ 122 Called after connected to the zenhub service 123 """ 124 d = self.configure() 125 d.addCallback(self.heartbeat) 126 d.addErrback(self.reportError) 127 d.addCallback(self.main)
128 129
130 - def configure(self):
131 """ 132 Get our configuration from zenhub 133 """ 134 # add in the code to fetch cycle time, etc. 135 def inner(driver): 136 """ 137 Generator function to gather our configuration 138 139 @param driver: driver object 140 @type driver: driver object 141 """ 142 self.log.debug('fetching monitor properties') 143 yield self.config().callRemote('propertyItems') 144 items = dict(driver.next()) 145 if self.options.cycletime == 0: 146 self.modelerCycleInterval = items.get('modelerCycleInterval', 147 self.modelerCycleInterval) 148 self.configCycleInterval = items.get('configCycleInterval', 149 self.configCycleInterval) 150 reactor.callLater(self.configCycleInterval * 60, self.configure) 151 152 self.log.debug("Getting threshold classes...") 153 yield self.config().callRemote('getThresholdClasses') 154 self.remote_updateThresholdClasses(driver.next()) 155 156 self.log.debug("Fetching default RRDCreateCommand...") 157 yield self.config().callRemote('getDefaultRRDCreateCommand') 158 createCommand = driver.next() 159 160 self.log.debug("Getting collector thresholds...") 161 yield self.config().callRemote('getCollectorThresholds') 162 self.rrdStats.config(self.options.monitor, 163 self.name, 164 driver.next(), 165 createCommand) 166 167 self.log.debug("Getting collector plugins for each DeviceClass") 168 yield self.config().callRemote('getClassCollectorPlugins') 169 self.classCollectorPlugins = driver.next()
170 171 return drive(inner)
172 173
174 - def config(self):
175 """ 176 Get the ModelerService 177 """ 178 return self.services.get('ModelerService', FakeRemote())
179 180
181 - def selectPlugins(self, device, transport):
182 """ 183 Build a list of active plugins for a device, based on: 184 185 * the --collect command-line option which is a regex 186 * the --ignore command-line option which is a regex 187 * transport which is a string describing the type of plugin 188 189 @param device: device to collect against 190 @type device: string 191 @param transport: python, ssh, snmp, telnet, cmd 192 @type transport: string 193 @return: results of the plugin 194 @type: string 195 @todo: determine if an event for the collector AND the device should be sent 196 """ 197 plugins = [] 198 valid_loaders = [] 199 for loader in device.plugins: 200 try: 201 plugin= loader.create() 202 self.log.debug( "Loaded plugin %s" % plugin.name() ) 203 plugins.append( plugin ) 204 valid_loaders.append( loader ) 205 206 except (SystemExit, KeyboardInterrupt), ex: 207 self.log.info( "Interrupted by external signal (%s)" % str(ex) ) 208 raise 209 210 except Plugins.PluginImportError, import_error: 211 import socket 212 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) ) 213 collector_host= socket.gethostname() 214 # NB: an import errror affects all devices, 215 # so report the issue against the collector 216 # TOOD: determine if an event for the collector AND the device should be sent 217 evt= { "eventClass":"/Status/Update", "component":component, 218 "agent":collector_host, "device":collector_host, 219 "severity":Error } 220 221 info= "Problem loading plugin %s" % import_error.plugin 222 self.log.error( info ) 223 evt[ 'summary' ]= info 224 225 info= import_error.traceback 226 self.log.error( info ) 227 evt[ 'message' ]= info 228 229 info= ("Due to import errors, removing the %s plugin" 230 " from this collection cycle.") % import_error.plugin 231 self.log.error( info ) 232 evt[ 'message' ] += "%s\n" % info 233 self.sendEvent( evt ) 234 235 # Make sure that we don't generate messages for bad loaders again 236 # NB: doesn't update the device's zProperties 237 if len( device.plugins ) != len( valid_loaders ): 238 device.plugins= valid_loaders 239 240 # Create functions to search for what plugins we will and 241 # won't supply to the device 242 collectTest = lambda x: False 243 ignoreTest = lambda x: False 244 if self.options.collectPlugins: 245 collectTest = re.compile(self.options.collectPlugins).search 246 elif self.options.ignorePlugins: 247 ignoreTest = re.compile(self.options.ignorePlugins).search 248 249 result = [] 250 for plugin in plugins: 251 if plugin.transport != transport: 252 continue 253 name = plugin.name() 254 if ignoreTest(name): 255 self.log.debug("Ignoring %s on %s because of --ignore flag", 256 name, device.id) 257 elif collectTest(name): 258 self.log.debug("Using %s on %s because of --collect flag", 259 name, device.id) 260 result.append(plugin) 261 elif not self.options.collectPlugins: 262 self.log.debug("Using %s on %s", name, device.id) 263 result.append(plugin) 264 return result
265 266 267
268 - def collectDevice(self, device):
269 """ 270 Collect data from a single device. 271 272 @param device: device to collect against 273 @type device: string 274 """ 275 clientTimeout = getattr(device, 'zCollectorClientTimeout', 180) 276 ip = device.manageIp 277 timeout = clientTimeout + time.time() 278 self.wmiCollect(device, ip, timeout) 279 self.pythonCollect(device, ip, timeout) 280 self.cmdCollect(device, ip, timeout) 281 self.snmpCollect(device, ip, timeout) 282 self.portscanCollect(device, ip, timeout)
283 284 285
286 - def wmiCollect(self, device, ip, timeout):
287 """ 288 Start the Windows Management Instrumentation (WMI) collector 289 290 @param device: device to collect against 291 @type device: string 292 @param ip: IP address of device to collect against 293 @type ip: string 294 @param timeout: timeout before failing the connection 295 @type timeout: integer 296 """ 297 if self.options.nowmi: 298 return 299 300 client = None 301 try: 302 plugins = self.selectPlugins(device, 'wmi') 303 if not plugins: 304 self.log.info("No WMI plugins found for %s" % device.id) 305 return 306 if self.checkCollection(device): 307 self.log.info('WMI collector method for device %s' % device.id) 308 self.log.info("plugins: %s", 309 ", ".join(map(lambda p: p.name(), plugins))) 310 client = WMIClient(device, self, plugins) 311 if not client or not plugins: 312 self.log.warn("WMI collector creation failed") 313 return 314 except (SystemExit, KeyboardInterrupt): 315 raise 316 except Exception: 317 self.log.exception("Error opening WMI collector") 318 self.addClient(client, timeout, 'WMI', device.id)
319 320 321
322 - def pythonCollect(self, device, ip, timeout):
323 """ 324 Start local Python collection client. 325 326 @param device: device to collect against 327 @type device: string 328 @param ip: IP address of device to collect against 329 @type ip: string 330 @param timeout: timeout before failing the connection 331 @type timeout: integer 332 """ 333 client = None 334 try: 335 plugins = self.selectPlugins(device, "python") 336 if not plugins: 337 self.log.info("No Python plugins found for %s" % device.id) 338 return 339 if self.checkCollection(device): 340 self.log.info('Python collection device %s' % device.id) 341 self.log.info("plugins: %s", 342 ", ".join(map(lambda p: p.name(), plugins))) 343 client = PythonClient(device, self, plugins) 344 if not client or not plugins: 345 self.log.warn("Python client creation failed") 346 return 347 except (SystemExit, KeyboardInterrupt): raise 348 except: 349 self.log.exception("Error opening pythonclient") 350 self.addClient(client, timeout, 'python', device.id)
351 352
353 - def cmdCollect(self, device, ip, timeout):
354 """ 355 Start shell command collection client. 356 357 @param device: device to collect against 358 @type device: string 359 @param ip: IP address of device to collect against 360 @type ip: string 361 @param timeout: timeout before failing the connection 362 @type timeout: integer 363 """ 364 client = None 365 clientType = 'snmp' # default to SNMP if we can't figure out a protocol 366 367 hostname = device.id 368 try: 369 plugins = self.selectPlugins(device,"command") 370 if not plugins: 371 self.log.info("No command plugins found for %s" % hostname) 372 return 373 374 protocol = getattr(device, 'zCommandProtocol', defaultProtocol) 375 commandPort = getattr(device, 'zCommandPort', defaultPort) 376 377 if protocol == "ssh": 378 client = SshClient(hostname, ip, commandPort, 379 options=self.options, 380 plugins=plugins, device=device, 381 datacollector=self, isLoseConnection=True) 382 clientType = 'ssh' 383 self.log.info('Using SSH collection method for device %s' 384 % hostname) 385 386 elif protocol == 'telnet': 387 if commandPort == 22: commandPort = 23 #set default telnet 388 client = TelnetClient(hostname, ip, commandPort, 389 options=self.options, 390 plugins=plugins, device=device, 391 datacollector=self) 392 clientType = 'telnet' 393 self.log.info('Using telnet collection method for device %s' 394 % hostname) 395 396 else: 397 info = ("Unknown protocol %s for device %s -- " 398 "defaulting to %s collection method" % 399 (protocol, hostname, clientType )) 400 self.log.warn( info ) 401 import socket 402 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) ) 403 collector_host= socket.gethostname() 404 evt= { "eventClass":"/Status/Update", "agent":collector_host, 405 "device":hostname, "severity":Error } 406 evt[ 'summary' ]= info 407 self.sendEvent( evt ) 408 return 409 410 if not client: 411 self.log.warn("Shell command collector creation failed") 412 else: 413 self.log.info("plugins: %s", 414 ", ".join(map(lambda p: p.name(), plugins))) 415 except (SystemExit, KeyboardInterrupt): raise 416 except: 417 self.log.exception("Error opening command collector") 418 self.addClient(client, timeout, clientType, device.id)
419 420 421
422 - def snmpCollect(self, device, ip, timeout):
423 """ 424 Start SNMP collection client. 425 426 @param device: device to collect against 427 @type device: string 428 @param ip: IP address of device to collect against 429 @type ip: string 430 @param timeout: timeout before failing the connection 431 @type timeout: integer 432 """ 433 client = None 434 try: 435 hostname = device.id 436 if getattr( device, "zSnmpMonitorIgnore", True ): 437 self.log.info("SNMP monitoring off for %s" % hostname) 438 return 439 440 if not ip: 441 self.log.info("No manage IP for %s" % hostname) 442 return 443 444 plugins = [] 445 plugins = self.selectPlugins(device,"snmp") 446 if not plugins: 447 self.log.info("No SNMP plugins found for %s" % hostname) 448 return 449 450 if self.checkCollection(device): 451 self.log.info('SNMP collection device %s' % hostname) 452 self.log.info("plugins: %s", 453 ", ".join(map(lambda p: p.name(), plugins))) 454 client = SnmpClient(device.id, ip, self.options, 455 device, self, plugins) 456 if not client or not plugins: 457 self.log.warn("SNMP collector creation failed") 458 return 459 except (SystemExit, KeyboardInterrupt): raise 460 except: 461 self.log.exception("Error opening the SNMP collector") 462 self.addClient(client, timeout, 'SNMP', device.id)
463 464 465 ######## need to make async test for snmp work at some point -EAD ######### 466 # def checkSnmpConnection(self, device): 467 # """ 468 # Check to see if our current community string is still valid 469 # 470 # @param device: the device against which we will check 471 # @type device: a Device instance 472 # @return: result is None or a tuple containing 473 # (community, port, version, snmp name) 474 # @rtype: deferred: Twisted deferred 475 # """ 476 # from pynetsnmp.twistedsnmp import AgentProxy 477 # 478 # def inner(driver): 479 # import pdb; pdb.set_trace() 480 # self.log.debug("Checking SNMP community %s on %s", 481 # device.zSnmpCommunity, device.id) 482 # 483 # oid = ".1.3.6.1.2.1.1.5.0" 484 # proxy = AgentProxy(device.id, 485 # device.zSnmpPort, 486 # timeout=device.zSnmpTimeout, 487 # community=device.zSnmpCommunity, 488 # snmpVersion=device.zSnmpVer, 489 # tries=2) 490 # proxy.open() 491 # yield proxy.get([oid]) 492 # devname = driver.next().values()[0] 493 # if devname: 494 # yield succeed(True) 495 # yield succeed(False) 496 # 497 # return drive(inner) 498 499
500 - def addClient(self, device, timeout, clientType, name):
501 """ 502 If device is not None, schedule the device to be collected. 503 Otherwise log an error. 504 505 @param device: device to collect against 506 @type device: string 507 @param timeout: timeout before failing the connection 508 @type timeout: integer 509 @param clientType: description of the plugin type 510 @type clientType: string 511 @param name: plugin name 512 @type name: string 513 """ 514 if device: 515 device.timeout = timeout 516 device.timedOut = False 517 self.clients.append(device) 518 device.run() 519 else: 520 self.log.warn('Unable to create a %s collector for %s', 521 clientType, name)
522 523 524 # XXX double-check this, once the implementation is in place
525 - def portscanCollect(self, device, ip, timeout):
526 """ 527 Start portscan collection client. 528 529 @param device: device to collect against 530 @type device: string 531 @param ip: IP address of device to collect against 532 @type ip: string 533 @param timeout: timeout before failing the connection 534 @type timeout: integer 535 """ 536 client = None 537 try: 538 hostname = device.id 539 plugins = self.selectPlugins(device, "portscan") 540 if not plugins: 541 self.log.info("No portscan plugins found for %s" % hostname) 542 return 543 if self.checkCollection(device): 544 self.log.info('Portscan collector method for device %s' 545 % hostname) 546 self.log.info("plugins: %s", 547 ", ".join(map(lambda p: p.name(), plugins))) 548 client = PortscanClient(device.id, ip, self.options, 549 device, self, plugins) 550 if not client or not plugins: 551 self.log.warn("Portscan collector creation failed") 552 return 553 except (SystemExit, KeyboardInterrupt): raise 554 except: 555 self.log.exception("Error opening portscan collector") 556 self.addClient(client, timeout, 'portscan', device.id)
557 558
559 - def checkCollection(self, device):
560 """ 561 See how old the data is that we've collected 562 563 @param device: device to collect against 564 @type device: string 565 @return: is the SNMP status number > 0 and is the last collection time + collage older than now? 566 @type: boolean 567 """ 568 age = device.getSnmpLastCollection() + self.collage 569 if device.getSnmpStatusNumber() > 0 and age >= DateTime.DateTime(): 570 self.log.info("Skipped collection of %s" % device.id) 571 return False 572 return True
573 574
575 - def clientFinished(self, collectorClient):
576 """ 577 Callback that processes the return values from a device. 578 Python iterable. 579 580 @param collectorClient: collector instance 581 @type collectorClient: collector class 582 @return: Twisted deferred object 583 @type: Twisted deferred object 584 """ 585 device = collectorClient.device 586 self.log.debug("Client for %s finished collecting", device.id) 587 def processClient(driver): 588 try: 589 pluginStats = {} 590 self.log.debug("Processing data for device %s", device.id) 591 devchanged = False 592 maps = [] 593 for plugin, results in collectorClient.getResults(): 594 if plugin is None: continue 595 self.log.debug("Processing plugin %s on device %s ...", 596 plugin.name(), device.id) 597 if not results: 598 self.log.warn("The plugin %s returned no results.", 599 plugin.name()) 600 continue 601 602 datamaps = [] 603 try: 604 results = plugin.preprocess(results, self.log) 605 if results: 606 datamaps = plugin.process(device, results, self.log) 607 if datamaps: 608 pluginStats.setdefault(plugin.name(), plugin.weight) 609 610 except (SystemExit, KeyboardInterrupt), ex: 611 self.log.info( "Plugin %s terminated due to external" 612 " signal (%s)" % (plugin.name(), str(ex) ) 613 ) 614 continue 615 616 except Exception, ex: 617 # NB: don't discard the plugin, as it might be a 618 # temporary issue 619 # Also, report it against the device, rather than at 620 # a collector as it might be just for this device. 621 import socket 622 component= os.path.splitext( 623 os.path.basename( sys.argv[0] ) 624 )[0] 625 collector_host= socket.gethostname() 626 evt= { "eventClass":"/Status/Update", 627 "agent":collector_host, "device":device.id, 628 "severity":Error } 629 630 info= "Problem while executing plugin %s" %plugin.name() 631 self.log.error( info ) 632 evt[ 'summary' ]= info 633 634 info= traceback.format_exc() 635 self.log.error( info ) 636 evt[ 'message' ]= info 637 self.sendEvent( evt ) 638 continue 639 640 # allow multiple maps to be returned from one plugin 641 if type(datamaps) not in (types.ListType, types.TupleType): 642 datamaps = [datamaps,] 643 if datamaps: 644 maps += [m for m in datamaps if m] 645 if maps: 646 deviceClass = Classifier.classifyDevice(pluginStats, 647 self.classCollectorPlugins) 648 yield self.config().callRemote( 649 'applyDataMaps', device.id, 650 maps, deviceClass) 651 if driver.next(): 652 devchanged = True 653 if devchanged: 654 self.log.info("Changes in configuration applied") 655 else: 656 self.log.info("No change in configuration detected") 657 yield self.config().callRemote('setSnmpLastCollection', 658 device.id) 659 driver.next() 660 except Exception, ex: 661 self.log.exception(ex) 662 raise
663 664 def processClientFinished(result): 665 """ 666 Called after the client collection finishes 667 668 @param result: object (unused) 669 @type result: object 670 """ 671 if not result: 672 self.log.debug("Client %s finished" % device.id) 673 else: 674 self.log.error("Client %s finished with message: %s" % 675 (device.id, result)) 676 try: 677 self.clients.remove(collectorClient) 678 self.finished.append(collectorClient) 679 except ValueError: 680 self.log.debug("Client %s not found in in the list" 681 " of active clients", 682 device.id) 683 d = drive(self.fillCollectionSlots) 684 d.addErrback(self.fillError) 685 686 d = drive(processClient) 687 d.addBoth(processClientFinished) 688 689 690
691 - def fillError(self, reason):
692 """ 693 Twisted errback routine to log an error when 694 unable to collect some data 695 696 @param reason: error message 697 @type reason: string 698 """ 699 self.log.error("Unable to fill collection slots: %s" % reason)
700 701
702 - def cycleTime(self):
703 """ 704 Return our cycle time (in minutes) 705 706 @return: cycle time 707 @rtype: integer 708 """ 709 return self.modelerCycleInterval * 60
710 711
712 - def heartbeat(self, ignored=None):
713 """ 714 Twisted keep-alive mechanism to ensure that 715 we're still connected to zenhub 716 717 @param ignored: object (unused) 718 @type ignored: object 719 """ 720 ARBITRARY_BEAT = 30 721 reactor.callLater(ARBITRARY_BEAT, self.heartbeat) 722 if self.options.cycle: 723 evt = dict(eventClass=Heartbeat, 724 component='zenmodeler', 725 device=self.options.monitor, 726 timeout=3*ARBITRARY_BEAT) 727 self.sendEvent(evt) 728 self.niceDoggie(self.cycleTime())
729 730
731 - def checkStop(self, unused = None):
732 """ 733 Check to see if there's anything to do. 734 If there isn't, report our statistics and exit. 735 736 @param unused: unused (unused) 737 @type unused: string 738 """ 739 if self.clients: return 740 if self.devicegen: return 741 742 if self.start: 743 runTime = time.time() - self.start 744 self.start = None 745 self.log.info("Scan time: %0.2f seconds", runTime) 746 devices = len(self.finished) 747 timedOut = len([c for c in self.finished if c.timedOut]) 748 self.sendEvents( 749 self.rrdStats.gauge('cycleTime', self.cycleTime(), runTime) + 750 self.rrdStats.gauge('devices', self.cycleTime(), devices) + 751 self.rrdStats.gauge('timedOut', self.cycleTime(), timedOut) 752 ) 753 if not self.options.cycle: 754 self.stop() 755 self.finished = []
756 757
758 - def fillCollectionSlots(self, driver):
759 """ 760 An iterator which either returns a device to collect or 761 calls checkStop() 762 763 @param driver: driver object 764 @type driver: driver object 765 """ 766 count = len(self.clients) 767 while count < self.options.parallel and self.devicegen \ 768 and not self.pendingNewClients: 769 770 self.pendingNewClients = True 771 try: 772 device = self.devicegen.next() 773 yield self.config().callRemote('getDeviceConfig', [device]) 774 # just collect one device, and let the timer add more 775 devices = driver.next() 776 if devices: 777 self.collectDevice(devices[0]) 778 except StopIteration: 779 self.devicegen = None 780 781 self.pendingNewClients = False 782 break 783 784 update = len(self.clients) 785 if update != count and update != 1: 786 self.log.info('Running %d clients', update) 787 else: 788 self.log.debug('Running %d clients', update) 789 self.checkStop()
790 791
792 - def buildOptions(self):
793 """ 794 Build our list of command-line options 795 """ 796 PBDaemon.buildOptions(self) 797 self.parser.add_option('--debug', 798 dest='debug', action="store_true", default=False, 799 help="Don't fork threads for processing") 800 self.parser.add_option('--nowmi', 801 dest='nowmi', action="store_true", default=False, 802 help="Do not execute WMI plugins") 803 self.parser.add_option('--parallel', dest='parallel', 804 type='int', default=defaultParallel, 805 help="Number of devices to collect from in parallel") 806 self.parser.add_option('--cycletime', 807 dest='cycletime',default=720,type='int', 808 help="Run collection every x minutes") 809 self.parser.add_option('--ignore', 810 dest='ignorePlugins',default="", 811 help="Modeler plugins to ignore. Takes a regular expression") 812 self.parser.add_option('--collect', 813 dest='collectPlugins',default="", 814 help="Modeler plugins to use. Takes a regular expression") 815 self.parser.add_option('-p', '--path', dest='path', 816 help="Start class path for collection ie /NetworkDevices") 817 self.parser.add_option('-d', '--device', dest='device', 818 help="Fully qualified device name ie www.confmon.com") 819 self.parser.add_option('-a', '--collage', 820 dest='collage', default=0, type='float', 821 help="Do not collect from devices whose collect date " + 822 "is within this many minutes") 823 self.parser.add_option('--writetries', 824 dest='writetries',default=2,type='int', 825 help="Number of times to try to write if a " 826 "read conflict is found") 827 # FIXME: cleanup --force option #2660 828 self.parser.add_option("-F", "--force", 829 dest="force", action='store_true', default=True, 830 help="Force collection of config data (deprecated)") 831 self.parser.add_option('--portscantimeout', dest='portscantimeout', 832 type='int', default=defaultPortScanTimeout, 833 help="Time to wait for connection failures when port scanning") 834 self.parser.add_option('--now', 835 dest='now', action="store_true", default=False, 836 help="Start daemon now, do not sleep before starting") 837 TCbuildOptions(self.parser, self.usage)
838 839 840
841 - def processOptions(self):
842 """ 843 Check what the user gave us vs what we'll accept 844 for command-line options 845 """ 846 if not self.options.path and not self.options.device: 847 self.options.path = "/Devices" 848 if self.options.ignorePlugins and self.options.collectPlugins: 849 raise SystemExit( "Only one of --ignore or --collect" 850 " can be used at a time")
851 852
853 - def _timeoutClients(self):
854 """ 855 The guts of the timeoutClients method (minus the twisted reactor 856 stuff). Breaking this part out as a separate method facilitates unit 857 testing. 858 """ 859 active = [] 860 for client in self.clients: 861 if client.timeout < time.time(): 862 self.log.warn("Client %s timeout", client.hostname) 863 self.finished.append(client) 864 client.timedOut = True 865 client.stop() 866 else: 867 active.append(client) 868 self.clients = active
869 870 871
872 - def timeoutClients(self, unused=None):
873 """ 874 Check to see which clients have timed out and which ones haven't. 875 Stop processing anything that's timed out. 876 877 @param unused: unused (unused) 878 @type unused: string 879 """ 880 reactor.callLater(1, self.timeoutClients) 881 self._timeoutClients() 882 d = drive(self.fillCollectionSlots) 883 d.addCallback(self.checkStop) 884 d.addErrback(self.fillError)
885 886 887
888 - def reactorLoop(self):
889 """ 890 Twisted main loop 891 """ 892 reactor.startRunning() 893 while reactor.running: 894 try: 895 while reactor.running: 896 reactor.runUntilCurrent() 897 timeout = reactor.timeout() 898 reactor.doIteration(timeout) 899 except: 900 if reactor.running: 901 self.log.exception("Unexpected error in main loop.")
902 903 904
905 - def getDeviceList(self):
906 """ 907 Get the list of devices for which we are collecting: 908 * if -d devicename was used, use the devicename 909 * if a class path flag was supplied, gather the devices 910 along that organizer 911 * otherwise get all of the devices associated with our collector 912 913 @return: list of devices 914 @rtype: list 915 """ 916 if self.options.device: 917 self.log.info("Collecting for device %s", self.options.device) 918 return succeed([self.options.device]) 919 920 self.log.info("Collecting for path %s", self.options.path) 921 return self.config().callRemote('getDeviceListByOrganizer', 922 self.options.path, 923 self.options.monitor)
924 925
926 - def mainLoop(self, driver):
927 """ 928 Main collection loop, a Python iterable 929 930 @param driver: driver object 931 @type driver: driver object 932 @return: Twisted deferred object 933 @rtype: Twisted deferred object 934 """ 935 if self.options.cycle: 936 driveLater(self.cycleTime(), self.mainLoop) 937 938 if self.clients: 939 self.log.error("Modeling cycle taking too long") 940 return 941 942 self.start = time.time() 943 944 self.log.debug("Starting collector loop...") 945 yield self.getDeviceList() 946 self.devicegen = iter(driver.next()) 947 d = drive(self.fillCollectionSlots) 948 d.addErrback(self.fillError) 949 yield d 950 driver.next() 951 self.log.debug("Collection slots filled")
952 953 954
955 - def main(self, unused=None):
956 """ 957 Wrapper around the mainLoop 958 959 @param unused: unused (unused) 960 @type unused: string 961 @return: Twisted deferred object 962 @rtype: Twisted deferred object 963 """ 964 self.finished = [] 965 d = drive(self.mainLoop) 966 d.addCallback(self.timeoutClients) 967 return d
968 969 970
971 - def remote_deleteDevice(self, device):
972 """ 973 Stub function 974 975 @param device: device name (unused) 976 @type device: string 977 @todo: implement 978 """ 979 # we fetch the device list before every scan 980 self.log.debug("Asynch deleteDevice %s" % device)
981 982 983 if __name__ == '__main__': 984 dc = ZenModeler() 985 dc.processOptions() 986 reactor.run = dc.reactorLoop 987 dc.run() 988