Package Products :: Package DataCollector :: Module zenmodeler
[hide private]
[frames | no frames]

Source Code for Module Products.DataCollector.zenmodeler

   1  ############################################################################## 
   2  #  
   3  # Copyright (C) Zenoss, Inc. 2007, all rights reserved. 
   4  #  
   5  # This content is made available according to terms specified in 
   6  # License.zenoss under the directory where your Zenoss product is installed. 
   7  #  
   8  ############################################################################## 
   9   
  10   
  11  __doc__= """Discover (aka model) a device and its components. 
  12  For instance, find out what Ethernet interfaces and hard disks a server 
  13  has available. 
  14  This information should change much less frequently than performance metrics. 
  15  """ 
  16  import Globals 
  17   
  18  # IMPORTANT! The import of the pysamba.twisted.reactor module should come before 
  19  # any other libraries that might possibly use twisted. This will ensure that 
  20  # the proper WmiReactor is installed before anyone else grabs a reference to 
  21  # the wrong reactor. 
  22  try: 
  23      import pysamba.twisted.reactor 
  24      from ZenPacks.zenoss.WindowsMonitor.WMIClient import WMIClient 
  25      from ZenPacks.zenoss.WindowsMonitor.utils import addNTLMv2Option, setNTLMv2Auth 
  26      USE_WMI = True 
  27  except ImportError: 
  28      USE_WMI = False 
  29   
  30  from Products.ZenHub.PBDaemon import FakeRemote, PBDaemon 
  31  from Products.ZenUtils.DaemonStats import DaemonStats 
  32  from Products.ZenUtils.Driver import drive, driveLater 
  33  from Products.ZenUtils.Utils import unused, atomicWrite, zenPath 
  34  from Products.ZenEvents.ZenEventClasses import Heartbeat, Error 
  35  from twisted.python.failure import Failure 
  36   
  37  from PythonClient   import PythonClient 
  38  from SshClient      import SshClient 
  39  from TelnetClient   import TelnetClient, buildOptions as TCbuildOptions 
  40  from SnmpClient     import SnmpClient 
  41  from PortscanClient import PortscanClient 
  42   
  43  from Products.DataCollector import Classifier 
  44   
  45  from twisted.internet import reactor 
  46  from twisted.internet.defer import succeed 
  47   
  48  import collections 
  49  import cPickle as pickle 
  50  import time 
  51  import re 
  52  import DateTime 
  53   
  54  import os 
  55  import os.path 
  56  import sys 
  57  import traceback 
  58  from random import randint 
  59  from itertools import chain 
  60   
  61  defaultPortScanTimeout = 5 
  62  defaultParallel = 1 
  63  defaultProtocol = "ssh" 
  64  defaultPort = 22 
  65   
  66  # needed for Twisted's PB (Perspective Broker) to work 
  67  from Products.DataCollector import DeviceProxy 
  68  from Products.DataCollector import Plugins 
  69  unused(DeviceProxy, Plugins) 
class ZenModeler(PBDaemon):
    """
    Daemon class to attach to zenhub and pass along
    device configuration information.
    """

    # Daemon name; used as the heartbeat/RRD stats name and as part of the
    # counters pickle file name.
    name = 'zenmodeler'
    # Services requested from zenhub: PBDaemon's defaults plus ModelerService.
    initialServices = PBDaemon.initialServices + ['ModelerService']

    generateEvents = True
    # Minutes between configuration refreshes (multiplied by 60 when the
    # next configure() call is scheduled).
    configCycleInterval = 360

    # Overwritten with the result of the remote 'getClassCollectorPlugins'
    # call during configure().
    classCollectorPlugins = ()
def __init__(self, single=False ):
    """
    Initializer

    @param single: collect from a single device?
    @type single: boolean
    """
    PBDaemon.__init__(self)
    # FIXME: cleanup --force option #2660
    self.options.force = True

    # Naming a device on the command line always implies single-device mode.
    self.single = True if self.options.device else single

    self.rrdStats = DaemonStats()
    self.modelerCycleInterval = self.options.cycletime
    # --collage is given in minutes; 1440 minutes per day, so this is the
    # same interval expressed as a fraction of a day.
    self.collage = float(self.options.collage) / 1440.0

    # Per-cycle bookkeeping.
    self.start = None
    self.devicegen = None
    self.pendingNewClients = False
    self.clients = []
    self.finished = []
    self.counters = collections.Counter()

    # When run as a daemon without --now, wait a random 10 to 60 seconds
    # before the first modeling pass.
    # NOTE(review): an earlier variant multiplied by 60 (10-60 minutes);
    # confirm which delay is intended.
    self.started = False
    self.startDelay = 0
    if not self.options.daemon:
        self.log.debug("Run in foreground, starting immediately.")
    elif self.options.now:
        self.log.debug("Run as a daemon, starting immediately.")
    else:
        self.startDelay = randint(10, 60)
        self.log.info("Run as a daemon, waiting %s seconds to start." %
                      self.startDelay)

    # load performance counters
    self.loadCounters()
124
def reportError(self, error):
    """
    Write an error to the daemon log.

    @param error: error message (or failure object) to report
    @type error: string
    """
    template = "Error occured: %s"
    self.log.error(template, error)
133 134
def connected(self):
    """
    Called after connected to the zenhub service: fetch the configuration,
    then start the heartbeat; failures go to reportError.
    """
    pending = self.configure()
    pending.addCallback(self.heartbeat)
    pending.addErrback(self.reportError)
142 143
def configure(self):
    """
    Get our configuration from zenhub

    @return: result of drive() for the configuration generator
    """
    def fetchConfig(driver):
        """
        Generator function to gather our configuration.  Each remote call
        is yielded and its result is then read back with driver.next().

        @param driver: driver object
        @type driver: driver object
        """
        self.log.debug('fetching monitor properties')
        yield self.config().callRemote('propertyItems')
        props = dict(driver.next())
        # A cycletime of 0 means "use the interval stored on the monitor".
        if self.options.cycletime == 0:
            self.modelerCycleInterval = props.get(
                'modelerCycleInterval', self.modelerCycleInterval)
        self.configCycleInterval = props.get(
            'configCycleInterval', self.configCycleInterval)
        # Re-run configure() after configCycleInterval minutes.
        reactor.callLater(self.configCycleInterval * 60, self.configure)

        self.log.debug("Getting threshold classes...")
        yield self.config().callRemote('getThresholdClasses')
        self.remote_updateThresholdClasses(driver.next())

        self.log.debug("Fetching default RRDCreateCommand...")
        yield self.config().callRemote('getDefaultRRDCreateCommand')
        rrdCreateCommand = driver.next()

        self.log.debug("Getting collector thresholds...")
        yield self.config().callRemote('getCollectorThresholds')
        thresholds = driver.next()
        self.rrdStats.config(self.options.monitor, self.name,
                             thresholds, rrdCreateCommand)

        self.log.debug("Getting collector plugins for each DeviceClass")
        yield self.config().callRemote('getClassCollectorPlugins')
        self.classCollectorPlugins = driver.next()

    return drive(fetchConfig)
186 187
def config(self):
    """
    Get the ModelerService, or a FakeRemote stand-in when the service
    is not registered.
    """
    standIn = FakeRemote()
    return self.services.get('ModelerService', standIn)
193 194
def selectPlugins(self, device, transport):
    """
    Build a list of active plugins for a device, based on:

    * the --collect command-line option which is a regex
    * the --ignore command-line option which is a regex
    * transport which is a string describing the type of plugin

    Loaders that fail with a plugin import error are reported through an
    event raised against the collector and dropped from device.plugins.

    @param device: device to collect against (exposes id and plugins)
    @type device: object
    @param transport: transport name the plugins must declare
    @type transport: string
    @return: plugin instances to run for this transport
    @rtype: list
    @todo: determine if an event for the collector AND the device should be sent
    """
    loaded = []
    goodLoaders = []
    for loader in device.plugins:
        try:
            plugin = loader.create()
            self.log.debug("Loaded plugin %s" % plugin.name())
            loaded.append(plugin)
            goodLoaders.append(loader)

        except (SystemExit, KeyboardInterrupt) as ex:
            self.log.info("Interrupted by external signal (%s)" % str(ex))
            raise

        except Plugins.PluginImportError as import_error:
            import socket
            component = os.path.splitext(os.path.basename(sys.argv[0]))[0]
            collector_host = socket.gethostname()

            summary = "Problem loading plugin %s" % import_error.plugin
            self.log.error(summary)

            tracebackText = import_error.traceback
            self.log.error(tracebackText)

            removalNote = ("Due to import errors, removing the %s plugin"
                           " from this collection cycle.") % import_error.plugin
            self.log.error(removalNote)

            # NB: an import error affects all devices,
            # so report the issue against the collector
            self.sendEvent({
                "eventClass": "/Status/Update",
                "component": component,
                "agent": collector_host,
                "device": collector_host,
                "severity": Error,
                "summary": summary,
                "message": tracebackText + "%s\n" % removalNote,
            })

    # Make sure that we don't generate messages for bad loaders again
    # NB: doesn't update the device's zProperties
    if len(device.plugins) != len(goodLoaders):
        device.plugins = goodLoaders

    # Name predicates for --collect / --ignore.  --collect takes precedence
    # when both are set; the unused predicate never matches.
    never = lambda name: False
    collectPattern = self.options.collectPlugins
    ignorePattern = self.options.ignorePlugins
    wanted = never
    unwanted = never
    if collectPattern:
        wanted = re.compile(collectPattern).search
    elif ignorePattern:
        unwanted = re.compile(ignorePattern).search

    selected = []
    for plugin in loaded:
        if plugin.transport != transport:
            continue
        name = plugin.name()
        if unwanted(name):
            self.log.debug("Ignoring %s on %s because of --ignore flag",
                           name, device.id)
        elif wanted(name):
            self.log.debug("Using %s on %s because of --collect flag",
                           name, device.id)
            selected.append(plugin)
        elif not self.options.collectPlugins:
            self.log.debug("Using %s on %s", name, device.id)
            selected.append(plugin)
    return selected
279 280 281
def collectDevice(self, device):
    """
    Collect data from a single device by starting every collector type
    against it (WMI first, when available).

    @param device: device to collect against; manageIp is read from it and
                   zCollectorClientTimeout (default 180) if present
    @type device: object
    """
    address = device.manageIp
    # Absolute deadline shared by every client started for this device.
    deadline = time.time() + getattr(device, 'zCollectorClientTimeout', 180)

    if not USE_WMI:
        self.log.info("skipping WMI-based collection, PySamba zenpack not installed")
    else:
        self.wmiCollect(device, address, deadline)

    for startCollector in (self.pythonCollect, self.cmdCollect,
                           self.snmpCollect, self.portscanCollect):
        startCollector(device, address, deadline)
300 301 302
def wmiCollect(self, device, ip, timeout):
    """
    Start the Windows Management Instrumentation (WMI) collector

    Does nothing when the nowmi option is set.

    @param device: device to collect against (exposes id)
    @type device: object
    @param ip: IP address of device to collect against
    @type ip: string
    @param timeout: time.time() value after which the client is timed out
    @type timeout: number
    """
    if self.options.nowmi:
        return

    client = None
    try:
        plugins = self.selectPlugins(device, 'wmi')
        if not plugins:
            self.log.info("No WMI plugins found for %s" % device.id)
            return
        if self.checkCollection(device):
            self.log.info('WMI collector method for device %s' % device.id)
            pluginNames = ", ".join(p.name() for p in plugins)
            self.log.info("plugins: %s", pluginNames)
            client = WMIClient(device, self, plugins)
        if not (client and plugins):
            self.log.warn("WMI collector creation failed")
            return
    except (SystemExit, KeyboardInterrupt):
        raise
    except Exception:
        # Fall through: addClient logs the missing client.
        self.log.exception("Error opening WMI collector")
    self.addClient(client, timeout, 'WMI', device.id)
336 337 338
def pythonCollect(self, device, ip, timeout):
    """
    Start local Python collection client.

    @param device: device to collect against (exposes id)
    @type device: object
    @param ip: IP address of device to collect against
    @type ip: string
    @param timeout: time.time() value after which the client is timed out
    @type timeout: number
    """
    client = None
    try:
        plugins = self.selectPlugins(device, "python")
        if not plugins:
            self.log.info("No Python plugins found for %s" % device.id)
            return
        if self.checkCollection(device):
            self.log.info('Python collection device %s' % device.id)
            self.log.info("plugins: %s",
                          ", ".join(p.name() for p in plugins))
            client = PythonClient(device, self, plugins)
        if not client or not plugins:
            self.log.warn("Python client creation failed")
            return
    except (SystemExit, KeyboardInterrupt):
        raise
    except Exception:
        # Catch Exception rather than everything, matching wmiCollect.
        # Fall through: addClient logs the missing client.
        self.log.exception("Error opening pythonclient")
    self.addClient(client, timeout, 'python', device.id)
368 369
def cmdCollect(self, device, ip, timeout):
    """
    Start shell command collection client.

    The protocol comes from the device's zCommandProtocol (ssh or telnet)
    and the port from zCommandPort.  An unknown protocol is reported with
    a /Status/Update event and no client is started.

    @param device: device to collect against (exposes id)
    @type device: object
    @param ip: IP address of device to collect against
    @type ip: string
    @param timeout: time.time() value after which the client is timed out
    @type timeout: number
    """
    client = None
    clientType = 'snmp' # default to SNMP if we can't figure out a protocol

    hostname = device.id
    try:
        plugins = self.selectPlugins(device, "command")
        if not plugins:
            self.log.info("No command plugins found for %s" % hostname)
            return

        protocol = getattr(device, 'zCommandProtocol', defaultProtocol)
        commandPort = getattr(device, 'zCommandPort', defaultPort)

        if protocol == "ssh":
            client = SshClient(hostname, ip, commandPort,
                               options=self.options,
                               plugins=plugins, device=device,
                               datacollector=self, isLoseConnection=True)
            clientType = 'ssh'
            self.log.info('Using SSH collection method for device %s'
                          % hostname)

        elif protocol == 'telnet':
            # Port 22 is the SSH default; switch to the telnet default.
            if commandPort == 22:
                commandPort = 23
            client = TelnetClient(hostname, ip, commandPort,
                                  options=self.options,
                                  plugins=plugins, device=device,
                                  datacollector=self)
            clientType = 'telnet'
            self.log.info('Using telnet collection method for device %s'
                          % hostname)

        else:
            info = ("Unknown protocol %s for device %s -- "
                    "defaulting to %s collection method" %
                    (protocol, hostname, clientType))
            self.log.warn(info)
            import socket
            collector_host = socket.gethostname()
            evt = {"eventClass": "/Status/Update", "agent": collector_host,
                   "device": hostname, "severity": Error}
            evt['summary'] = info
            self.sendEvent(evt)
            return

        if not client:
            self.log.warn("Shell command collector creation failed")
        else:
            self.log.info("plugins: %s",
                          ", ".join(p.name() for p in plugins))
    except (SystemExit, KeyboardInterrupt):
        raise
    except Exception:
        # Catch Exception rather than everything, matching wmiCollect.
        # Fall through: addClient logs the missing client.
        self.log.exception("Error opening command collector")
    self.addClient(client, timeout, clientType, device.id)
436 437 438
def snmpCollect(self, device, ip, timeout):
    """
    Start SNMP collection client.

    Skipped when the device's zSnmpMonitorIgnore is true (also the default
    when the attribute is missing) or when no manage IP is given.

    @param device: device to collect against (exposes id)
    @type device: object
    @param ip: IP address of device to collect against
    @type ip: string
    @param timeout: time.time() value after which the client is timed out
    @type timeout: number
    """
    client = None
    try:
        hostname = device.id
        if getattr(device, "zSnmpMonitorIgnore", True):
            self.log.info("SNMP monitoring off for %s" % hostname)
            return

        if not ip:
            self.log.info("No manage IP for %s" % hostname)
            return

        plugins = self.selectPlugins(device, "snmp")
        if not plugins:
            self.log.info("No SNMP plugins found for %s" % hostname)
            return

        if self.checkCollection(device):
            self.log.info('SNMP collection device %s' % hostname)
            self.log.info("plugins: %s",
                          ", ".join(p.name() for p in plugins))
            client = SnmpClient(device.id, ip, self.options,
                                device, self, plugins)
        if not client or not plugins:
            self.log.warn("SNMP collector creation failed")
            return
    except (SystemExit, KeyboardInterrupt):
        raise
    except Exception:
        # Catch Exception rather than everything, matching wmiCollect.
        # Fall through: addClient logs the missing client.
        self.log.exception("Error opening the SNMP collector")
    self.addClient(client, timeout, 'SNMP', device.id)
480 481 482 ######## need to make async test for snmp work at some point -EAD ######### 483 # def checkSnmpConnection(self, device): 484 # """ 485 # Check to see if our current community string is still valid 486 # 487 # @param device: the device against which we will check 488 # @type device: a Device instance 489 # @return: result is None or a tuple containing 490 # (community, port, version, snmp name) 491 # @rtype: deferred: Twisted deferred 492 # """ 493 # from pynetsnmp.twistedsnmp import AgentProxy 494 # 495 # def inner(driver): 496 # self.log.debug("Checking SNMP community %s on %s", 497 # device.zSnmpCommunity, device.id) 498 # 499 # oid = ".1.3.6.1.2.1.1.5.0" 500 # proxy = AgentProxy(device.id, 501 # device.zSnmpPort, 502 # timeout=device.zSnmpTimeout, 503 # community=device.zSnmpCommunity, 504 # snmpVersion=device.zSnmpVer, 505 # tries=2) 506 # proxy.open() 507 # yield proxy.get([oid]) 508 # devname = driver.next().values()[0] 509 # if devname: 510 # yield succeed(True) 511 # yield succeed(False) 512 # 513 # return drive(inner) 514 515
def addClient(self, device, timeout, clientType, name):
    """
    Register and start a collector client, or log a warning when no
    client could be created.

    @param device: the collector client to run (despite the parameter
                   name), or None if creation failed
    @type device: object
    @param timeout: time.time() value after which the client is timed out
    @type timeout: number
    @param clientType: description of the collector type, for logging
    @type clientType: string
    @param name: name of the device being collected, for logging
    @type name: string
    """
    if not device:
        self.log.warn('Unable to create a %s collector for %s',
                      clientType, name)
        return

    device.timeout = timeout
    device.timedOut = False
    self.clients.append(device)
    device.run()
538 539 540 # XXX double-check this, once the implementation is in place
def portscanCollect(self, device, ip, timeout):
    """
    Start portscan collection client.

    @param device: device to collect against (exposes id)
    @type device: object
    @param ip: IP address of device to collect against
    @type ip: string
    @param timeout: time.time() value after which the client is timed out
    @type timeout: number
    """
    client = None
    try:
        hostname = device.id
        plugins = self.selectPlugins(device, "portscan")
        if not plugins:
            self.log.info("No portscan plugins found for %s" % hostname)
            return
        if self.checkCollection(device):
            self.log.info('Portscan collector method for device %s'
                          % hostname)
            self.log.info("plugins: %s",
                          ", ".join(p.name() for p in plugins))
            client = PortscanClient(device.id, ip, self.options,
                                    device, self, plugins)
        if not client or not plugins:
            self.log.warn("Portscan collector creation failed")
            return
    except (SystemExit, KeyboardInterrupt):
        raise
    except Exception:
        # Catch Exception rather than everything, matching wmiCollect.
        # Fall through: addClient logs the missing client.
        self.log.exception("Error opening portscan collector")
    self.addClient(client, timeout, 'portscan', device.id)
573 574
def checkCollection(self, device):
    """
    Decide whether the device should be collected now, based on how old
    its previously collected data is.

    @param device: device to collect against (exposes id,
                   getSnmpLastCollection() and getSnmpStatusNumber())
    @type device: object
    @return: False when the SNMP status number is > 0 and the last
             collection time + collage has not yet passed; True otherwise
    @rtype: boolean
    """
    freshUntil = device.getSnmpLastCollection() + self.collage
    skip = (device.getSnmpStatusNumber() > 0
            and freshUntil >= DateTime.DateTime())
    if not skip:
        return True
    self.log.info("Skipped collection of %s" % device.id)
    return False
589
def clientFinished(self, collectorClient):
    """
    Callback that processes the results gathered by a collector client:
    runs each plugin's preprocess/process step, sends the resulting data
    maps to zenhub, then retires the client and refills collection slots.

    NOTE(review): block nesting was reconstructed from a flattened
    listing -- confirm against the original file.

    @param collectorClient: collector instance
    @type collectorClient: collector class
    """
    device = collectorClient.device
    self.log.debug("Client for %s finished collecting", device.id)
    def processClient(driver):
        # Generator run through drive(): every remote call is yielded and
        # its result read back with driver.next().
        try:
            # Push changed SNMP connection details to zenhub first.
            if (isinstance(collectorClient, SnmpClient)
                and collectorClient.connInfo.changed == True):
                self.log.info(
                    "SNMP connection info for %s changed. Updating...",
                    device.id)
                yield self.config().callRemote('setSnmpConnectionInfo',
                                    device.id,
                                    collectorClient.connInfo.zSnmpVer,
                                    collectorClient.connInfo.zSnmpPort,
                                    collectorClient.connInfo.zSnmpCommunity
                                    )
                driver.next()

            # Plugin name -> weight for plugins that produced data maps;
            # passed to the classifier below.
            pluginStats = {}
            self.log.debug("Processing data for device %s", device.id)
            devchanged = False
            maps = []
            for plugin, results in collectorClient.getResults():
                if plugin is None: continue
                self.log.debug("Processing plugin %s on device %s ...",
                               plugin.name(), device.id)
                if not results:
                    self.log.warn("The plugin %s returned no results.",
                                  plugin.name())
                    continue
                self.log.debug("Plugin %s results = %s", plugin.name(), results)
                datamaps = []
                try:
                    results = plugin.preprocess(results, self.log)
                    if results:
                        datamaps = plugin.process(device, results, self.log)
                    if datamaps:
                        pluginStats.setdefault(plugin.name(), plugin.weight)

                except (SystemExit, KeyboardInterrupt), ex:
                    # The signal is logged and the plugin skipped; it is
                    # not re-raised here.
                    self.log.info( "Plugin %s terminated due to external"
                                  " signal (%s)" % (plugin.name(), str(ex) )
                                  )
                    continue

                except Exception, ex:
                    # NB: don't discard the plugin, as it might be a
                    #     temporary issue
                    #     Also, report it against the device, rather than at
                    #     a collector as it might be just for this device.
                    import socket
                    # NOTE(review): 'component' is computed but never used.
                    component= os.path.splitext(
                                     os.path.basename( sys.argv[0] )
                                 )[0]
                    collector_host= socket.gethostname()
                    evt= { "eventClass":"/Status/Update",
                           "agent":collector_host, "device":device.id,
                           "severity":Error }

                    info= "Problem while executing plugin %s" %plugin.name()
                    self.log.error( info )
                    evt[ 'summary' ]= info

                    info= traceback.format_exc()
                    self.log.error( info )
                    evt[ 'message' ]= info
                    self.sendEvent( evt )
                    continue

                # allow multiple maps to be returned from one plugin
                if not isinstance(datamaps, (list, tuple)):
                    datamaps = [datamaps,]
                if datamaps:
                    # Drop falsy maps before sending.
                    maps += [m for m in datamaps if m]
            if maps:
                deviceClass = Classifier.classifyDevice(pluginStats,
                                            self.classCollectorPlugins)
                yield self.config().callRemote(
                    'applyDataMaps', device.id,
                    maps, deviceClass, True)

                # A truthy remote result is treated as "device changed".
                if driver.next():
                    devchanged = True
            if devchanged:
                self.log.info("Changes in configuration applied")
            else:
                self.log.info("No change in configuration detected")

        except Exception, ex:
            self.log.exception(ex)
            raise

    def processClientFinished(result):
        """
        Called after the client collection finishes

        @param result: result or Failure from processClient (only logged)
        @type result: object
        """
        self.counters['modeledDevicesCount'] += 1
        # result is now the result of remote_applyDataMaps (from processClient)
        if result and isinstance(result, (basestring, Failure)):
            self.log.error("Client %s finished with message: %s" %
                           (device.id, result))
        else:
            self.log.debug("Client %s finished" % device.id)

        try:
            self.clients.remove(collectorClient)
            self.finished.append(collectorClient)
        except ValueError:
            # remove() raised: the client was no longer in self.clients.
            self.log.debug("Client %s not found in in the list"
                          " of active clients",
                          device.id)
        # A slot may have been freed; try to start the next device.
        d = drive(self.fillCollectionSlots)
        d.addErrback(self.fillError)

    d = drive(processClient)
    # addBoth: the cleanup runs whether processClient succeeded or failed.
    d.addBoth(processClientFinished)
def fillError(self, reason):
    """
    Twisted errback routine: log that the collection slots could not
    be filled.

    @param reason: error message or failure describing the problem
    @type reason: string
    """
    message = "Unable to fill collection slots: %s" % reason
    self.log.error(message)
729 730
def cycleTime(self):
    """
    Return our cycle time: modelerCycleInterval (minutes) multiplied
    by 60, i.e. in seconds.

    @return: cycle time
    @rtype: integer
    """
    minutes = self.modelerCycleInterval
    return minutes * 60
739 740
def heartbeat(self, ignored=None):
    """
    Periodic housekeeping, rescheduled every 30 seconds: when cycling,
    send a Heartbeat event; on the first call schedule main() after the
    startup delay; then record and persist the modeled-device counters.

    @param ignored: object (unused)
    @type ignored: object
    """
    ARBITRARY_BEAT = 30
    reactor.callLater(ARBITRARY_BEAT, self.heartbeat)

    if self.options.cycle:
        beatEvent = {
            'eventClass': Heartbeat,
            'component': 'zenmodeler',
            'device': self.options.monitor,
            'timeout': 3 * ARBITRARY_BEAT,
        }
        self.sendEvent(beatEvent)
        self.niceDoggie(self.cycleTime())

    # We start modeling from here to accomodate the startup delay.
    if not self.started:
        self.started = True
        reactor.callLater(self.startDelay, self.main)

    modeled = self.counters['modeledDevicesCount']

    # save modeled device rate
    self.rrdStats.derive('modeledDevices', ARBITRARY_BEAT, modeled)

    # save running count
    self.rrdStats.gauge('modeledDevicesCount', ARBITRARY_BEAT, modeled)

    # persist counters values
    self.saveCounters()
772
def _getCountersFile(self):
    """
    Return the path of the pickle file holding this daemon's counters,
    named after the daemon name and the monitor option.
    """
    fileName = 'var/%s_%s.pickle' % (self.name, self.options.monitor)
    return zenPath(fileName)
775
def saveCounters(self):
    """
    Pickle self.counters into the counters file.  raiseException=False is
    passed to atomicWrite, so a failed write is not expected to raise.
    """
    target = self._getCountersFile()
    payload = pickle.dumps(self.counters)
    atomicWrite(target, payload, raiseException=False)
782
def loadCounters(self):
    """
    Restore self.counters from the counters pickle file.

    Any failure (missing file, unreadable or corrupt pickle) resets the
    counters to an empty collections.Counter instead of raising.
    """
    try:
        # 'with' closes the handle even when unpickling fails; pickle
        # data is read in binary mode.
        # NOTE: the file is this daemon's own state written by
        # saveCounters; never point this at untrusted data.
        with open(self._getCountersFile(), 'rb') as countersFile:
            self.counters = pickle.load(countersFile)
    except Exception:
        self.counters = collections.Counter()
788 789 @property
790 - def _devicegen_has_items(self):
791 """check it self.devicegen (an iterator) is not empty and has at least 792 one value. doing this check changes the iterator, so this method 793 restores it to its original state before returning""" 794 result = False 795 if self.devicegen is not None: 796 try: 797 first = self.devicegen.next() 798 except StopIteration: 799 pass 800 else: 801 result = True 802 self.devicegen = chain([first], self.devicegen) 803 return result
804 805
def checkStop(self, unused = None):
    """
    Check to see if there's anything to do.
    If there isn't, report our statistics and, when not cycling, stop.

    @param unused: unused (unused)
    @type unused: string
    """
    # Still busy: clients running, one being created, or devices queued.
    if self.pendingNewClients or self.clients:
        return
    if self._devicegen_has_items:
        return
    # No scan in progress (or already reported): nothing to do.
    if not self.start:
        return

    runTime = time.time() - self.start
    self.start = None
    self.log.info("Scan time: %0.2f seconds", runTime)

    devices = len(self.finished)
    timedOut = sum(1 for client in self.finished if client.timedOut)
    interval = self.cycleTime()
    stats = (self.rrdStats.gauge('cycleTime', interval, runTime) +
             self.rrdStats.gauge('devices', interval, devices) +
             self.rrdStats.gauge('timedOut', interval, timedOut))
    self.sendEvents(stats)

    if not self.options.cycle:
        self.stop()
    self.finished = []
831
def fillCollectionSlots(self, driver):
    """
    Generator (run through drive()) that starts collection for at most
    one more device when a slot is free, and then calls checkStop().

    NOTE(review): block nesting -- in particular the placement of the
    'break' after the try statement -- was reconstructed from a flattened
    listing; confirm against the original file.

    @param driver: driver object
    @type driver: driver object
    """
    count = len(self.clients)
    # Despite the 'while', the unconditional 'break' below means the body
    # runs at most once per call.
    while count < self.options.parallel and self._devicegen_has_items \
        and not self.pendingNewClients:
        # Guard flag: tested by the loop condition above and by checkStop().
        self.pendingNewClients = True
        try:
            device = self.devicegen.next()
            yield self.config().callRemote('getDeviceConfig', [device],
                                           self.options.checkStatus)
            # just collect one device, and let the timer add more
            devices = driver.next()
            if devices:
                d = devices[0]
                # A non-empty skipModelMsg means the device is only logged,
                # not collected.
                if d.skipModelMsg:
                    self.log.info(d.skipModelMsg)
                else:
                    self.collectDevice(d)
            else:
                self.log.info("Device %s not returned is it down?", device)
        except StopIteration:
            self.devicegen = None
        finally:
            # Always clear the guard, even when the remote call failed.
            self.pendingNewClients = False
        break
    update = len(self.clients)
    # Log at info level only when the client count changed to something
    # other than 1; otherwise at debug level.
    if update != count and update != 1:
        self.log.info('Running %d clients', update)
    else:
        self.log.debug('Running %d clients', update)
    self.checkStop()
868
def buildOptions(self):
    """
    Build our list of command-line options on top of PBDaemon's, then add
    the telnet client options and (when WMI is available) the NTLMv2 one.
    """
    PBDaemon.buildOptions(self)
    add = self.parser.add_option

    add('--debug',
        dest='debug', action="store_true", default=False,
        help="Don't fork threads for processing")
    add('--nowmi',
        dest='nowmi', action="store_true", default=not USE_WMI,
        help="Do not execute WMI plugins")
    add('--parallel',
        dest='parallel', type='int', default=defaultParallel,
        help="Number of devices to collect from in parallel")
    add('--cycletime',
        dest='cycletime', default=720, type='int',
        help="Run collection every x minutes")
    add('--ignore',
        dest='ignorePlugins', default="",
        help="Modeler plugins to ignore. Takes a regular expression")
    add('--collect',
        dest='collectPlugins', default="",
        help="Modeler plugins to use. Takes a regular expression")
    add('-p', '--path',
        dest='path',
        help="Start class path for collection ie /NetworkDevices")
    add('-d', '--device',
        dest='device',
        help="Fully qualified device name ie www.confmon.com")
    add('-a', '--collage',
        dest='collage', default=0, type='float',
        help="Do not collect from devices whose collect date "
             "is within this many minutes")
    add('--writetries',
        dest='writetries', default=2, type='int',
        help="Number of times to try to write if a "
             "read conflict is found")
    # FIXME: cleanup --force option #2660
    add("-F", "--force",
        dest="force", action='store_true', default=True,
        help="Force collection of config data (deprecated)")
    add('--portscantimeout',
        dest='portscantimeout', type='int',
        default=defaultPortScanTimeout,
        help="Time to wait for connection failures when port scanning")
    add('--now',
        dest='now', action="store_true", default=False,
        help="Start daemon now, do not sleep before starting")
    add('--communities',
        dest='discoverCommunity', action="store_true", default=False,
        help="If an snmp connection fails try and rediscover it's connection info")
    add('--checkstatus',
        dest='checkStatus', action="store_true", default=False,
        help="Don't model if the device is ping or snmp down")

    TCbuildOptions(self.parser, self.usage)
    if USE_WMI:
        addNTLMv2Option(self.parser)
923
def processOptions(self):
    """
    Check what the user gave us vs what we'll accept
    for command-line options

    Defaults the path to "/Devices" when neither a path nor a device was
    given, and exits when --ignore and --collect are combined.
    """
    opts = self.options
    if not (opts.path or opts.device):
        opts.path = "/Devices"
    if opts.ignorePlugins and opts.collectPlugins:
        raise SystemExit("Only one of --ignore or --collect"
                         " can be used at a time")
    if USE_WMI:
        setNTLMv2Auth(self.options)
936
937 - def _timeoutClients(self):
938 """ 939 The guts of the timeoutClients method (minus the twisted reactor 940 stuff). Breaking this part out as a separate method facilitates unit 941 testing. 942 """ 943 active = [] 944 for client in self.clients: 945 if client.timeout < time.time(): 946 self.log.warn("Client %s timeout", client.hostname) 947 self.finished.append(client) 948 client.timedOut = True 949 try: 950 client.stop() 951 except AssertionError, ex: 952 pass # session closed twice http://dev.zenoss.org/trac/ticket/6354 953 else: 954 active.append(client) 955 self.clients = active
956 957 958
def timeoutClients(self, unused=None):
    """
    Once-a-second timer: retire clients that have timed out, then try to
    fill the freed collection slots and check whether we are done.

    @param unused: unused (unused)
    @type unused: string
    """
    # Re-arm first so the timer keeps running.
    reactor.callLater(1, self.timeoutClients)
    self._timeoutClients()
    refill = drive(self.fillCollectionSlots)
    refill.addCallback(self.checkStop)
    refill.addErrback(self.fillError)
972 973 974
def reactorLoop(self):
    """
    Twisted main loop, driven by hand.  The script entry point installs
    this method as reactor.run.
    """
    reactor.startRunning()
    # Outer loop: re-enter the inner loop after an exception for as long
    # as the reactor is still marked running.
    while reactor.running:
        try:
            while reactor.running:
                reactor.runUntilCurrent()
                timeout = reactor.timeout()
                reactor.doIteration(timeout)
        except Exception:
            # Only log while still running; errors raised once the
            # reactor has stopped are dropped silently.
            if reactor.running:
                self.log.exception("Unexpected error in main loop.")
989 990 991
def getDeviceList(self):
    """
    Get the list of devices for which we are collecting:
     * if -d devicename was used, use the devicename
     * otherwise ask zenhub for the devices under the class path option
       for our monitor

    If the remote lookup fails, the path is reported as an invalid
    organizer and the process exits with status 1.

    @return: deferred firing with the list of devices
    @rtype: Twisted deferred object
    """
    singleDevice = self.options.device
    if singleDevice:
        self.log.info("Collecting for device %s", singleDevice)
        return succeed([singleDevice])

    self.log.info("Collecting for path %s", self.options.path)

    def invalidOrganizer(results):
        self.log.critical("%s is not a valid organizer." % (self.options.path))
        # Mark the reactor as stopped so the hand-driven loop ends too.
        reactor.running = False
        sys.exit(1)

    lookup = self.config().callRemote('getDeviceListByOrganizer',
                                      self.options.path,
                                      self.options.monitor)
    lookup.addErrback(invalidOrganizer)
    return lookup
def mainLoop(self, driver):
    """
    Main collection loop, a generator run through drive(): fetches the
    device list and starts filling collection slots.

    @param driver: driver object
    @type driver: driver object
    """
    # When cycling, schedule the next pass before doing any work.
    if self.options.cycle:
        driveLater(self.cycleTime(), self.mainLoop)

    # Clients from the previous pass are still running: skip this pass.
    if self.clients:
        self.log.error("Modeling cycle taking too long")
        return

    # Start of the scan; checkStop() uses it to compute the scan time.
    self.start = time.time()

    self.log.debug("Starting collector loop...")
    yield self.getDeviceList()
    self.devicegen = iter(driver.next())
    d = drive(self.fillCollectionSlots)
    d.addErrback(self.fillError)
    yield d
    driver.next()
    self.log.debug("Collection slots filled")
1047 1048 1049
def main(self, unused=None):
    """
    Wrapper around the mainLoop: reset the finished list, run one pass of
    mainLoop and start the client-timeout timer afterwards.

    @param unused: unused (unused)
    @type unused: string
    @return: Twisted deferred object
    @rtype: Twisted deferred object
    """
    self.finished = []
    running = drive(self.mainLoop)
    running.addCallback(self.timeoutClients)
    return running
1063 1064 1065
def remote_deleteDevice(self, device):
    """
    Stub function: only logs the request.

    @param device: device name (unused)
    @type device: string
    @todo: implement
    """
    # we fetch the device list before every scan
    message = "Asynch deleteDevice %s" % device
    self.log.debug(message)
# Script entry point: build the daemon, validate its options and run it.
if __name__ == '__main__':
    dc = ZenModeler()
    dc.processOptions()
    # Replace the reactor's run() with the daemon's hand-driven loop.
    reactor.run = dc.reactorLoop
    try:
        dc.run()
    finally:
        # Persist the counters however the run ended.
        dc.saveCounters()