Package Products :: Package ZenRRD :: Module zencommand
[hide private]
[frames] | [no frames]

Source Code for Module Products.ZenRRD.zencommand

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, 2009, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """ZenCommand 
 15   
 16  Run Command plugins periodically. 
 17   
 18  """ 
 19   
 20  import time 
 21  from pprint import pformat 
 22  import logging 
 23  log = logging.getLogger("zen.zencommand") 
 24  from sets import Set 
 25   
 26  from twisted.internet import reactor, defer, error 
 27  from twisted.internet.protocol import ProcessProtocol 
 28  from twisted.python import failure 
 29  from twisted.spread import pb 
 30   
 31  import Globals 
 32  from Products.ZenUtils.Driver import drive, driveLater 
 33  from Products.ZenUtils.Utils import unused, getExitMessage 
 34  from Products.ZenRRD.RRDDaemon import RRDDaemon 
 35  from Products.ZenRRD.RRDUtil import RRDUtil 
 36  from Products.DataCollector.SshClient import SshClient 
 37  from Products.ZenEvents.ZenEventClasses import Clear, Error 
 38  from Products.ZenRRD.CommandParser import ParsedResults 
 39   
 40  from Products.DataCollector import Plugins 
 41  unused(Plugins) 
# Upper bound on the number of cached SSH connections: SshPool.trimConnections
# closes connections (while it still has candidates) whenever the pool grows
# beyond this size.
MAX_CONNECTIONS = 256
 43   
 44   
 45   
class TimeoutError(Exception):
    """
    Error for a deferred call taking too long to complete.

    Whatever positional arguments are given are stored unchanged in
    ``args``; in this module a single Cmd object is passed, and consumers
    unpack it with ``cmd, = err.args``.
    """

    def __init__(self, *details):
        # Initialise the base exception without arguments, then record the
        # caller's arguments verbatim.
        Exception.__init__(self)
        self.args = details
def Timeout(deferred, seconds, obj):
    """
    Cause an error on a deferred when it is taking too long to complete.

    After ``seconds`` the deferred is errbacked with TimeoutError(obj),
    unless it has fired first, in which case the pending timer is cancelled.
    Returns the same deferred.
    """

    def _fireTimeout(d, payload):
        "Deadline reached first: errback the deferred with TimeoutError"
        d.errback(failure.Failure(TimeoutError(payload)))

    def _disarm(result, pending):
        "Deferred fired (result, error or the timeout): cancel an armed timer"
        if not pending.called:
            pending.cancel()
        return result

    pending = reactor.callLater(seconds, _fireTimeout, deferred, obj)
    deferred.addBoth(_disarm, pending)
    return deferred
class ProcessRunner(ProcessProtocol):
    """
    Provide deferred process execution.

    start() runs a Cmd's command locally through /bin/sh and returns a
    Deferred that fires with this runner (exitCode and output filled in)
    when the process ends, or errbacks with TimeoutError after the device's
    commandTimeout, in which case the process is sent SIGKILL.
    """
    stopped = None     # Deferred returned by start(); cleared in processEnded
    exitCode = None
    output = ''        # stdout accumulated by outReceived

    def start(self, cmd):
        "Kick off the process: run it local"
        log.debug('running %r', cmd.command)

        shell = '/bin/sh'
        # 'exec' lets a simple command replace the shell process, so the
        # KILL sent by timeout() goes to the command itself.
        self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
        self.command = ' '.join(self.cmdline)
        log.debug('cmd line: %r', self.command)
        reactor.spawnProcess(self, shell, self.cmdline, env=None)

        deadline = cmd.deviceConfig.commandTimeout
        d = Timeout(defer.Deferred(), deadline, cmd)
        d.addErrback(self.timeout)
        self.stopped = d
        return d

    def timeout(self, value):
        "Kill a process if it takes too long; the failure is passed on"
        try:
            self.transport.signalProcess('KILL')
        except error.ProcessExitedAlready:
            log.debug("Command already exited: %s", self.command)
        return value

    def outReceived(self, data):
        "Store up the output as it arrives from the process"
        self.output = self.output + data

    def processEnded(self, reason):
        "notify the starter that their process is complete"
        self.exitCode = reason.value.exitCode
        log.debug('Received exit code: %s', self.exitCode)
        log.debug('Command: %r', self.command)
        log.debug('Output: %r', self.output)

        pending = self.stopped
        if not pending:
            return
        self.stopped = None
        # The Deferred may already have been errbacked by the timeout.
        if not pending.called:
            pending.callback(self)
class MySshClient(SshClient):
    """
    Connection to SSH server at the remote device.

    Every addCommand call returns its own Deferred, which addResult fires
    with a (data, code) tuple once the command's result arrives.
    """

    def __init__(self, *args, **kw):
        SshClient.__init__(self, *args, **kw)
        # command string -> list of Deferreds still waiting for a result,
        # oldest first.  A list (not a single slot) is kept because the same
        # command text may be submitted again before its first result comes
        # back; a single slot would silently drop the earlier Deferred,
        # leaving it to fire only by timeout.
        self.defers = {}

    def addCommand(self, command):
        "Run a command against the server; return a Deferred for its result"
        d = defer.Deferred()
        self.defers.setdefault(command, []).append(d)
        SshClient.addCommand(self, command)
        return d

    def addResult(self, command, data, code):
        "Forward the results of the command execution to the starter"
        # don't call the CollectorClient.addResult which adds the result to a
        # member variable for zenmodeler
        pending = self.defers.get(command)
        if not pending:
            log.error("Internal error where deferred object not in dictionary." \
                      " Command = '%s' Data = '%s' Code = '%s'",
                      command, data, code)
            return
        d = pending.pop(0)
        if not pending:
            del self.defers[command]
        # The Deferred may already have been errbacked by a timeout.
        if not d.called:
            d.callback((data, code))

    def check(self, ip, timeout=2):
        "Turn off blocking SshClient.test method"
        return True

    def clientFinished(self):
        "We don't need to track commands/results when they complete"
        SshClient.clientFinished(self)
        self.cmdmap = {}
        self._commands = []
        self.results = []
class SshPool:
    """
    Cache all the Ssh connections so they can be managed.

    At most one MySshClient is kept per device name.
    """

    def __init__(self):
        self.pool = {}           # device name -> MySshClient
        self.eventSender = None  # if set, its sendEvent is given to new clients

    def get(self, cmd):
        "Make a new SSH connection if there isn't one available"
        dc = cmd.deviceConfig
        result = self.pool.get(dc.device, None)
        if result is None:
            log.debug("Creating connection to %s", dc.device)
            options = Options(dc.username, dc.password,
                              dc.loginTimeout, dc.commandTimeout,
                              dc.keyPath, dc.concurrentSessions)
            # New param KeyPath
            result = MySshClient(dc.device, dc.ipAddress, dc.port,
                                 options=options)
            if self.eventSender is not None:
                result.sendEvent = self.eventSender.sendEvent
            result.run()
            self.pool[dc.device] = result
        return result

    def reinitializeCollectorClients(self):
        "Call reinitialize() on every cached SSH client"
        for collectorClient in self.pool.values():
            collectorClient.reinitialize()

    def _close(self, device):
        "close the SSH connection to a device, if it exists"
        c = self.pool.get(device, None)
        if c:
            log.debug("Closing connection to %s", device)
            if c.transport:
                c.transport.loseConnection()
            del self.pool[device]

    def close(self, cmd):
        "symmetric close that matches get() method"
        self._close(cmd.deviceConfig.device)

    def trimConnections(self, schedule):
        """
        Reduce the number of connections using the schedule for guidance.

        @param schedule: Cmd objects, expected in order of next use
        While the pool holds more than MAX_CONNECTIONS clients, connections
        are closed starting with devices not on the schedule at all, then
        the scheduled devices whose next use is furthest away.
        """
        # compute device list in order of next use (set for O(1) dedup)
        devices = []
        seen = set()
        for c in schedule:
            device = c.deviceConfig.device
            if device not in seen:
                seen.add(device)
                devices.append(device)
        # Pooled connections for unscheduled devices are never needed again;
        # append them last so pop() closes them first.
        devices.extend([d for d in self.pool.keys() if d not in seen])
        # close as many devices as needed
        while devices and len(self.pool) > MAX_CONNECTIONS:
            self._close(devices.pop())
228 -class SshRunner:
229 """ 230 Run a single command across a cached SSH connection 231 """ 232 exitCode = None 233 output = None 234
235 - def __init__(self, pool):
236 self.pool = pool
237 238
239 - def start(self, cmd):
240 "Initiate a command on the remote device" 241 self.defer = defer.Deferred() 242 c = self.pool.get(cmd) 243 try: 244 d = Timeout(c.addCommand(cmd.command), 245 cmd.deviceConfig.commandTimeout, 246 cmd) 247 except Exception, ex: 248 log.warning('Error starting command: %s', ex) 249 self.pool.close(cmd) 250 return defer.fail(ex) 251 d.addErrback(self.timeout) 252 d.addBoth(self.processEnded) 253 return d
254 255
256 - def timeout(self, arg):
257 "Deal with slow executing command/connection (close it)" 258 cmd, = arg.value.args 259 # we could send a kill signal, but then we would need to track 260 # the command channel to send it to: just close the connection 261 self.pool.close(cmd) 262 return arg
263 264
265 - def processEnded(self, value):
266 "Deliver ourselves to the starter with the proper attributes" 267 if isinstance(value, failure.Failure): 268 return value 269 self.output, self.exitCode = value 270 return self
271 272
class DeviceConfig(pb.Copyable, pb.RemoteCopy):
    """
    Per-device settings referenced by each Cmd as ``cmd.deviceConfig``;
    transferable over Perspective Broker.
    """
    lastChange = 0.0

    # where to connect
    device = ''
    ipAddress = ''
    port = 0

    # SSH credentials
    username = ''
    password = ''
    keyPath = ''

    # timeouts (commandTimeout is handed to Timeout(), i.e. seconds)
    loginTimeout = 0.0
    commandTimeout = 0.0

pb.setUnjellyableForClass(DeviceConfig, DeviceConfig)
class DataPointConfig(pb.Copyable, pb.RemoteCopy):
    """
    Description of one data point of a command: where its values are
    stored (rrdPath and the RRD creation parameters) and which component
    threshold events refer to.
    """
    id = ''
    component = ''
    rrdPath = ''
    rrdType = None
    rrdCreateCommand = ''
    rrdMin = None
    rrdMax = None

    def __init__(self):
        self.data = {}

    def __repr__(self):
        summary = (self.data, self.id)
        return pformat(summary)

pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)
class Cmd(pb.Copyable, pb.RemoteCopy):
    """
    Holds the config of every command to be run, plus the start/stop times
    of its most recent execution that drive scheduling.
    """
    command = None
    useSsh = False
    cycleTime = None
    eventClass = None
    eventKey = None
    severity = 3
    lastStart = 0
    lastStop = 0
    result = None     # runner of the last execution, or a Failure

    def __init__(self):
        self.points = []

    def running(self):
        "True while the latest start has not been matched by a stop"
        return self.lastStart > self.lastStop

    def name(self):
        "Display name: the program's basename followed by its arguments"
        pieces = (self.command + ' ').split(' ', 1)
        program = pieces[0].split('/')[-1]
        return '%s %s' % (program, pieces[1])

    def nextRun(self):
        "Time at which this command is next due (cycleTime after last start/stop)"
        if self.running():
            reference = self.lastStart
        else:
            reference = self.lastStop
        return reference + self.cycleTime

    def start(self, pool):
        """
        Run the command (over SSH via pool when useSsh, otherwise locally)
        and return a Deferred firing with this Cmd or with a Failure.
        """
        if not self.useSsh:
            runner = ProcessRunner()
        else:
            runner = SshRunner(pool)
        d = runner.start(self)
        self.lastStart = time.time()
        log.debug('Process %s started', self.name())
        d.addBoth(self.processEnded)
        return d

    def processEnded(self, pr):
        "Record the outcome; pass a Failure through, otherwise return self"
        self.result = pr
        self.lastStop = time.time()

        # Check for a condition that could cause zencommand to stop cycling.
        # http://dev.zenoss.org/trac/ticket/4936
        if self.lastStop < self.lastStart:
            log.debug('System clock went back?')
            self.lastStop = self.lastStart

        if isinstance(pr, failure.Failure):
            return pr
        elapsed = self.lastStop - self.lastStart
        log.debug('Process %s stopped (%s), %.2f seconds elapsed',
                  self.name(), pr.exitCode, elapsed)
        return self

    def updateConfig(self, cfg, deviceConfig):
        "Copy the schedulable settings of cfg onto this command; return self"
        self.deviceConfig = deviceConfig
        self.useSsh = cfg.useSsh
        self.cycleTime = max(int(cfg.cycleTime), 1)
        self.eventKey = cfg.eventKey
        self.eventClass = cfg.eventClass
        self.severity = cfg.severity
        self.command = str(cfg.command)
        self.points = cfg.points
        if cfg.severity is None:
            log.warning("severity is None: cfg,%r ; deviceConfig, %r",
                        cfg, deviceConfig)
        return self

    def getEventKey(self, point):
        "Event key for a data point: eventKey plus the RRD file name"
        # fetch datapoint name from filename path and add it to the event key
        return self.eventKey + '|' + point.rrdPath.rsplit('/', 1)[-1]

    def commandKey(self):
        "Provide a value that establishes the uniqueness of this command"
        fields = (self.useSsh, self.cycleTime, self.severity, self.command)
        return '%'.join([str(f) for f in fields])

pb.setUnjellyableForClass(Cmd, Cmd)
class Options:
    """
    Connection settings bundle passed as ``options`` to MySshClient.
    """
    loginTries = 1
    searchPath = ''
    existenceTest = None

    def __init__(self, username, password, loginTimeout, commandTimeout,
                 keyPath, concurrentSessions):
        (self.username, self.password,
         self.loginTimeout, self.commandTimeout,
         self.keyPath, self.concurrentSessions) = (
            username, password, loginTimeout, commandTimeout,
            keyPath, concurrentSessions)
class ConfigurationProcessor(object):
    """
    Turn one device configuration from zenhub into the Cmd objects that
    belong on zencommand's schedule.
    """

    def __init__(self, config, scheduledCmds, sendEvent):
        """
        config - one of the configurations that was returned by calling
            getDataSourceCommands on zenhub.
        scheduledCmds - a dictionary that maps (device, command) to Cmd object
            for the commands currently on zencommand's schedule.
        sendEvent - a function that sends events to zenhub using twisted
            perspective broker.
        """
        self._config = config
        self._scheduledCmds = scheduledCmds
        self._sendEvent = sendEvent
        self._device = config.device
        self._suppressed = []  # log warning and send event once per device
        self._summary = 'zCommandUsername is not set'

    def _sendUsernameEvent(self, severity):
        "send an event (or clear it) for username not set"
        event = {
            'device': self._device,
            'eventClass': '/Cmd/Fail',
            'eventKey': 'zCommandUsername',
            'severity': severity,
            'component': 'zencommand',
            'summary': self._summary,
        }
        self._sendEvent(event)

    def _warnUsernameNotSet(self, command):
        """
        Warn that the username is not set for device and the SSH command
        cannot be executed.  The warning and event are skipped once the
        device has been recorded in _suppressed; the debug line is not.
        """
        if self._device not in self._suppressed:
            log.warning(self._summary + ' for %s' % self._device)
            self._sendUsernameEvent(Error)
        log.debug('username not configured for %s. skipping %s.',
                  self._device, command)

    def updateCommands(self):
        """
        Generator over the Cmd objects for config.commands.

        A command already on the schedule (same device and command text) is
        reused, removed from scheduledCmds and refreshed from the config;
        otherwise the config's own Cmd is used.  When the device has no
        username, SSH commands are skipped (and dropped from scheduledCmds)
        after a warning; otherwise each SSH command sends a clear event.
        """
        scheduled = self._scheduledCmds
        for cfgCmd in self._config.commands:
            key = (self._device, cfgCmd.command)
            if cfgCmd.useSsh and not self._config.username:
                self._warnUsernameNotSet(cfgCmd.command)
                if self._device not in self._suppressed:
                    self._suppressed.append(self._device)
                scheduled.pop(key, None)
                continue
            if cfgCmd.useSsh:
                self._sendUsernameEvent(Clear)
            target = scheduled.pop(key, cfgCmd)
            yield target.updateConfig(cfgCmd, self._config)
        self._suppressed = []
474 -class zencommand(RRDDaemon):
475 """ 476 Daemon code to schedule commands and run them. 477 """ 478 479 initialServices = RRDDaemon.initialServices + ['CommandConfig'] 480
481 - def __init__(self):
482 RRDDaemon.__init__(self, 'zencommand') 483 self.schedule = [] 484 self.timeout = None 485 self.pool = SshPool() 486 self.pool.eventSender = self 487 self.executed = 0
488
489 - def remote_deleteDevice(self, doomed):
490 self.log.debug("zenhub has asked us to delete device %s" % doomed) 491 self.schedule = [c for c in self.schedule if c.deviceConfig.device != doomed]
492
493 - def remote_updateConfig(self, config):
494 self.log.debug("Configuration update from zenhub") 495 self.log.info("config: %r", config) 496 self.updateConfig([config], [config.device])
497
498 - def remote_updateDeviceList(self, devices):
499 self.log.debug("zenhub sent updated device list %s" % devices) 500 updated = [] 501 lastChanges = dict(devices) # map device name to last change 502 keep = [] 503 for cmd in self.schedule: 504 if cmd.deviceConfig.device in lastChanges: 505 if cmd.lastChange > lastChanges[cmd.device]: 506 updated.append(cmd.deviceConfig.device) 507 keep.append(cmd) 508 else: 509 self.log.info("Removing all commands for %s", cmd.deviceConfig.device) 510 self.schedule = keep 511 if updated: 512 self.log.info("Fetching the config for %s", updated) 513 d = self.model().callRemote('getDataSourceCommands', devices) 514 d.addCallback(self.updateConfig, updated) 515 d.addErrback(self.error)
516
517 - def updateConfig(self, configs, expected):
518 expected = Set(expected) 519 current = {} 520 for c in self.schedule: 521 if c.deviceConfig.device in expected: 522 current[c.deviceConfig.device,c.command] = c 523 # keep all the commands we didn't ask for 524 update = [c for c in self.schedule if c.deviceConfig.device not in expected] 525 for cfg in configs: 526 self.thresholds.updateForDevice(cfg.device, cfg.thresholds) 527 if self.options.device and self.options.device != cfg.device: 528 continue 529 processor = ConfigurationProcessor(cfg, current, self.sendEvent) 530 update.extend(processor.updateCommands()) 531 for device, command in current.keys(): 532 self.log.info("Deleting command %s from %s", device, command) 533 self.schedule = update 534 self.processSchedule()
535
536 - def heartbeatCycle(self, *ignored):
537 "There is no master 'cycle' to send the hearbeat" 538 self.heartbeat() 539 reactor.callLater(self.heartbeatTimeout/3, self.heartbeatCycle) 540 events = [] 541 events += self.rrdStats.gauge('schedule', 542 self.heartbeatTimeout, 543 len(self.schedule)) 544 events += self.rrdStats.counter('commands', 545 self.heartbeatTimeout, 546 self.executed) 547 events += self.rrdStats.counter('dataPoints', 548 self.heartbeatTimeout, 549 self.rrd.dataPoints) 550 events += self.rrdStats.gauge('cyclePoints', 551 self.heartbeatTimeout, 552 self.rrd.endCycle()) 553 self.sendEvents(events)
554 555
556 - def processSchedule(self, *unused):
557 """ 558 Run through the schedule and start anything that needs to be done. 559 Set a timer if we have nothing to do. 560 """ 561 if not self.options.cycle: 562 for cmd in self.schedule: 563 if cmd.running() or cmd.lastStart == 0: 564 break 565 else: 566 self.stop() 567 return 568 try: 569 if self.timeout and not self.timeout.called: 570 self.timeout.cancel() 571 self.timeout = None 572 def compare(x, y): 573 return cmp(x.nextRun(), y.nextRun())
574 self.schedule.sort(compare) 575 self.pool.trimConnections(self.schedule) 576 earliest = None 577 running = 0 578 now = time.time() 579 for c in self.schedule: 580 if c.running(): 581 running += 1 582 583 for c in self.schedule: 584 if running >= self.options.parallel: 585 break 586 if c.nextRun() <= now: 587 c.start(self.pool).addBoth(self.finished) 588 running += 1 589 else: 590 earliest = c.nextRun() - now 591 break 592 593 if earliest is not None: 594 self.pool.reinitializeCollectorClients() 595 self.log.debug("Next command in %d seconds", int(earliest)) 596 self.timeout = reactor.callLater(max(1, earliest), 597 self.processSchedule) 598 except Exception, ex: 599 self.log.exception(ex)
600 601
602 - def sendCmdEvent(self, cmd, severity, summary):
603 """ 604 Send an event using the info in the Cmd object. 605 """ 606 self.sendEvent(dict(device=cmd.deviceConfig.device, 607 component=cmd.component, 608 eventClass=cmd.eventClass, 609 eventKey=cmd.eventKey, 610 severity=severity, 611 summary=summary))
612
613 - def finished(self, cmdOrErr):
614 """ 615 The command has finished. cmdOrErr is either a Cmd instance or a 616 twisted failure. 617 """ 618 self.executed += 1 619 if isinstance(cmdOrErr, failure.Failure): 620 self.error(cmdOrErr) 621 else: 622 cmd = cmdOrErr 623 self._handleExitCode(cmd) 624 self.parseResults(cmd) 625 self.processSchedule()
626
627 - def _handleExitCode(self, cmd):
628 """ 629 zencommand handles sending clears for exit code 0, all other exit codes 630 should be handled by the parser associated with the command 631 """ 632 exitCode = cmd.result.exitCode 633 msg = 'Cmd: %s - Code: %s - Msg: %s' % ( 634 cmd.command, exitCode, getExitMessage(exitCode)) 635 if exitCode == 0: 636 self.sendCmdEvent(cmd, Clear, msg)
637
638 - def error(self, err):
639 """ 640 The finished method indicated that there was a failure. This method 641 is also called by RRDDaemon.errorStop. 642 """ 643 if isinstance(err.value, TimeoutError): 644 cmd, = err.value.args 645 msg = "Command timed out on device %s: %r" % ( 646 cmd.deviceConfig.device, cmd.command) 647 self.log.warning(msg) 648 self.sendCmdEvent(cmd, cmd.severity, msg) 649 else: 650 self.log.exception(err.value)
651
652 - def parseResults(self, cmd):
653 """ 654 Process the results of our command-line, send events 655 and check datapoints. 656 657 @param cmd: command 658 @type: cmd object 659 """ 660 self.log.debug('The result of "%s" was "%r"', cmd.command, cmd.result.output) 661 results = ParsedResults() 662 try: 663 parser = cmd.parser.create() 664 except Exception, ex: 665 self.log.exception("Error loading parser %s" % cmd.parser) 666 import traceback 667 self.sendEvent(dict(device=cmd.deviceConfig.device, 668 summary="Error loading parser %s" % cmd.parser, 669 component="zencommand", 670 message=traceback.format_exc(), 671 agent="zencommand", 672 )) 673 return 674 parser.processResults(cmd, results) 675 676 for ev in results.events: 677 self.sendEvent(ev, device=cmd.deviceConfig.device) 678 679 for dp, value in results.values: 680 self.log.debug("Storing %s = %s into %s" % (dp.id, value, dp.rrdPath)) 681 value = self.rrd.save(dp.rrdPath, 682 value, 683 dp.rrdType, 684 dp.rrdCreateCommand, 685 cmd.cycleTime, 686 dp.rrdMin, 687 dp.rrdMax) 688 self.log.debug("RRD save result: %s" % value) 689 for ev in self.thresholds.check(dp.rrdPath, time.time(), value): 690 eventKey = cmd.getEventKey(dp) 691 if 'eventKey' in ev: 692 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey']) 693 else: 694 ev['eventKey'] = eventKey 695 ev['component'] = dp.component 696 self.sendEvent(ev)
697
698 - def fetchConfig(self):
699 def doFetchConfig(driver): 700 try: 701 now = time.time() 702 703 yield self.model().callRemote('propertyItems') 704 self.setPropertyItems(driver.next()) 705 706 yield self.model().callRemote('getDefaultRRDCreateCommand') 707 createCommand = driver.next() 708 709 yield self.model().callRemote('getThresholdClasses') 710 self.remote_updateThresholdClasses(driver.next()) 711 712 713 yield self.model().callRemote('getCollectorThresholds') 714 self.rrdStats.config(self.options.monitor, 715 self.name, 716 driver.next(), 717 createCommand) 718 719 devices = [] 720 if self.options.device: 721 devices = [self.options.device] 722 yield self.model().callRemote('getDataSourceCommands', 723 devices) 724 if not devices: 725 devices = list(Set([c.deviceConfig.device 726 for c in self.schedule])) 727 self.updateConfig(driver.next(), devices) 728 729 self.rrd = RRDUtil(createCommand, 60) 730 731 self.sendEvents( 732 self.rrdStats.gauge('configTime', 733 self.configCycleInterval * 60, 734 time.time() - now)) 735 736 except Exception, ex: 737 self.log.exception(ex) 738 raise
739 740 return drive(doFetchConfig) 741 742
743 - def start(self, driver):
744 """ 745 Fetch the configuration and return a deferred for its completion. 746 Also starts the config cycle 747 """ 748 ex = None 749 try: 750 self.log.debug('Fetching configuration from zenhub') 751 yield self.fetchConfig() 752 driver.next() 753 self.log.debug('Finished config fetch') 754 except Exception, ex: 755 self.log.exception(ex) 756 driveLater(self.configCycleInterval * 60, self.start) 757 if ex: 758 raise ex
759
760 - def buildOptions(self):
761 RRDDaemon.buildOptions(self) 762 763 self.parser.add_option('--parallel', dest='parallel', 764 default=10, type='int', 765 help="Number of devices to collect at one time")
766
767 - def connected(self):
768 d = drive(self.start).addCallbacks(self.processSchedule, self.errorStop) 769 if self.options.cycle: 770 d.addCallback(self.heartbeatCycle)
771 772 773 if __name__ == '__main__': 774 from Products.ZenRRD.zencommand import zencommand 775 z = zencommand() 776 z.run() 777