Package ZenRRD :: Module zencommand
[hide private]
[frames | no frames]

Source Code for Module ZenRRD.zencommand

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__=''' ZenCommand 
 15   
 16  Run Command pluggins periodically. 
 17   
 18  $Id$''' 
 19   
 20  __version__ = "$Revision$"[11:-2] 
 21   
 22  import time 
 23  import logging 
 24  log = logging.getLogger("zen.zencommand") 
 25   
 26  from twisted.internet import reactor, defer 
 27  from twisted.internet.protocol import ProcessProtocol 
 28  from twisted.python import failure 
 29   
 30  import Globals 
 31  from Products.ZenUtils.Driver import drive, driveLater 
 32  from Products.ZenEvents import Event 
 33   
 34  from Products.ZenRRD.RRDDaemon import RRDDaemon 
 35  from Products.ZenRRD.Thresholds import Thresholds 
 36  from Products.ZenRRD.RRDUtil import RRDUtil 
 37  from Products.DataCollector.SshClient import SshClient 
 38   
 39  from sets import Set 
 40   
 41  import re 
 42  # how to parse each value from a nagios command 
 43  NagParser = re.compile(r"""([^ =']+|'(.*)'+)=([-0-9.]+)([^;]*;?){0,5}""") 
 44  # how to parse each value from a cacti command 
 45  CacParser = re.compile(r"""([^ :']+|'(.*)'+):([-0-9.]+)""") 
 46   
 47  MAX_CONNECTIONS=256 
 48   
 49  EXIT_CODE_MAPPING = { 
 50      0:'Success', 
 51      1:'General error', 
 52      2:'Misuse of shell builtins', 
 53      126:'Command invoked cannot execute, permissions problem or command is not an executable', 
 54      127:'Command not found', 
 55      128:'Invalid argument to exit, exit takes only integers in the range 0-255', 
 56      130:'Fatal error signal: 2, Command terminated by Control-C' 
 57  } 
 58   
class CommandConfig:
    """Attribute bag: every key of *dictionary* becomes an instance attribute.

    Callers may pass a function's locals(); any 'self' key in that mapping
    would otherwise point at the caller's object, so it is rebound to this
    instance. The caller's mapping is never modified.
    """

    def __init__(self, dictionary):
        d = dictionary.copy()
        d['self'] = self
        # Use the adjusted copy (previously the unmodified input was used,
        # which silently discarded the 'self' override above).
        self.__dict__.update(d)
class TimeoutError(Exception):
    """Signals that a deferred call did not complete in time.

    Whatever positional values are given are stored unchanged in ``args``;
    within this module the single value is the command that timed out.
    """

    def __init__(self, *details):
        Exception.__init__(self)
        # Assigned after the base initialiser so args is exactly *details*.
        self.args = details
def Timeout(deferred, seconds, obj):
    """Make *deferred* errback with TimeoutError(obj) after *seconds*.

    If the deferred fires first (with a result or a failure), the pending
    timer is cancelled. The same deferred is returned for chaining.
    """
    def _expire(d, o):
        # Deadline passed before any result arrived.
        d.errback(failure.Failure(TimeoutError(o)))

    timer = reactor.callLater(seconds, _expire, deferred, obj)

    def _disarm(result, t):
        # Runs on success and on failure, including our own timeout.
        if not t.called:
            t.cancel()
        return result

    deferred.addBoth(_disarm, timer)
    return deferred
class ProcessRunner(ProcessProtocol):
    "Provide deferred process execution"
    stopped = None   # deferred returned by start(); cleared once fired
    exitCode = None
    output = ''

    def start(self, cmd):
        """Run cmd.command locally through /bin/sh and return a deferred.

        The deferred fires with this ProcessRunner when the process ends, or
        errbacks with TimeoutError(cmd) after cmd.commandTimeout seconds.
        """
        log.debug('running %r' % cmd.command)
        shell = '/bin/sh'
        # 'exec' makes the command replace the shell process, so the KILL
        # sent by timeout() reaches the command itself, not /bin/sh.
        self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
        self.command = ' '.join(self.cmdline)
        log.debug('cmd line: %r' % self.command)
        reactor.spawnProcess(self, shell, self.cmdline, env=None)

        d = Timeout(defer.Deferred(), cmd.commandTimeout, cmd)
        self.stopped = d
        self.stopped.addErrback(self.timeout)
        return d

    def timeout(self, value):
        """Errback: kill the process and pass the failure *value* through."""
        # Not imported at module level; previously referencing it raised a
        # NameError exactly when the process had already exited.
        from twisted.internet.error import ProcessExitedAlready
        try:
            self.transport.signalProcess('KILL')
        except ProcessExitedAlready:
            log.debug("Command already exited: %s", self.command)
        return value

    def outReceived(self, data):
        "Store up the output as it arrives from the process"
        self.output += data

    def processEnded(self, reason):
        """Record exit code and first output line, then fire the deferred."""
        self.exitCode = reason.value.exitCode
        log.debug('Received exit code: %s' % self.exitCode)
        log.debug('Command: %r' % self.command)
        log.debug('Output: %r' % self.output)

        # Only the first line of output is kept.
        self.output = self.output.split('\n')[0].strip()
        if self.stopped:
            d, self.stopped = self.stopped, None
            # Already called if the timeout errback fired first.
            if not d.called:
                d.callback(self)
class MySshClient(SshClient):
    "Connection to SSH server at the remote device"

    def __init__(self, *args, **kw):
        SshClient.__init__(self, *args, **kw)
        self.defers = {}   # command string -> Deferred awaiting its result

    def addCommand(self, command):
        """Queue *command*; return a deferred that fires with (data, code)."""
        d = defer.Deferred()
        self.defers[command] = d
        SshClient.addCommand(self, command)
        return d

    def addResult(self, command, data, code):
        "Forward the results of the command execution to the starter"
        SshClient.addResult(self, command, data, code)
        # A result with no registered waiter is ignored rather than raising
        # KeyError from inside the SSH client's callback.
        d = self.defers.pop(command, None)
        if d is not None and not d.called:
            d.callback((data, code))

    def check(self, ip, timeout=2):
        "Turn off blocking SshClient.test method"
        return True

    def clientFinished(self):
        "We don't need to track commands/results when they complete"
        SshClient.clientFinished(self)
        self.commands = []
        self.results = []
class SshPool:
    "Cache all the Ssh connections so they can be managed"

    def __init__(self):
        self.pool = {}   # device name -> MySshClient

    def get(self, cmd):
        "Make an ssh connection if there isn't one available"
        result = self.pool.get(cmd.device, None)
        if result is None:
            log.debug("Creating connection to %s", cmd.device)
            options = Options(cmd.username, cmd.password,
                              cmd.loginTimeout, cmd.commandTimeout, cmd.keyPath)
            # New param KeyPath
            result = MySshClient(cmd.device, cmd.ipAddress, cmd.port,
                                 options=options)
            result.run()
            self.pool[cmd.device] = result
        return result

    def _close(self, device):
        "close the SSH connection to a device, if it exists"
        c = self.pool.pop(device, None)
        if c is not None:
            log.debug("Closing connection to %s", device)
            if c.transport:
                c.transport.loseConnection()

    def close(self, cmd):
        "symetric close that matches get() method"
        self._close(cmd.device)

    def trimConnections(self, schedule):
        """Close connections until at most MAX_CONNECTIONS remain.

        Connections to devices absent from *schedule* are dropped first;
        then devices appearing last in *schedule* (assumed to be ordered by
        next run time) are dropped first.
        """
        if len(self.pool) <= MAX_CONNECTIONS:
            return
        # distinct devices, in order of first appearance (next use)
        devices = []
        seen = set()
        for c in schedule:
            if c.device not in seen:
                seen.add(c.device)
                devices.append(c.device)
        # connections no scheduled command will use are the cheapest to drop
        for device in [d for d in self.pool.keys() if d not in seen]:
            if len(self.pool) <= MAX_CONNECTIONS:
                break
            self._close(device)
        # then close the devices needed furthest in the future
        while devices and len(self.pool) > MAX_CONNECTIONS:
            self._close(devices.pop())
230 -class SshRunner:
231 "Run a single command across a cached Ssh connection" 232 exitCode = None 233 output = None 234
235 - def __init__(self, pool):
236 self.pool = pool
237 238
239 - def start(self, cmd):
240 "Initiate a command on the remote device" 241 self.defer = defer.Deferred() 242 c = self.pool.get(cmd) 243 try: 244 d = Timeout(c.addCommand(cmd.command), cmd.commandTimeout, cmd) 245 except Exception, ex: 246 log.warning('Error starting command: %s', ex) 247 self.pool.close(cmd) 248 return defer.fail(ex) 249 d.addErrback(self.timeout) 250 d.addBoth(self.processEnded) 251 return d
252 253
254 - def timeout(self, arg):
255 "Deal with slow executing command/connection (close it)" 256 cmd, = arg.value.args 257 # we could send a kill signal, but then we would need to track 258 # the command channel to send it to: just close the connection 259 self.pool.close(cmd) 260 return arg
261 262
263 - def processEnded(self, value):
264 "Deliver ourselves to the starter with the proper attributes" 265 if isinstance(value, failure.Failure): 266 return value 267 self.output, self.exitCode = value 268 return self
269 270
class Cmd:
    "Holds the config of every command to be run"
    device = None
    ipAddress = None
    port = 22
    username = None
    password = None
    command = None
    useSsh = False
    cycleTime = None
    eventClass = None
    eventKey = None
    component = None
    severity = 3
    lastChange = None   # set by updateConfig from the device config
    lastStart = 0       # time.time() of the most recent start
    lastStop = 0        # time.time() of the most recent completion
    result = None       # runner (or Failure) from the last completion

    def __init__(self):
        self.points = {}

    def running(self):
        """True while the latest start has not yet been followed by a stop."""
        return self.lastStop < self.lastStart

    def name(self):
        """Command with its directory stripped from the executable, for logs."""
        cmd, args = (self.command + ' ').split(' ', 1)
        cmd = cmd.split('/')[-1]
        return '%s %s' % (cmd, args)

    def nextRun(self):
        """Time at which this command is next due (one cycle after last start/stop)."""
        if self.running():
            return self.lastStart + self.cycleTime
        return self.lastStop + self.cycleTime

    def start(self, pool):
        """Start the command locally or over SSH; return its deferred."""
        if self.useSsh:
            pr = SshRunner(pool)
        else:
            pr = ProcessRunner()
        d = pr.start(self)
        self.lastStart = time.time()
        log.debug('Process %s started' % self.name())
        d.addBoth(self.processEnded)
        return d

    def processEnded(self, pr):
        """Record completion; pass on self on success or the Failure."""
        self.result = pr
        self.lastStop = time.time()
        if not isinstance(pr, failure.Failure):
            log.debug('Process %s stopped (%s), %f elapsed' % (
                self.name(),
                pr.exitCode,
                self.lastStop - self.lastStart))
            return self
        return pr

    def updateConfig(self, cfg):
        """Copy settings from a config object; run timestamps are preserved."""
        self.lastChange = cfg.lastChange
        self.device = cfg.device
        self.ipAddress = cfg.ipAddress
        self.port = cfg.port
        self.username = str(cfg.username)
        self.password = str(cfg.password)
        self.loginTimeout = cfg.loginTimeout
        self.commandTimeout = cfg.commandTimeout
        self.keyPath = cfg.keyPath
        self.useSsh = cfg.useSsh
        # never schedule faster than once per second
        self.cycleTime = max(cfg.cycleTime, 1)
        self.eventKey = cfg.eventKey
        self.eventClass = cfg.eventClass
        self.component = cfg.component
        self.severity = cfg.severity
        self.command = str(cfg.command)
        # label -> remaining fields of the point description
        self.points = dict([(p[0], p[1:]) for p in cfg.points])
class Options:
    """Connection settings handed to MySshClient by SshPool.get."""
    loginTries = 1
    searchPath = ''
    existenceTest = None

    def __init__(self, username, password, loginTimeout, commandTimeout, keyPath):
        # Store each constructor argument as an attribute of the same name.
        for attr, val in (('username', username),
                          ('password', password),
                          ('loginTimeout', loginTimeout),
                          ('commandTimeout', commandTimeout),
                          ('keyPath', keyPath)):
            setattr(self, attr, val)
367 -class zencommand(RRDDaemon):
368 369 initialServices = RRDDaemon.initialServices + ['CommandConfig'] 370
371 - def __init__(self):
372 RRDDaemon.__init__(self, 'zencommand') 373 self.schedule = [] 374 self.timeout = None 375 self.deviceIssues = Set() 376 self.pool = SshPool()
377
378 - def remote_deleteDevice(self, doomed):
379 self.log.debug("Async delete device %s" % doomed) 380 self.schedule = [c for c in self.schedule if c.device != doomed]
381
382 - def remote_updateConfig(self, config):
383 self.log.debug("Async configuration update") 384 self.updateConfig([config], [config[1]])
385
386 - def remote_updateDeviceList(self, devices):
387 self.log.debug("Async update device list %s" % devices) 388 updated = [] 389 lastChanges = dict(devices) # map device name to last change 390 keep = [] 391 for cmd in self.schedule: 392 if cmd.device in lastChanges: 393 if cmd.lastChange > lastChanges[cmd.device]: 394 updated.append(cmd.device) 395 keep.append(cmd) 396 else: 397 log.info("Removing all commands for %s", cmd.device) 398 self.schedule = keep 399 if updated: 400 log.info("Fetching the config for %s", updated) 401 d = self.model().callRemote('getDataSourceCommands', devices) 402 d.addCallback(self.updateConfig, updated) 403 d.addErrback(self.error)
404
405 - def updateConfig(self, config, expected):
406 expected = Set(expected) 407 current = {} 408 for c in self.schedule: 409 if c.device in expected: 410 current[c.device,c.command] = c 411 # keep all the commands we didn't ask for 412 update = [c for c in self.schedule if c.device not in expected] 413 for c in config: 414 (lastChange, device, ipAddress, port, 415 username, password, 416 loginTimeout, commandTimeout, 417 keyPath, maxOids, commandPart, threshs) = c 418 self.thresholds.updateList(threshs) 419 if self.options.device and self.options.device != device: 420 continue 421 for cmd in commandPart: 422 (useSsh, cycleTime, 423 component, eventClass, eventKey, severity, command, points) = cmd 424 obj = current.setdefault((device,command), Cmd()) 425 del current[(device,command)] 426 obj.updateConfig(CommandConfig(locals())) 427 update.append(obj) 428 for device, command in current.keys(): 429 log.info("Deleting command %s from %s", device, command) 430 self.schedule = update 431 self.processSchedule()
432
433 - def heartbeatCycle(self, *ignored):
434 "There is no master 'cycle' to send the hearbeat" 435 self.heartbeat() 436 reactor.callLater(self.heartBeatTimeout/3, self.heartbeatCycle)
437
438 - def processSchedule(self, *unused):
439 """Run through the schedule and start anything that needs to be done. 440 Set a timer if we have nothing to do. 441 """ 442 log.info("%s - schedule has %d commands", '-'*10, len(self.schedule)) 443 if not self.options.cycle: 444 for cmd in self.schedule: 445 if cmd.running() or cmd.lastStart == 0: 446 break 447 else: 448 self.stop() 449 return 450 try: 451 if self.timeout and not self.timeout.called: 452 self.timeout.cancel() 453 self.timeout = None 454 def compare(x, y): 455 return cmp(x.nextRun(), y.nextRun())
456 self.schedule.sort(compare) 457 self.pool.trimConnections(self.schedule) 458 earliest = None 459 running = 0 460 now = time.time() 461 for c in self.schedule: 462 if c.running(): 463 running += 1 464 for c in self.schedule: 465 if running >= self.options.parallel: 466 break 467 if c.nextRun() <= now: 468 c.start(self.pool).addBoth(self.finished) 469 running += 1 470 else: 471 earliest = c.nextRun() - now 472 break 473 if earliest is not None: 474 log.debug("Next command in %f seconds", earliest) 475 self.timeout = reactor.callLater(max(1, earliest), 476 self.processSchedule) 477 except Exception, ex: 478 log.exception(ex)
479 480
481 - def finished(self, cmd):
482 if isinstance(cmd, failure.Failure): 483 self.error(cmd) 484 else: 485 self.parseResults(cmd) 486 self.processSchedule()
487 488
489 - def error(self, err):
490 if isinstance(err.value, TimeoutError): 491 cmd, = err.value.args 492 msg = "Command timed out on device %s: %s" % (cmd.device, cmd.command) 493 log.warning(msg) 494 issueKey = cmd.device, cmd.eventClass, cmd.eventKey 495 self.deviceIssues.add(issueKey) 496 self.sendEvent(dict(device=cmd.device, 497 eventClass=cmd.eventClass, 498 eventKey=cmd.eventKey, 499 component=cmd.component, 500 severity=cmd.severity, 501 summary=msg)) 502 else: 503 log.exception(err.value)
504
505 - def getExitMessage(self, exitCode):
506 if exitCode in EXIT_CODE_MAPPING.keys(): 507 return EXIT_CODE_MAPPING[exitCode] 508 elif exitCode >= 255: 509 return 'Exit status out of range, exit takes only integer arguments in the range 0-255' 510 elif exitCode > 128: 511 return 'Fatal error signal: %s' % (exitCode-128) 512 return 'Unknown error code: %s' % exitCode
513
514 - def parseResults(self, cmd):
515 log.debug('The result of "%s" was "%s"', cmd.command, cmd.result.output) 516 output = cmd.result.output 517 exitCode = cmd.result.exitCode 518 severity = cmd.severity 519 issueKey = cmd.device, cmd.eventClass, cmd.eventKey 520 if output.find('|') >= 0: 521 msg, values = output.split('|', 1) 522 elif CacParser.search(output): 523 msg, values = '', output 524 else: 525 msg, values = output, '' 526 msg = msg.strip() or 'Cmd: %s - Code: %s - Msg: %s' % (cmd.command, exitCode, self.getExitMessage(exitCode)) 527 if exitCode == 0: 528 severity = 0 529 elif exitCode == 2: 530 severity = min(severity + 1, 5) 531 if severity or issueKey in self.deviceIssues: 532 self.sendThresholdEvent(device=cmd.device, 533 summary=msg, 534 severity=severity, 535 message=msg, 536 performanceData=values, 537 eventKey=cmd.eventKey, 538 eventClass=cmd.eventClass, 539 component=cmd.component) 540 self.deviceIssues.add(issueKey) 541 if severity == 0: 542 self.deviceIssues.discard(issueKey) 543 544 for value in values.split(' '): 545 if value.find('=') > 0: 546 parts = NagParser.match(value) 547 else: 548 parts = CacParser.match(value) 549 if not parts: continue 550 label = parts.group(1).replace("''", "'") 551 try: 552 value = float(parts.group(3)) 553 except: 554 value = 'U' 555 if cmd.points.has_key(label): 556 path, type, command, (minv, maxv) = cmd.points[label] 557 log.debug("storing %s = %s in: %s" % (label, value, path)) 558 value = self.rrd.save(path, value, type, command, 559 cmd.cycleTime, minv, maxv) 560 log.debug("rrd save result: %s" % value) 561 for ev in self.thresholds.check(path, time.time(), value): 562 ev['eventKey'] = cmd.eventKey 563 ev['eventClass'] = cmd.eventClass 564 ev['component'] = cmd.component 565 self.sendThresholdEvent(**ev)
566
567 - def fetchConfig(self):
568 def doFetchConfig(driver): 569 try: 570 yield self.model().callRemote('propertyItems') 571 self.setPropertyItems(driver.next()) 572 573 yield self.model().callRemote('getDefaultRRDCreateCommand') 574 createCommand = driver.next() 575 576 yield self.model().callRemote('getThresholdClasses') 577 self.remote_updateThresholdClasses(driver.next()) 578 579 devices = [] 580 if self.options.device: 581 devices = [self.options.device] 582 yield self.model().callRemote('getDataSourceCommands', 583 devices) 584 if not devices: 585 devices = list(Set([c.device for c in self.schedule])) 586 self.updateConfig(driver.next(), devices) 587 588 self.rrd = RRDUtil(createCommand, 60) 589 590 except Exception, ex: 591 log.exception(ex) 592 raise
593 594 return drive(doFetchConfig) 595 596
597 - def start(self, driver):
598 """Fetch the configuration and return a deferred for its completion. 599 Also starts the config cycle""" 600 ex = None 601 try: 602 log.debug('Fetching config') 603 yield self.fetchConfig() 604 driver.next() 605 log.debug('Finished config fetch') 606 except Exception, ex: 607 log.exception(ex) 608 driveLater(self.configCycleInterval * 60, self.start) 609 if ex: 610 raise ex
611 612
613 - def buildOptions(self):
614 RRDDaemon.buildOptions(self) 615 616 self.parser.add_option('--parallel', dest='parallel', 617 default=10, type='int', 618 help="number of devices to collect at one time")
619
620 - def connected(self):
621 d = drive(self.start).addCallbacks(self.processSchedule, self.errorStop) 622 if self.options.cycle: 623 d.addCallback(self.heartbeatCycle)
if __name__ == '__main__':
    # Script entry point: build the daemon and start it.
    daemon = zencommand()
    daemon.run()