Package Products :: Package ZenRRD :: Module zencommand
[hide private]
[frames] | [no frames]

Source Code for Module Products.ZenRRD.zencommand

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, 2009, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """ZenCommand 
 15   
 16  Run Command plugins periodically. 
 17   
 18  """ 
 19   
 20  import time 
 21  from pprint import pformat 
 22  import logging 
 23  log = logging.getLogger("zen.zencommand") 
 24  from sets import Set 
 25   
 26  from twisted.internet import reactor, defer, error 
 27  from twisted.internet.protocol import ProcessProtocol 
 28  from twisted.python import failure 
 29  from twisted.spread import pb 
 30   
 31  import Globals 
 32  from Products.ZenUtils.Driver import drive, driveLater 
 33  from Products.ZenUtils.Utils import unused, getExitMessage 
 34  from Products.ZenRRD.RRDDaemon import RRDDaemon 
 35  from Products.ZenRRD.RRDUtil import RRDUtil 
 36  from Products.DataCollector.SshClient import SshClient 
 37  from Products.ZenEvents.ZenEventClasses import Clear, Error 
 38  from Products.ZenRRD.CommandParser import ParsedResults 
 39   
 40  from Products.DataCollector import Plugins 
 41  unused(Plugins) 
 42  MAX_CONNECTIONS = 256 
 43   
 44   
 45   
class TimeoutError(Exception):
    """
    Error for a deferred call taking too long to complete.

    Whatever positional arguments the error is built with are exposed
    unchanged as ``args``; Timeout() passes the object it was guarding
    (the Cmd) as the single argument.
    """

    def __init__(self, *args):
        # Initialise the base with no arguments, then publish ours verbatim.
        Exception.__init__(self)
        self.args = tuple(args)
def Timeout(deferred, seconds, obj):
    """
    Cause an error on a deferred when it is taking too long to complete.

    Schedules a reactor timer for ``seconds``. If the timer fires first, the
    deferred is errbacked with a Failure wrapping TimeoutError(obj). Once the
    deferred fires either way, the timer is cancelled if it has not run yet.
    The same deferred is returned so callers can chain on it.
    """
    def expire(pending, payload):
        "took too long... call an errback"
        pending.errback(failure.Failure(TimeoutError(payload)))

    def disarm(result, delayed):
        "the command finished, possibly by timing out"
        if not delayed.called:
            delayed.cancel()
        return result

    alarm = reactor.callLater(seconds, expire, deferred, obj)
    deferred.addBoth(disarm, alarm)
    return deferred
class ProcessRunner(ProcessProtocol):
    """
    Provide deferred process execution.

    start() spawns the command locally through /bin/sh and returns a
    deferred. The deferred fires with this runner (``exitCode`` and
    ``output`` filled in) when the process ends. If the device's
    commandTimeout passes first, it errbacks with TimeoutError and the
    process is sent SIGKILL.
    """
    stopped = None      # deferred handed out by start(); cleared once fired
    exitCode = None
    output = ''

    def start(self, cmd):
        "Kick off the process: run it local"
        log.debug('running %r' % cmd.command)

        shell = '/bin/sh'
        # 'exec' makes the command replace the shell process
        argv = (shell, '-c', 'exec %s' % cmd.command)
        self.cmdline = argv
        self.command = ' '.join(argv)
        log.debug('cmd line: %r' % self.command)
        reactor.spawnProcess(self, shell, argv, env=None)

        done = Timeout(defer.Deferred(), cmd.deviceConfig.commandTimeout, cmd)
        self.stopped = done
        done.addErrback(self.timeout)
        return done

    def timeout(self, value):
        "Kill a process if it takes too long; pass the failure through"
        try:
            self.transport.signalProcess('KILL')
        except error.ProcessExitedAlready:
            log.debug("Command already exited: %s" % self.command)
        return value

    def outReceived(self, data):
        "Store up the output as it arrives from the process"
        self.output = self.output + data

    def processEnded(self, reason):
        "notify the starter that their process is complete"
        self.exitCode = reason.value.exitCode
        log.debug('Received exit code: %s' % self.exitCode)
        log.debug('Command: %r' % self.command)
        log.debug('Output: %r' % self.output)

        pending = self.stopped
        if not pending:
            return
        self.stopped = None
        # the deferred may already have been errbacked by the timeout
        if not pending.called:
            pending.callback(self)
class MySshClient(SshClient):
    """
    Connection to SSH server at the remote device.

    Wraps SshClient so that each command issued through addCommand() gets a
    deferred. That deferred fires with ``(data, code)`` when SshClient reports
    the command's result through addResult().
    """

    def __init__(self, *args, **kw):
        SshClient.__init__(self, *args, **kw)
        # command text -> deferred still waiting for that command's result
        self.defers = {}

    def addCommand(self, command):
        "Run a command against the server; return a deferred for its result"
        d = defer.Deferred()
        # NOTE: keyed by command text, so issuing the same command again
        # before the first run reports back replaces the earlier deferred
        self.defers[command] = d
        SshClient.addCommand(self, command)
        return d

    def addResult(self, command, data, code):
        "Forward the results of the command execution to the starter"
        SshClient.addResult(self, command, data, code)
        # When the same command text was issued twice, the first result has
        # already consumed the deferred. Don't raise KeyError for the second.
        d = self.defers.pop(command, None)
        if d is not None and not d.called:
            d.callback((data, code))

    def check(self, ip, timeout=2):
        "Turn off blocking SshClient.test method"
        return True

    def clientFinished(self):
        "We don't need to track commands/results when they complete"
        SshClient.clientFinished(self)
        self.commands = []
        self.results = []
class SshPool:
    """
    Cache all the Ssh connections so they can be managed.

    One MySshClient is kept per device name in ``self.pool``.
    """

    def __init__(self):
        # device name -> MySshClient
        self.pool = {}

    def get(self, cmd):
        "Make a new SSH connection if there isn't one available"
        dc = cmd.deviceConfig
        result = self.pool.get(dc.device, None)
        if result is None:
            log.debug("Creating connection to %s", dc.device)
            options = Options(dc.username, dc.password,
                              dc.loginTimeout, dc.commandTimeout,
                              dc.keyPath, dc.concurrentSessions)
            # New param KeyPath
            result = MySshClient(dc.device, dc.ipAddress, dc.port,
                                 options=options)
            result.run()
            self.pool[dc.device] = result
        return result

    def _close(self, device):
        "close the SSH connection to a device, if it exists"
        c = self.pool.get(device, None)
        if c:
            log.debug("Closing connection to %s", device)
            if c.transport:
                c.transport.loseConnection()
            del self.pool[device]

    def close(self, cmd):
        "symmetric close that matches get() method"
        self._close(cmd.deviceConfig.device)

    def trimConnections(self, schedule, limit=None):
        """
        Reduce the number of connections, using the schedule for guidance.

        While more than ``limit`` connections are open, connections are
        closed in this order:
          1. pooled devices that have nothing on the schedule;
          2. scheduled devices, starting with the one whose first command
             appears last in ``schedule``.

        @param schedule: Cmd objects in order of next use (processSchedule
            sorts them by nextRun() before calling this)
        @param limit: maximum number of open connections to keep;
            MAX_CONNECTIONS when None
        """
        if limit is None:
            limit = MAX_CONNECTIONS
        # compute device list in order of next use
        devices = []
        seen = set()
        for c in schedule:
            device = c.deviceConfig.device
            if device not in seen:
                seen.add(device)
                devices.append(device)
        # Connections to devices that are no longer scheduled would otherwise
        # never be closed. Put them at the end so pop() closes them first.
        devices.extend([d for d in list(self.pool.keys()) if d not in seen])
        # close as many devices as needed
        while devices and len(self.pool) > limit:
            self._close(devices.pop())
217 -class SshRunner:
218 """ 219 Run a single command across a cached SSH connection 220 """ 221 exitCode = None 222 output = None 223
224 - def __init__(self, pool):
225 self.pool = pool
226 227
228 - def start(self, cmd):
229 "Initiate a command on the remote device" 230 self.defer = defer.Deferred() 231 c = self.pool.get(cmd) 232 try: 233 d = Timeout(c.addCommand(cmd.command), 234 cmd.deviceConfig.commandTimeout, 235 cmd) 236 except Exception, ex: 237 log.warning('Error starting command: %s', ex) 238 self.pool.close(cmd) 239 return defer.fail(ex) 240 d.addErrback(self.timeout) 241 d.addBoth(self.processEnded) 242 return d
243 244
245 - def timeout(self, arg):
246 "Deal with slow executing command/connection (close it)" 247 cmd, = arg.value.args 248 # we could send a kill signal, but then we would need to track 249 # the command channel to send it to: just close the connection 250 self.pool.close(cmd) 251 return arg
252 253
254 - def processEnded(self, value):
255 "Deliver ourselves to the starter with the proper attributes" 256 if isinstance(value, failure.Failure): 257 return value 258 self.output, self.exitCode = value 259 return self
260 261
class DeviceConfig(pb.Copyable, pb.RemoteCopy):
    """
    Per-device settings received over perspective broker.

    The class attributes are defaults; a copied instance carries its own
    values.
    """
    # which device and how to reach it
    device = ''
    ipAddress = ''
    port = 0
    # login credentials
    username = ''
    password = ''
    keyPath = ''
    # timeouts (commandTimeout is handed to Timeout()/reactor.callLater)
    loginTimeout = 0.0
    commandTimeout = 0.0
    # last-change marker for this configuration
    lastChange = 0.0
pb.setUnjellyableForClass(DeviceConfig, DeviceConfig)
class DataPointConfig(pb.Copyable, pb.RemoteCopy):
    """
    One datapoint produced by a command, received over perspective broker.

    The rrd* attributes are passed to RRDUtil.save() when a value is stored.
    ``component`` is copied onto threshold events for the datapoint.
    """
    # RRD storage parameters
    rrdPath = ''
    rrdType = None
    rrdCreateCommand = ''
    rrdMin = None
    rrdMax = None
    # identity
    id = ''
    component = ''

    def __init__(self):
        self.data = {}

    def __repr__(self):
        "Pretty-printed (data, id) pair"
        summary = (self.data, self.id)
        return pformat(summary)

pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)
class Cmd(pb.Copyable, pb.RemoteCopy):
    """
    Holds the config of every command to be run, plus its run-time state
    (lastStart/lastStop timestamps and the result of the latest run).
    """
    command = None
    useSsh = False
    cycleTime = None
    eventClass = None
    eventKey = None
    severity = 3
    lastStart = 0
    lastStop = 0
    result = None
    # Normally supplied by the copy received from zenhub. These defaults keep
    # zencommand.sendCmdEvent()/parseResults() from raising AttributeError
    # when a copy lacks them.
    component = ''
    parser = None

    def __init__(self):
        self.points = []

    def running(self):
        "True while a started run has not reported back yet"
        return self.lastStop < self.lastStart

    def name(self):
        "Short name for logs: the program's basename followed by its arguments"
        cmd, args = (self.command + ' ').split(' ', 1)
        cmd = cmd.split('/')[-1]
        return '%s %s' % (cmd, args)

    def nextRun(self):
        "Time at which this command is due again"
        if self.running():
            return self.lastStart + self.cycleTime
        return self.lastStop + self.cycleTime

    def start(self, pool):
        """
        Run the command over SSH (using ``pool``) or locally. Returns a
        deferred that fires with this Cmd, or with a Failure.
        """
        if self.useSsh:
            pr = SshRunner(pool)
        else:
            pr = ProcessRunner()
        d = pr.start(self)
        self.lastStart = time.time()
        log.debug('Process %s started' % self.name())
        d.addBoth(self.processEnded)
        return d

    def processEnded(self, pr):
        "Record the runner (or Failure) and the stop time"
        self.result = pr
        self.lastStop = time.time()

        # Check for a condition that could cause zencommand to stop cycling.
        # http://dev.zenoss.org/trac/ticket/4936
        if self.lastStop < self.lastStart:
            log.debug('System clock went back?')
            self.lastStop = self.lastStart

        if not isinstance(pr, failure.Failure):
            log.debug('Process %s stopped (%s), %.2f seconds elapsed' % (
                self.name(),
                pr.exitCode,
                self.lastStop - self.lastStart))
            return self
        return pr

    def updateConfig(self, cfg, deviceConfig):
        """
        Copy the settings of ``cfg`` (a freshly received Cmd) onto this Cmd,
        keeping this object's run-time state. Returns self.
        """
        self.deviceConfig = deviceConfig
        self.useSsh = cfg.useSsh
        self.cycleTime = max(cfg.cycleTime, 1)
        self.eventKey = cfg.eventKey
        self.eventClass = cfg.eventClass
        self.severity = cfg.severity
        self.command = str(cfg.command)
        self.points = cfg.points
        # Also pick up a changed parser/component. Without this, a scheduled
        # command kept its old parser forever. Keep the current values if the
        # new config does not carry them.
        self.component = getattr(cfg, 'component', self.component)
        self.parser = getattr(cfg, 'parser', self.parser)
        return self

    def getEventKey(self, point):
        # fetch datapoint name from filename path and add it to the event key
        return self.eventKey + '|' + point.rrdPath.split('/')[-1]

    def commandKey(self):
        "Provide a value that establishes the uniqueness of this command"
        return '%'.join(map(str, [self.useSsh, self.cycleTime,
                                  self.severity, self.command]))

pb.setUnjellyableForClass(Cmd, Cmd)
class Options:
    """
    Settings object passed to MySshClient/SshClient as ``options``.
    """
    loginTries = 1
    searchPath = ''
    existenceTest = None

    def __init__(self, username, password, loginTimeout, commandTimeout,
                 keyPath, concurrentSessions):
        # credentials
        self.username, self.password = username, password
        # timeouts
        self.loginTimeout, self.commandTimeout = loginTimeout, commandTimeout
        # key file and session settings
        self.keyPath, self.concurrentSessions = keyPath, concurrentSessions
class ConfigurationProcessor(object):
    """
    Turn one device configuration from zenhub into the Cmd objects to
    schedule. Already-scheduled Cmd objects are reused where possible.
    """

    def __init__(self, config, scheduledCmds, sendEvent):
        """
        config - one of the configurations that was returned by calling
            getDataSourceCommands on zenhub.
        scheduledCmds - a dictionary that maps (device, command) to Cmd object
            for the commands currently on zencommand's schedule.
        sendEvent - a function that sends events to zenhub using twisted
            perspective broker.
        """
        self._config = config
        self._device = config.device
        self._scheduledCmds = scheduledCmds
        self._sendEvent = sendEvent
        self._summary = 'zCommandUsername is not set'
        self._suppressed = []   # log warning and send event once per device

    def _sendUsernameEvent(self, severity):
        "send an event (or clear it) for username not set"
        event = dict(device=self._device,
                     eventClass='/Cmd/Fail',
                     eventKey='zCommandUsername',
                     severity=severity,
                     component='zencommand',
                     summary=self._summary)
        self._sendEvent(event)

    def _warnUsernameNotSet(self, command):
        """
        Warn that the username is not set for the device, so the SSH command
        cannot be executed. The warning and event go out only while the
        device is not yet suppressed; the debug line is always logged.
        """
        if self._device not in self._suppressed:
            log.warning(self._summary + ' for %s' % self._device)
            self._sendUsernameEvent(Error)
        template = 'username not configured for %s. skipping %s.'
        log.debug(template % (self._device, command))

    def updateCommands(self):
        """
        Lazily yield the Cmd objects to schedule for this configuration.

        For each command in config.commands, a scheduled Cmd with the same
        (device, command) key is taken out of scheduledCmds and reused.
        Otherwise the configured command itself is used. Either way it is
        updated from the configuration. SSH commands are skipped (and
        unscheduled) while the device has no username.
        """
        config = self._config
        device = self._device
        for cfgCmd in config.commands:
            key = (device, cfgCmd.command)
            if cfgCmd.useSsh and not config.username:
                self._warnUsernameNotSet(cfgCmd.command)
                if device not in self._suppressed:
                    self._suppressed.append(device)
                self._scheduledCmds.pop(key, None)
                continue
            if cfgCmd.useSsh:
                # username is set: clear any earlier "not set" event
                self._sendUsernameEvent(Clear)
            target = self._scheduledCmds.pop(key, cfgCmd)
            yield target.updateConfig(cfgCmd, config)
        self._suppressed = []
460 -class zencommand(RRDDaemon):
461 """ 462 Daemon code to schedule commands and run them. 463 """ 464 465 initialServices = RRDDaemon.initialServices + ['CommandConfig'] 466
467 - def __init__(self):
468 RRDDaemon.__init__(self, 'zencommand') 469 self.schedule = [] 470 self.timeout = None 471 self.pool = SshPool() 472 self.executed = 0
473
474 - def remote_deleteDevice(self, doomed):
475 self.log.debug("zenhub has asked us to delete device %s" % doomed) 476 self.schedule = [c for c in self.schedule if c.deviceConfig.device != doomed]
477
478 - def remote_updateConfig(self, config):
479 self.log.debug("Configuration update from zenhub") 480 self.updateConfig([config], [config.device])
481
482 - def remote_updateDeviceList(self, devices):
483 self.log.debug("zenhub sent updated device list %s" % devices) 484 updated = [] 485 lastChanges = dict(devices) # map device name to last change 486 keep = [] 487 for cmd in self.schedule: 488 if cmd.deviceConfig.device in lastChanges: 489 if cmd.lastChange > lastChanges[cmd.device]: 490 updated.append(cmd.deviceConfig.device) 491 keep.append(cmd) 492 else: 493 self.log.info("Removing all commands for %s", cmd.deviceConfig.device) 494 self.schedule = keep 495 if updated: 496 self.log.info("Fetching the config for %s", updated) 497 d = self.model().callRemote('getDataSourceCommands', devices) 498 d.addCallback(self.updateConfig, updated) 499 d.addErrback(self.error)
500
501 - def updateConfig(self, configs, expected):
502 expected = Set(expected) 503 current = {} 504 for c in self.schedule: 505 if c.deviceConfig.device in expected: 506 current[c.deviceConfig.device,c.command] = c 507 # keep all the commands we didn't ask for 508 update = [c for c in self.schedule if c.deviceConfig.device not in expected] 509 for cfg in configs: 510 self.thresholds.updateForDevice(cfg.device, cfg.thresholds) 511 if self.options.device and self.options.device != cfg.device: 512 continue 513 processor = ConfigurationProcessor(cfg, current, self.sendEvent) 514 update.extend(processor.updateCommands()) 515 for device, command in current.keys(): 516 self.log.info("Deleting command %s from %s", device, command) 517 self.schedule = update 518 self.processSchedule()
519
520 - def heartbeatCycle(self, *ignored):
521 "There is no master 'cycle' to send the hearbeat" 522 self.heartbeat() 523 reactor.callLater(self.heartbeatTimeout/3, self.heartbeatCycle) 524 events = [] 525 events += self.rrdStats.gauge('schedule', 526 self.heartbeatTimeout, 527 len(self.schedule)) 528 events += self.rrdStats.counter('commands', 529 self.heartbeatTimeout, 530 self.executed) 531 events += self.rrdStats.counter('dataPoints', 532 self.heartbeatTimeout, 533 self.rrd.dataPoints) 534 events += self.rrdStats.gauge('cyclePoints', 535 self.heartbeatTimeout, 536 self.rrd.endCycle()) 537 self.sendEvents(events)
538 539
540 - def processSchedule(self, *unused):
541 """ 542 Run through the schedule and start anything that needs to be done. 543 Set a timer if we have nothing to do. 544 """ 545 if not self.options.cycle: 546 for cmd in self.schedule: 547 if cmd.running() or cmd.lastStart == 0: 548 break 549 else: 550 self.stop() 551 return 552 try: 553 if self.timeout and not self.timeout.called: 554 self.timeout.cancel() 555 self.timeout = None 556 def compare(x, y): 557 return cmp(x.nextRun(), y.nextRun())
558 self.schedule.sort(compare) 559 self.pool.trimConnections(self.schedule) 560 earliest = None 561 running = 0 562 now = time.time() 563 for c in self.schedule: 564 if c.running(): 565 running += 1 566 567 for c in self.schedule: 568 if running >= self.options.parallel: 569 break 570 if c.nextRun() <= now: 571 c.start(self.pool).addBoth(self.finished) 572 running += 1 573 else: 574 earliest = c.nextRun() - now 575 break 576 577 if earliest is not None: 578 self.log.debug("Next command in %d seconds", int(earliest)) 579 self.timeout = reactor.callLater(max(1, earliest), 580 self.processSchedule) 581 except Exception, ex: 582 self.log.exception(ex)
583 584
585 - def sendCmdEvent(self, cmd, severity, summary):
586 """ 587 Send an event using the info in the Cmd object. 588 """ 589 self.sendEvent(dict(device=cmd.deviceConfig.device, 590 component=cmd.component, 591 eventClass=cmd.eventClass, 592 eventKey=cmd.eventKey, 593 severity=severity, 594 summary=summary))
595
596 - def finished(self, cmdOrErr):
597 """ 598 The command has finished. cmdOrErr is either a Cmd instance or a 599 twisted failure. 600 """ 601 self.executed += 1 602 if isinstance(cmdOrErr, failure.Failure): 603 self.error(cmdOrErr) 604 else: 605 cmd = cmdOrErr 606 self._handleExitCode(cmd) 607 self.parseResults(cmd) 608 self.processSchedule()
609
610 - def _handleExitCode(self, cmd):
611 """ 612 zencommand handles sending clears for exit code 0, all other exit codes 613 should be handled by the parser associated with the command 614 """ 615 exitCode = cmd.result.exitCode 616 msg = 'Cmd: %s - Code: %s - Msg: %s' % ( 617 cmd.command, exitCode, getExitMessage(exitCode)) 618 if exitCode == 0: 619 self.sendCmdEvent(cmd, Clear, msg)
620
621 - def error(self, err):
622 """ 623 The finished method indicated that there was a failure. This method 624 is also called by RRDDaemon.errorStop. 625 """ 626 if isinstance(err.value, TimeoutError): 627 cmd, = err.value.args 628 msg = "Command timed out on device %s: %r" % ( 629 cmd.deviceConfig.device, cmd.command) 630 self.log.warning(msg) 631 self.sendCmdEvent(cmd, cmd.severity, msg) 632 else: 633 self.log.exception(err.value)
634
635 - def parseResults(self, cmd):
636 """ 637 Process the results of our command-line, send events 638 and check datapoints. 639 640 @param cmd: command 641 @type: cmd object 642 """ 643 self.log.debug('The result of "%s" was "%r"', cmd.command, cmd.result.output) 644 results = ParsedResults() 645 try: 646 parser = cmd.parser.create() 647 except Exception, ex: 648 self.log.exception("Error loading parser %s" % cmd.parser) 649 import traceback 650 self.sendEvent(dict(device=cmd.deviceConfig.device, 651 summary="Error loading parser %s" % cmd.parser, 652 component="zencommand", 653 message=traceback.format_exc(), 654 agent="zencommand", 655 )) 656 return 657 parser.processResults(cmd, results) 658 659 for ev in results.events: 660 self.sendEvent(ev, device=cmd.deviceConfig.device) 661 662 for dp, value in results.values: 663 self.log.debug("Storing %s = %s into %s" % (dp.id, value, dp.rrdPath)) 664 value = self.rrd.save(dp.rrdPath, 665 value, 666 dp.rrdType, 667 dp.rrdCreateCommand, 668 cmd.cycleTime, 669 dp.rrdMin, 670 dp.rrdMax) 671 self.log.debug("RRD save result: %s" % value) 672 for ev in self.thresholds.check(dp.rrdPath, time.time(), value): 673 eventKey = cmd.getEventKey(dp) 674 if 'eventKey' in ev: 675 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey']) 676 else: 677 ev['eventKey'] = eventKey 678 ev['component'] = dp.component 679 self.sendEvent(ev)
680
681 - def fetchConfig(self):
682 def doFetchConfig(driver): 683 try: 684 now = time.time() 685 686 yield self.model().callRemote('propertyItems') 687 self.setPropertyItems(driver.next()) 688 689 yield self.model().callRemote('getDefaultRRDCreateCommand') 690 createCommand = driver.next() 691 692 yield self.model().callRemote('getThresholdClasses') 693 self.remote_updateThresholdClasses(driver.next()) 694 695 696 yield self.model().callRemote('getCollectorThresholds') 697 self.rrdStats.config(self.options.monitor, 698 self.name, 699 driver.next(), 700 createCommand) 701 702 devices = [] 703 if self.options.device: 704 devices = [self.options.device] 705 yield self.model().callRemote('getDataSourceCommands', 706 devices) 707 if not devices: 708 devices = list(Set([c.deviceConfig.device 709 for c in self.schedule])) 710 self.updateConfig(driver.next(), devices) 711 712 self.rrd = RRDUtil(createCommand, 60) 713 714 self.sendEvents( 715 self.rrdStats.gauge('configTime', 716 self.configCycleInterval * 60, 717 time.time() - now)) 718 719 except Exception, ex: 720 self.log.exception(ex) 721 raise
722 723 return drive(doFetchConfig) 724 725
726 - def start(self, driver):
727 """ 728 Fetch the configuration and return a deferred for its completion. 729 Also starts the config cycle 730 """ 731 ex = None 732 try: 733 self.log.debug('Fetching configuration from zenhub') 734 yield self.fetchConfig() 735 driver.next() 736 self.log.debug('Finished config fetch') 737 except Exception, ex: 738 self.log.exception(ex) 739 driveLater(self.configCycleInterval * 60, self.start) 740 if ex: 741 raise ex
742
743 - def buildOptions(self):
744 RRDDaemon.buildOptions(self) 745 746 self.parser.add_option('--parallel', dest='parallel', 747 default=10, type='int', 748 help="Number of devices to collect at one time")
749
750 - def connected(self):
751 d = drive(self.start).addCallbacks(self.processSchedule, self.errorStop) 752 if self.options.cycle: 753 d.addCallback(self.heartbeatCycle)
if __name__ == '__main__':
    # Import the class through its package path rather than using the
    # __main__ copy (presumably so pb-registered classes resolve by their
    # package name; confirm before changing).
    from Products.ZenRRD.zencommand import zencommand
    daemon = zencommand()
    daemon.run()