Package DataCollector :: Module zenmodeler
[hide private]
[frames] | [no frames]

Source Code for Module DataCollector.zenmodeler

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  import time 
 15  import types 
 16  import re 
 17  import socket 
 18   
 19  import Globals 
 20  import transaction 
 21  import DateTime 
 22  from twisted.internet import reactor 
 23   
 24  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 25  from Products.ZenEvents.ZenEventClasses import Heartbeat 
 26   
 27  from ApplyDataMap import ApplyDataMap, ApplyDataMapThread 
 28  import SshClient 
 29  import TelnetClient 
 30  import SnmpClient 
 31  import PortscanClient 
 32   
 33  from Exceptions import * 
 34   
# Default for --portscantimeout ("time to wait for connection failures when
# port scanning"); units are not shown here -- presumably seconds, confirm.
defaultPortScanTimeout = 5
# Default for --parallel: the cap compared against the number of active
# collection clients in fillCollectionSlots().
defaultParallel = 1
# Command-collection fallbacks used when a device has no zCommandProtocol /
# zCommandPort.  cmdCollect() switches a telnet device left on port 22 to 23.
defaultProtocol = "ssh"
defaultPort = 22
# Seconds a daemon sleeps (time.sleep in ZenModeler.__init__) before its first
# modeling pass, unless --now is given.
defaultStartSleep = 10 * 60
 40   
 41  from Plugins import loadPlugins 
 42   
class ZenModeler(ZCmdBase):
    """Model devices by running collector plugins against them.

    Devices come from an iterator: a single --device, the devices of a
    --monitor, or every device below --path.  For each device up to three
    clients may be started (command via ssh/telnet, SNMP, portscan).  Every
    finished client is handed to ApplyDataMap.processClient(), and clients
    that pass their deadline are dropped by timeoutClients().
    """

    generateEvents = True

    def __init__(self, noopts=0, app=None, single=False,
                 threaded=None, keeproot=False):
        """Parse options, optionally delay daemon start-up, and set up state.

        When running as a daemon without --now this sleeps defaultStartSleep
        seconds before continuing.

        single   -- model a single device; forced on when --device is given
        threaded -- apply data maps in a separate thread; when None it is
                    derived from --nothread
        """
        ZCmdBase.__init__(self, noopts, app, keeproot)

        if self.options.daemon:
            if self.options.now:
                self.log.debug("Run as a daemon, starting immediately.")
            else:
                self.log.debug("Run as a daemon, waiting %s sec to start."
                               % defaultStartSleep)
                time.sleep(defaultStartSleep)
                self.log.debug("Run as a daemon, slept %s sec, starting now."
                               % defaultStartSleep)
        else:
            self.log.debug("Run in foreground, starting immediately.")

        self.single = single
        if self.options.device:
            self.single = True
        self.threaded = threaded
        if self.threaded is None:
            self.threaded = not self.options.nothread
        # --cycletime is given in minutes; reactor.callLater wants seconds.
        self.cycletime = self.options.cycletime * 60
        # --collage is given in minutes; DateTime arithmetic is in days.
        self.collage = self.options.collage / 1440.0
        self.clients = []       # clients currently collecting
        self.finished = []      # clients that completed or timed out
        self.collectorPlugins = {}
        self.devicegen = None   # iterator of devices still to be modeled
        # Start time of the running scan, None when idle.  Initialized here
        # so checkStop() is safe even before the first scan begins.
        self.start = None
        self.loadPlugins()
        # Set by reactorLoop() when the reactor is busy; while True no new
        # clients are started.
        self.slowDown = False
        if self.threaded and not self.single:
            self.log.info("starting apply in separate thread.")
            self.applyData = ApplyDataMapThread(self, self.getConnection())
            self.applyData.start()
        else:
            self.log.debug("in debug mode starting apply in main thread.")
            self.applyData = ApplyDataMap(self)

    def loadPlugins(self):
        """Load plugins from the plugin directory.
        """
        # Module-level Plugins.loadPlugins, not this method.
        self.collectorPlugins = loadPlugins(self.dmd)

    def selectPlugins(self, device, transport):
        """Return the loaded plugins of `transport` type to run on `device`.

        A plugin is selected when its condition() passes and either it is
        listed in the device's zCollectorPlugins (only when --collect is not
        given) or its name matches the --collect regex.  Plugins whose name
        matches the --ignore regex are never selected.  Errors raised while
        evaluating one plugin are logged and that plugin is skipped.
        """
        names = getattr(device, 'zCollectorPlugins', [])
        result = []
        collectTest = lambda x: False
        ignoreTest = lambda x: False
        if self.options.collectPlugins:
            collectTest = re.compile(self.options.collectPlugins).search
        elif self.options.ignorePlugins:
            ignoreTest = re.compile(self.options.ignorePlugins).search
        for plugin in self.collectorPlugins.values():
            if plugin.transport != transport:
                continue
            name = plugin.name()
            try:
                if not plugin.condition(device, self.log):
                    self.log.debug("condition failed %s on %s", name, device.id)
                elif ignoreTest(name):
                    self.log.debug("ignoring %s on %s", name, device.id)
                elif name in names and not self.options.collectPlugins:
                    self.log.debug("using %s on %s", name, device.id)
                    result.append(plugin)
                elif collectTest(name):
                    self.log.debug("--collect %s on %s", name, device.id)
                    result.append(plugin)
                else:
                    self.log.debug("skipping %s for %s", name, device.id)
            except (SystemExit, KeyboardInterrupt):
                raise
            except:
                self.log.exception("failed to select plugin %s", name)
        return result

    def resolveDevice(self, device):
        """If device is a string (str or unicode) look it up in the dmd.

        Raises DataCollectorError when no device with that name exists.
        Non-string arguments are returned unchanged.
        """
        # StringTypes covers both str and unicode, so unicode names resolve
        # too instead of being passed through unresolved.
        if isinstance(device, types.StringTypes):
            dname = device
            device = self.dmd.Devices.findDevice(device)
            if not device:
                raise DataCollectorError("device %s not found" % dname)
        return device

    def collectDevice(self, device):
        """Start every applicable collection client for a single device.

        All clients started for the device share one deadline: now plus
        zCollectorClientTimeout (default 180) seconds.
        """
        clientTimeout = getattr(device, 'zCollectorClientTimeout', 180)
        ip = device.getManageIp()
        if not ip:
            ip = device.setManageIp()
        timeout = clientTimeout + time.time()
        self.cmdCollect(device, ip, timeout)
        self.snmpCollect(device, ip, timeout)
        self.portscanCollect(device, ip, timeout)

    def cmdCollect(self, device, ip, timeout):
        """Start the command (ssh or telnet) collection client for device.

        Nothing is started when no "command" plugin applies.  The protocol
        and port come from zCommandProtocol / zCommandPort.  A telnet device
        still on the ssh default port 22 is switched to 23.
        """
        client = None
        # Label used by addClient() when reporting a failed creation.
        clientType = 'cmd'
        hostname = device.id
        try:
            plugins = self.selectPlugins(device, "command")
            commands = [(p.name(), p.command) for p in plugins]
            if not commands:
                self.log.info("no cmd plugins found for %s" % hostname)
                return
            protocol = getattr(device, 'zCommandProtocol', defaultProtocol)
            commandPort = getattr(device, 'zCommandPort', defaultPort)
            if protocol == "ssh":
                client = SshClient.SshClient(hostname, ip, commandPort,
                                             options=self.options,
                                             commands=commands, device=device,
                                             datacollector=self)
                clientType = 'ssh'
                self.log.info('using ssh collection device %s' % hostname)
            elif protocol == 'telnet':
                if commandPort == 22:
                    commandPort = 23  # set default telnet
                client = TelnetClient.TelnetClient(hostname, ip, commandPort,
                                                   options=self.options,
                                                   commands=commands,
                                                   device=device,
                                                   datacollector=self)
                clientType = 'telnet'
                self.log.info('using telnet collection device %s' % hostname)
            else:
                self.log.warn("unknown protocol %s for device %s",
                              protocol, hostname)
            if not client:
                self.log.warn("cmd client creation failed")
            else:
                self.log.info("plugins: %s",
                              ", ".join([p.name() for p in plugins]))
        except (SystemExit, KeyboardInterrupt):
            raise
        except:
            self.log.exception("error opening cmdclient")
        self.addClient(client, timeout, clientType, device.id)

    def snmpCollect(self, device, ip, timeout):
        """Start the SNMP collection client for device.

        Nothing is started when no "snmp" plugin applies, or when
        checkCollection() decides the device was collected recently.  The
        latter is a deliberate skip and is not reported as a failure.
        """
        client = None
        try:
            hostname = device.id
            plugins = self.selectPlugins(device, "snmp")
            if not plugins:
                self.log.info("no snmp plugins found for %s" % hostname)
                return
            if not self.checkCollection(device):
                # checkCollection() already logged the skip.
                return
            self.log.info('snmp collection device %s' % hostname)
            self.log.info("plugins: %s",
                          ", ".join([p.name() for p in plugins]))
            client = SnmpClient.SnmpClient(device.id,
                                           ip,
                                           self.options,
                                           device,
                                           self,
                                           plugins)
            if not client:
                self.log.warn("snmp client creation failed")
                return
        except (SystemExit, KeyboardInterrupt):
            raise
        except:
            self.log.exception("error opening snmpclient")
        self.addClient(client, timeout, 'snmp', device.id)

    def addClient(self, obj, timeout, clientType, name):
        """Register obj as an active client with deadline timeout and run it.

        When obj is falsy (creation failed) only a warning naming
        clientType and the device name is logged.
        """
        if obj:
            obj.timeout = timeout
            self.clients.append(obj)
            obj.run()
        else:
            self.log.warn('Unable to create a %s client for %s',
                          clientType, name)

    # XXX double-check this, once the implementation is in place
    def portscanCollect(self, device, ip, timeout):
        """Start the portscan collection client for device.

        Nothing is started when no "portscan" plugin applies, or when
        checkCollection() decides the device was collected recently.  The
        latter is a deliberate skip and is not reported as a failure.
        """
        client = None
        try:
            hostname = device.id
            plugins = self.selectPlugins(device, "portscan")
            if not plugins:
                self.log.info("no portscan plugins found for %s" % hostname)
                return
            if not self.checkCollection(device):
                # checkCollection() already logged the skip.
                return
            self.log.info('portscan collection device %s' % hostname)
            self.log.info("plugins: %s",
                          ", ".join([p.name() for p in plugins]))
            client = PortscanClient.PortscanClient(device.id, ip,
                                                   self.options, device,
                                                   self, plugins)
            if not client:
                self.log.warn("portscan client creation failed")
                return
        except (SystemExit, KeyboardInterrupt):
            raise
        except:
            self.log.exception("error opening portscanclient")
        self.addClient(client, timeout, 'portscan', device.id)

    def checkCollection(self, device):
        """Return False (and log) when collection of device should be skipped.

        The device is skipped when getSnmpStatusNumber() > 0 and its last
        SNMP collection lies within --collage of now.
        """
        # self.collage is in days, matching DateTime arithmetic.
        age = device.getSnmpLastCollection() + self.collage
        if device.getSnmpStatusNumber() > 0 and age >= DateTime.DateTime():
            self.log.info("skipped collection of %s" % device.getId())
            return False
        return True

    def clientFinished(self, collectorClient):
        """Callback that processes the return values from a device.

        Passes the client's results to applyData, moves the client from the
        active list to the finished list, then refills free slots.
        """
        try:
            self.log.debug("client for %s finished collecting",
                           collectorClient.hostname)
            device = collectorClient.device
            self.applyData.processClient(device, collectorClient)
        finally:
            try:
                self.clients.remove(collectorClient)
                self.finished.append(collectorClient)
            except ValueError:
                # e.g. the client was already dropped by timeoutClients().
                self.log.warn("client %s not found in active clients",
                              collectorClient.hostname)
        self.fillCollectionSlots()

    def checkStop(self):
        "if there's nothing left to do, maybe we should terminate"
        if self.clients:
            return
        if self.devicegen:
            return

        if self.start:
            runTime = time.time() - self.start
            self.log.info("scan time: %0.2f seconds", runTime)
            self.start = None
            if self.options.cycle:
                # Cycling daemon: report liveness; the heartbeat timeout
                # spans three cycles.
                evt = dict(eventClass=Heartbeat,
                           component='zenmodeler',
                           device=socket.getfqdn(),
                           timeout=self.cycletime * 3)
                if self.dmd:
                    self.dmd.ZenEventManager.sendEvent(evt)
            else:
                self.stop()

    def fillCollectionSlots(self):
        """If there are any free collection slots fill them up.

        Devices are pulled from self.devicegen while fewer than --parallel
        clients are active and the reactor is not busy (self.slowDown).  The
        limit counts clients, not devices, because one device may start
        several clients.
        """
        count = len(self.clients)
        # Re-read len(self.clients) on every pass: collectDevice() appends
        # the clients it starts, and the limit must see them.
        while (len(self.clients) < self.options.parallel and
               self.devicegen and not self.slowDown):
            try:
                device = self.devicegen.next()
            except StopIteration:
                self.devicegen = None
                break
            if (device.productionState <=
                    getattr(device, 'zProdStateThreshold', 0)):
                self.log.info("skipping %s production state too low",
                              device.id)
                continue
            self.collectDevice(device)

        update = len(self.clients)
        if update != count and update != 1:
            self.log.info('Running %d clients', update)
        self.checkStop()

    def buildOptions(self):
        """Add zenmodeler's command-line options to the parser."""
        ZCmdBase.buildOptions(self)
        self.parser.add_option('--debug',
                               dest='debug', action="store_true", default=False,
                               help="don't fork threads for processing")
        # NOTE: default=True with store_true means nothread is always True
        # from the command line; threads are used only when the constructor
        # is given threaded=True.
        self.parser.add_option('--nothread',
                               dest='nothread', action="store_true", default=True,
                               help="do not use threads when applying updates")
        self.parser.add_option('--parallel', dest='parallel',
                               type='int', default=defaultParallel,
                               help="number of devices to collect from in parallel")
        self.parser.add_option('--cycletime',
                               dest='cycletime', default=720, type='int',
                               help="run collection every x minutes")
        # Both values are compiled with re.compile() and matched with
        # .search() in selectPlugins(); they are not comma separated lists.
        self.parser.add_option('--ignore',
                               dest='ignorePlugins', default="",
                               help="Regular expression matching the names "
                                    "of plugins to ignore")
        self.parser.add_option('--collect',
                               dest='collectPlugins', default="",
                               help="Regular expression matching the names "
                                    "of plugins to use instead of "
                                    "zCollectorPlugins")
        self.parser.add_option('-p', '--path', dest='path',
                               help="start path for collection ie /NetworkDevices")
        self.parser.add_option('-d', '--device', dest='device',
                               help="fully qualified device name ie www.confmon.com")
        self.parser.add_option('-a', '--collage',
                               dest='collage', default=0, type='float',
                               help="do not collect from devices whose collect date " +
                                    "is within this many minutes")
        self.parser.add_option('--writetries',
                               dest='writetries', default=2, type='int',
                               help="number of times to try to write if a "
                                    "read conflict is found")
        self.parser.add_option("-F", "--force",
                               dest="force", action='store_true', default=False,
                               help="force collection of config data "
                                    "(even without change to the device)")
        self.parser.add_option('--portscantimeout', dest='portscantimeout',
                               type='int', default=defaultPortScanTimeout,
                               help="time to wait for connection failures when port scanning")
        self.parser.add_option('--now',
                               dest='now', action="store_true", default=False,
                               help="start daemon now, do not sleep before starting")
        self.parser.add_option('--monitor',
                               dest='monitor',
                               help='Name of monitor instance to use for configuration.')
        TelnetClient.buildOptions(self.parser, self.usage)

    def processOptions(self):
        """Apply option defaults and reject conflicting options.

        Defaults --path to /Devices when neither --path nor --device is
        given.  Raises SystemExit if both --ignore and --collect are set.
        """
        if not self.options.path and not self.options.device:
            self.options.path = "/Devices"
        if self.options.ignorePlugins and self.options.collectPlugins:
            raise SystemExit("--ignore and --collect are mutually exclusive")

    def timeoutClients(self):
        """Drop clients whose deadline has passed, then refill free slots.

        Reschedules itself to run again one second later.
        """
        reactor.callLater(1, self.timeoutClients)
        now = time.time()
        active = []
        for client in self.clients:
            if client.timeout < now:
                # NOTE(review): the client is only forgotten here, nothing
                # tells it to stop, so its connection may remain open.
                self.log.warn("client %s timeout", client.hostname)
                self.finished.append(client)
            else:
                active.append(client)
        self.clients = active
        self.fillCollectionSlots()
        self.checkStop()

    def reactorLoop(self):
        """Drive the Twisted reactor by hand until it stops running.

        self.slowDown is set when the next scheduled call is less than
        10 ms away, so fillCollectionSlots() holds off starting clients.
        Other unexpected errors are logged and the loop resumes.
        SystemExit and KeyboardInterrupt propagate, as they do everywhere
        else in this class.
        """
        reactor.startRunning(installSignalHandlers=False)
        while reactor.running:
            try:
                while reactor.running:
                    reactor.runUntilCurrent()
                    reactor.doIteration(0)
                    timeout = reactor.timeout()
                    # NOTE(review): reactor.timeout() may return None; under
                    # Python 2 None < 0.01 is True.  timeoutClients() keeps a
                    # call scheduled, so this is normally a number.
                    self.slowDown = timeout < 0.01
                    reactor.doIteration(timeout)
            except (SystemExit, KeyboardInterrupt):
                raise
            except:
                self.log.exception("Unexpected error in main loop.")

    def stop(self):
        """Stop ZenModeler.

        Stops the applyData worker, aborts the current transaction and
        schedules reactor.crash() so that reactorLoop() exits.
        """
        self.log.info("stopping...")
        self.applyData.stop()
        transaction.abort()
        reactor.callLater(0., reactor.crash)

    def mainLoop(self):
        """Begin one modeling pass over the configured devices.

        With --cycle the next pass is scheduled first.  A pass is not
        started while the previous one still has active clients or devices
        left to model.
        """
        if self.options.cycle:
            reactor.callLater(self.cycletime, self.mainLoop)

        if self.clients or self.devicegen:
            self.log.error("modeling cycle taking too long")
            return

        self.log.info("starting collector loop")
        self.app._p_jar.sync()
        self.start = time.time()
        if self.options.device:
            self.log.info("collecting for device %s", self.options.device)
            self.devicegen = iter([self.resolveDevice(self.options.device)])
        elif self.options.monitor:
            self.log.info("collecting for monitor %s", self.options.monitor)
            self.devicegen = getattr(self.dmd.Monitors.Performance,
                                     self.options.monitor).devices.objectValuesGen()
        else:
            self.log.info("collecting for path %s", self.options.path)
            root = self.dmd.Devices.getOrganizer(self.options.path)
            self.devicegen = root.getSubDevicesGen()
        self.fillCollectionSlots()

    def sigTerm(self, *unused):
        'controlled shutdown of main loop on interrupt'
        try:
            ZCmdBase.sigTerm(self, *unused)
        except SystemExit:
            pass

    def main(self):
        """Start the first modeling pass and the client-timeout timer."""
        self.finished = []
        self.mainLoop()
        self.timeoutClients()

    def collectSingle(self, device):
        """Model one device (a device object or its name)."""
        self.finished = []
        self.start = time.time()
        self.devicegen = iter([self.resolveDevice(device)])
        self.fillCollectionSlots()
        self.timeoutClients()
if __name__ == '__main__':
    # Build the modeler (parses options), validate them, schedule the first
    # pass plus the timeout timer, then drive the reactor until stop()
    # crashes it.
    modeler = ZenModeler()
    modeler.processOptions()
    modeler.main()
    modeler.reactorLoop()