Package ZenRRD :: Module zencommand
[hide private]
[frames] | no frames]

Source Code for Module ZenRRD.zencommand

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, 2009, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """ZenCommand 
 15   
 16  Run Command plugins periodically. 
 17   
 18  """ 
 19   
 20  import time 
 21  from pprint import pformat 
 22  import logging 
 23  log = logging.getLogger("zen.zencommand") 
 24  from sets import Set 
 25   
 26  from twisted.internet import reactor, defer, error 
 27  from twisted.internet.protocol import ProcessProtocol 
 28  from twisted.python import failure 
 29  from twisted.spread import pb 
 30   
 31  import Globals 
 32  from Products.ZenUtils.Driver import drive, driveLater 
 33  from Products.ZenUtils.Utils import unused 
 34  from Products.ZenRRD.RRDDaemon import RRDDaemon 
 35  from Products.ZenRRD.RRDUtil import RRDUtil 
 36  from Products.DataCollector.SshClient import SshClient 
 37  from Products.ZenEvents.ZenEventClasses import Clear 
 38  from Products.ZenRRD.CommandParser import ParsedResults 
 39   
 40  from Products.DataCollector import Plugins 
 41  unused(Plugins) 
 42  MAX_CONNECTIONS = 256 
 43   
 44   
 45   
class TimeoutError(Exception):
    """
    Error for a deferred call taking too long to complete.

    Whatever positional arguments are given are kept in C{args}; the
    handlers in this module unpack the single timed-out object from there.
    """

    def __init__(self, *args):
        # The base initializer stores the positional arguments as self.args.
        Exception.__init__(self, *args)
54 55
def Timeout(deferred, seconds, obj):
    """
    Arm a time limit on a deferred.

    If the deferred has not fired after C{seconds}, its errback is called
    with a Failure wrapping TimeoutError(obj).  The same deferred is
    returned so calls can be chained.
    """

    def _expire():
        "the time limit passed: fail the deferred"
        deferred.errback(failure.Failure(TimeoutError(obj)))

    watchdog = reactor.callLater(seconds, _expire)

    def _disarm(result):
        "the deferred fired, possibly by timing out: stop the watchdog"
        if not watchdog.called:
            watchdog.cancel()
        # pass the result (or failure) through untouched
        return result

    deferred.addBoth(_disarm)
    return deferred
class ProcessRunner(ProcessProtocol):
    """
    Run a command as a local child process and report its completion
    through a deferred
    """
    stopped = None      # deferred handed out by start(), cleared once fired
    exitCode = None     # exit code taken from the process-ended reason
    output = ''         # everything received on the child's stdout so far

    def start(self, cmd):
        """
        Spawn cmd.command through /bin/sh and return a deferred that fires
        with this runner when the process ends, or fails with a
        TimeoutError after cmd.deviceConfig.commandTimeout seconds.
        """
        log.debug('running %r' % cmd.command)

        shell = '/bin/sh'
        # 'exec' replaces the shell with the command itself
        self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
        self.command = ' '.join(self.cmdline)
        log.debug('cmd line: %r' % self.command)
        reactor.spawnProcess(self, shell, self.cmdline, env=None)

        limit = cmd.deviceConfig.commandTimeout
        finished = Timeout(defer.Deferred(), limit, cmd)
        self.stopped = finished
        # on timeout (or any other failure) try to kill the child
        finished.addErrback(self.timeout)
        return finished

    def timeout(self, value):
        "Errback: kill the process because it took too long"
        try:
            self.transport.signalProcess('KILL')
        except error.ProcessExitedAlready:
            log.debug("Command already exited: %s" % self.command)
        # hand the failure on to the next errback
        return value

    def outReceived(self, data):
        "Accumulate stdout data as it arrives from the process"
        self.output = self.output + data

    def processEnded(self, reason):
        "Record the exit code and notify whoever called start()"
        self.exitCode = reason.value.exitCode
        log.debug('Received exit code: %s' % self.exitCode)
        log.debug('Command: %r' % self.command)
        log.debug('Output: %r' % self.output)

        pending = self.stopped
        if not pending:
            return
        self.stopped = None
        # the deferred has already fired if the command timed out
        if not pending.called:
            pending.callback(self)
124 125
class MySshClient(SshClient):
    """
    Connection to SSH server at the remote device.

    Adds a per-command deferred on top of SshClient so that the caller of
    addCommand() is told when that command's result arrives.
    """

    def __init__(self, *args, **kw):
        SshClient.__init__(self, *args, **kw)
        # maps a command string to the deferred waiting for its result
        self.defers = {}

    def addCommand(self, command):
        """
        Run a command against the server.

        Returns a deferred that fires with a (data, code) tuple once
        addResult() is called for the same command.
        """
        d = defer.Deferred()
        self.defers[command] = d
        SshClient.addCommand(self, command)
        return d

    def addResult(self, command, data, code):
        "Forward the results of the command execution to the starter"
        SshClient.addResult(self, command, data, code)
        # A result can arrive for a command that has no registered deferred
        # (it was not queued through addCommand() above, or its entry was
        # already consumed); tolerate that instead of raising KeyError.
        d = self.defers.pop(command, None)
        if d is None:
            log.debug("No pending request for command result: %r", command)
            return
        # the deferred has already fired if the command timed out
        if not d.called:
            d.callback((data, code))

    def check(self, ip, timeout=2):
        "Turn off blocking SshClient.test method"
        return True

    def clientFinished(self):
        "We don't need to track commands/results when they complete"
        SshClient.clientFinished(self)
        self.commands = []
        self.results = []
161 162
class SshPool:
    """
    Keep one SSH connection per device so connections can be reused
    and managed in one place
    """

    def __init__(self):
        # maps a device name to its MySshClient
        self.pool = {}

    def get(self, cmd):
        "Return the connection for cmd's device, opening one if needed"
        dc = cmd.deviceConfig
        client = self.pool.get(dc.device, None)
        if client is not None:
            return client

        log.debug("Creating connection to %s", dc.device)
        options = Options(dc.username, dc.password,
                          dc.loginTimeout, dc.commandTimeout,
                          dc.keyPath, dc.concurrentSessions)
        client = MySshClient(dc.device, dc.ipAddress, dc.port,
                             options=options)
        client.run()
        self.pool[dc.device] = client
        return client

    def _close(self, device):
        "Drop the SSH connection to a device; a no-op if there is none"
        client = self.pool.get(device, None)
        if not client:
            return
        log.debug("Closing connection to %s", device)
        if client.transport:
            client.transport.loseConnection()
        del self.pool[device]

    def close(self, cmd):
        "symmetric close that matches get() method"
        self._close(cmd.deviceConfig.device)

    def trimConnections(self, schedule):
        """
        Close connections until at most MAX_CONNECTIONS remain, using the
        order of devices in the schedule to choose which ones to close.
        """
        # distinct device names, in the order they first appear
        ordered = []
        for scheduled in schedule:
            name = scheduled.deviceConfig.device
            if name not in ordered:
                ordered.append(name)
        # pop() takes from the end of the list, so the devices appearing
        # last in the schedule are closed first
        while ordered and len(self.pool) > MAX_CONNECTIONS:
            self._close(ordered.pop())
215 216
217 -class SshRunner:
218 """ 219 Run a single command across a cached SSH connection 220 """ 221 exitCode = None 222 output = None 223
224 - def __init__(self, pool):
225 self.pool = pool
226 227
228 - def start(self, cmd):
229 "Initiate a command on the remote device" 230 self.defer = defer.Deferred() 231 c = self.pool.get(cmd) 232 try: 233 d = Timeout(c.addCommand(cmd.command), 234 cmd.deviceConfig.commandTimeout, 235 cmd) 236 except Exception, ex: 237 log.warning('Error starting command: %s', ex) 238 self.pool.close(cmd) 239 return defer.fail(ex) 240 d.addErrback(self.timeout) 241 d.addBoth(self.processEnded) 242 return d
243 244
245 - def timeout(self, arg):
246 "Deal with slow executing command/connection (close it)" 247 cmd, = arg.value.args 248 # we could send a kill signal, but then we would need to track 249 # the command channel to send it to: just close the connection 250 self.pool.close(cmd) 251 return arg
252 253
254 - def processEnded(self, value):
255 "Deliver ourselves to the starter with the proper attributes" 256 if isinstance(value, failure.Failure): 257 return value 258 self.output, self.exitCode = value 259 return self
260 261
class DeviceConfig(pb.Copyable, pb.RemoteCopy):
    """
    Per-device settings shared by every command run against the device.

    The class attributes below are the defaults used when an instance
    does not carry its own value.
    """
    # identity and last-change stamp
    device = ''
    lastChange = 0.

    # where to connect
    ipAddress = ''
    port = 0

    # credentials
    username = ''
    password = ''
    keyPath = ''

    # time limits handed to the SSH client and the command Timeout
    loginTimeout = 0.
    commandTimeout = 0.

# let Perspective Broker rebuild DeviceConfig instances it receives
pb.setUnjellyableForClass(DeviceConfig, DeviceConfig)
class DataPointConfig(pb.Copyable, pb.RemoteCopy):
    """
    Describes one data point of a command: its id, owning component and
    the RRD settings passed on when a value is stored.
    """
    id = ''
    component = ''
    rrdPath = ''
    rrdType = None
    rrdCreateCommand = ''
    rrdMin = None
    rrdMax = None

    def __init__(self):
        # per-instance dictionary, created empty
        self.data = {}

    def __repr__(self):
        "Pretty-printed (data, id) pair"
        summary = (self.data, self.id)
        return pformat(summary)

# let Perspective Broker rebuild DataPointConfig instances it receives
pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)
class Cmd(pb.Copyable, pb.RemoteCopy):
    """
    Configuration and run state of one command on the schedule
    """
    command = None
    useSsh = False
    cycleTime = None
    eventClass = None
    eventKey = None
    severity = 3
    lastStart = 0       # time.time() of the most recent start
    lastStop = 0        # time.time() of the most recent completion
    result = None       # runner (or Failure) from the most recent run

    def __init__(self):
        self.points = []

    def running(self):
        "True while the latest start has no matching stop yet"
        return self.lastStart > self.lastStop

    def name(self):
        "The command line with the directory part of the executable removed"
        executable, arguments = (self.command + ' ').split(' ', 1)
        basename = executable.split('/')[-1]
        return '%s %s' % (basename, arguments)

    def nextRun(self):
        "Time at which this command is next due"
        base = self.lastStop
        if self.running():
            base = self.lastStart
        return base + self.cycleTime

    def start(self, pool):
        """
        Launch the command, over SSH (using pool) or as a local process,
        and return the deferred for its completion.
        """
        if self.useSsh:
            runner = SshRunner(pool)
        else:
            runner = ProcessRunner()
        done = runner.start(self)
        self.lastStart = time.time()
        log.debug('Process %s started' % self.name())
        done.addBoth(self.processEnded)
        return done

    def processEnded(self, pr):
        """
        Record the outcome of a run.  Returns this Cmd on success and
        passes a Failure through unchanged.
        """
        self.result = pr
        self.lastStop = time.time()

        # Check for a condition that could cause zencommand to stop cycling.
        # http://dev.zenoss.org/trac/ticket/4936
        if self.lastStop < self.lastStart:
            log.debug('System clock went back?')
            self.lastStop = self.lastStart

        if isinstance(pr, failure.Failure):
            return pr
        elapsed = self.lastStop - self.lastStart
        log.debug('Process %s stopped (%s), %.2f seconds elapsed' % (
            self.name(),
            pr.exitCode,
            elapsed))
        return self

    def updateConfig(self, cfg, deviceConfig):
        "Copy the settings of cfg onto this command and return self"
        self.deviceConfig = deviceConfig
        self.command = str(cfg.command)
        self.useSsh = cfg.useSsh
        # never cycle faster than once a second
        self.cycleTime = max(cfg.cycleTime, 1)
        self.eventClass = cfg.eventClass
        self.eventKey = cfg.eventKey
        self.severity = cfg.severity
        self.points = cfg.points
        return self

    def getEventKey(self, point):
        "Event key extended with the last path element of the point's rrdPath"
        datapoint = point.rrdPath.split('/')[-1]
        return self.eventKey + '|' + datapoint

    def commandKey(self):
        "Provide a value that establishes the uniqueness of this command"
        parts = (self.useSsh, self.cycleTime, self.severity, self.command)
        return '%'.join([str(part) for part in parts])

# let Perspective Broker rebuild Cmd instances it receives
pb.setUnjellyableForClass(Cmd, Cmd)
class Options:
    """
    Plain bundle of settings handed to MySshClient as its options
    """
    # fixed values, the same for every connection
    loginTries = 1
    searchPath = ''
    existenceTest = None

    def __init__(self, username, password, loginTimeout, commandTimeout,
                 keyPath, concurrentSessions):
        # credentials
        self.username = username
        self.password = password
        self.keyPath = keyPath
        # time limits
        self.loginTimeout = loginTimeout
        self.commandTimeout = commandTimeout
        self.concurrentSessions = concurrentSessions
392 393
def warnUsernameNotSet(device, sshCmd, sendEvent, suppressed):
    """
    Report that no username is configured for device, so sshCmd cannot
    be run over SSH.

    The warning log entry and the event are produced only when device is
    not in suppressed; the debug message is written on every call.
    """
    if device not in suppressed:
        summary = 'zCommandUsername is not set'
        log.warning(summary + (' for %s' % device))
        event = dict(device=device,
                     eventClass='/Cmd/Fail',
                     eventKey='zCommandUsername',
                     severity=4,
                     component='zencommand',
                     summary=summary)
        sendEvent(event)
    msg = 'username not configured for %s. skipping %s.'
    log.debug(msg % (device, sshCmd.command))
410 411
def updateCommands(config, currentDict, sendEvent):
    """
    Generate the up-to-date Cmd objects for one device configuration.

    For every Cmd in config.commands, the matching Cmd already on the
    schedule (if any) is taken out of currentDict and reused; otherwise the
    Cmd from config is used.  Either way it is updated from config before
    being yielded.  SSH commands are skipped, and removed from currentDict,
    when the device has no username set.

    Parameters:

    config - one of the configurations that was returned by calling
    getDataSourceCommands on zenhub.

    currentDict - a dictionary that maps (device, command) to Cmd object
    for the commands currently on zencommand's schedule.

    sendEvent - callable used to send the "username is not set" event.
    """

    warned = []     # devices already warned about: warn once per device

    for cmd in config.commands:

        key = (config.device, cmd.command)

        if cmd.useSsh and not config.username:
            warnUsernameNotSet(config.device, cmd, sendEvent, warned)
            if config.device not in warned:
                warned.append(config.device)
            # forget any scheduled copy of a command we cannot run
            currentDict.pop(key, None)
            continue

        # reuse the scheduled Cmd when there is one
        newCmd = currentDict.pop(key, cmd)

        yield newCmd.updateConfig(cmd, config)
446 447
448 -class zencommand(RRDDaemon):
449 """ 450 Daemon code to schedule commands and run them. 451 """ 452 453 initialServices = RRDDaemon.initialServices + ['CommandConfig'] 454
455 - def __init__(self):
456 RRDDaemon.__init__(self, 'zencommand') 457 self.schedule = [] 458 self.timeout = None 459 self.pool = SshPool() 460 self.executed = 0
461
462 - def remote_deleteDevice(self, doomed):
463 self.log.debug("zenhub has asked us to delete device %s" % doomed) 464 self.schedule = [c for c in self.schedule if c.deviceConfig.device != doomed]
465
466 - def remote_updateConfig(self, config):
467 self.log.debug("Configuration update from zenhub") 468 self.updateConfig([config], [config.device])
469
470 - def remote_updateDeviceList(self, devices):
471 self.log.debug("zenhub sent updated device list %s" % devices) 472 updated = [] 473 lastChanges = dict(devices) # map device name to last change 474 keep = [] 475 for cmd in self.schedule: 476 if cmd.deviceConfig.device in lastChanges: 477 if cmd.lastChange > lastChanges[cmd.device]: 478 updated.append(cmd.deviceConfig.device) 479 keep.append(cmd) 480 else: 481 self.log.info("Removing all commands for %s", cmd.deviceConfig.device) 482 self.schedule = keep 483 if updated: 484 self.log.info("Fetching the config for %s", updated) 485 d = self.model().callRemote('getDataSourceCommands', devices) 486 d.addCallback(self.updateConfig, updated) 487 d.addErrback(self.error)
488
489 - def updateConfig(self, configs, expected):
490 expected = Set(expected) 491 current = {} 492 for c in self.schedule: 493 if c.deviceConfig.device in expected: 494 current[c.deviceConfig.device,c.command] = c 495 # keep all the commands we didn't ask for 496 update = [c for c in self.schedule if c.deviceConfig.device not in expected] 497 for cfg in configs: 498 self.thresholds.updateForDevice(cfg.device, cfg.thresholds) 499 if self.options.device and self.options.device != cfg.device: 500 continue 501 update.extend(updateCommands(cfg, current, self.sendEvent)) 502 for device, command in current.keys(): 503 self.log.info("Deleting command %s from %s", device, command) 504 self.schedule = update 505 self.processSchedule()
506
507 - def heartbeatCycle(self, *ignored):
508 "There is no master 'cycle' to send the hearbeat" 509 self.heartbeat() 510 reactor.callLater(self.heartbeatTimeout/3, self.heartbeatCycle) 511 events = [] 512 events += self.rrdStats.gauge('schedule', 513 self.heartbeatTimeout, 514 len(self.schedule)) 515 events += self.rrdStats.counter('commands', 516 self.heartbeatTimeout, 517 self.executed) 518 events += self.rrdStats.counter('dataPoints', 519 self.heartbeatTimeout, 520 self.rrd.dataPoints) 521 events += self.rrdStats.gauge('cyclePoints', 522 self.heartbeatTimeout, 523 self.rrd.endCycle()) 524 self.sendEvents(events)
525 526
527 - def processSchedule(self, *unused):
528 """ 529 Run through the schedule and start anything that needs to be done. 530 Set a timer if we have nothing to do. 531 """ 532 if not self.options.cycle: 533 for cmd in self.schedule: 534 if cmd.running() or cmd.lastStart == 0: 535 break 536 else: 537 self.stop() 538 return 539 try: 540 if self.timeout and not self.timeout.called: 541 self.timeout.cancel() 542 self.timeout = None 543 def compare(x, y): 544 return cmp(x.nextRun(), y.nextRun())
545 self.schedule.sort(compare) 546 self.pool.trimConnections(self.schedule) 547 earliest = None 548 running = 0 549 now = time.time() 550 for c in self.schedule: 551 if c.running(): 552 running += 1 553 554 for c in self.schedule: 555 if running >= self.options.parallel: 556 break 557 if c.nextRun() <= now: 558 c.start(self.pool).addBoth(self.finished) 559 running += 1 560 else: 561 earliest = c.nextRun() - now 562 break 563 564 if earliest is not None: 565 self.log.debug("Next command in %d seconds", int(earliest)) 566 self.timeout = reactor.callLater(max(1, earliest), 567 self.processSchedule) 568 except Exception, ex: 569 self.log.exception(ex)
570 571
572 - def sendCmdEvent(self, cmd, severity, summary):
573 """ 574 Send an event using the info in the Cmd object. 575 """ 576 self.sendEvent(dict(device=cmd.deviceConfig.device, 577 component=cmd.component, 578 eventClass=cmd.eventClass, 579 eventKey=cmd.eventKey, 580 severity=severity, 581 summary=summary))
582
583 - def finished(self, cmdOrErr):
584 """ 585 The command has finished. cmdOrErr is either a Cmd instance or a 586 twisted failure. 587 """ 588 self.executed += 1 589 if isinstance(cmdOrErr, failure.Failure): 590 self.error(cmdOrErr) 591 else: 592 cmd = cmdOrErr 593 self.sendCmdEvent(cmd, Clear, "Clear") 594 self.parseResults(cmd) 595 self.processSchedule()
596
597 - def error(self, err):
598 """ 599 The finished method indicated that there was a failure. This method 600 is also called by RRDDaemon.errorStop. 601 """ 602 if isinstance(err.value, TimeoutError): 603 cmd, = err.value.args 604 msg = "Command timed out on device %s: %r" % ( 605 cmd.deviceConfig.device, cmd.command) 606 self.log.warning(msg) 607 self.sendCmdEvent(cmd, cmd.severity, msg) 608 else: 609 self.log.exception(err.value)
610
611 - def parseResults(self, cmd):
612 """ 613 Process the results of our command-line, send events 614 and check datapoints. 615 616 @param cmd: command 617 @type: cmd object 618 """ 619 self.log.debug('The result of "%s" was "%r"', cmd.command, cmd.result.output) 620 results = ParsedResults() 621 try: 622 parser = cmd.parser.create() 623 except Exception, ex: 624 self.log.exception("Error loading parser %s" % cmd.parser) 625 import traceback 626 self.sendEvent(dict(device=cmd.deviceConfig.device, 627 summary="Error loading parser %s" % cmd.parser, 628 component="zencommand", 629 message=traceback.format_exc(), 630 agent="zencommand", 631 )) 632 return 633 parser.processResults(cmd, results) 634 635 for ev in results.events: 636 self.sendEvent(ev, device=cmd.deviceConfig.device) 637 638 for dp, value in results.values: 639 self.log.debug("Storing %s = %s into %s" % (dp.id, value, dp.rrdPath)) 640 value = self.rrd.save(dp.rrdPath, 641 value, 642 dp.rrdType, 643 dp.rrdCreateCommand, 644 cmd.cycleTime, 645 dp.rrdMin, 646 dp.rrdMax) 647 self.log.debug("RRD save result: %s" % value) 648 for ev in self.thresholds.check(dp.rrdPath, time.time(), value): 649 eventKey = cmd.getEventKey(dp) 650 if 'eventKey' in ev: 651 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey']) 652 else: 653 ev['eventKey'] = eventKey 654 ev['component'] = dp.component 655 self.sendEvent(ev)
656
657 - def fetchConfig(self):
658 def doFetchConfig(driver): 659 try: 660 now = time.time() 661 662 yield self.model().callRemote('propertyItems') 663 self.setPropertyItems(driver.next()) 664 665 yield self.model().callRemote('getDefaultRRDCreateCommand') 666 createCommand = driver.next() 667 668 yield self.model().callRemote('getThresholdClasses') 669 self.remote_updateThresholdClasses(driver.next()) 670 671 672 yield self.model().callRemote('getCollectorThresholds') 673 self.rrdStats.config(self.options.monitor, 674 self.name, 675 driver.next(), 676 createCommand) 677 678 devices = [] 679 if self.options.device: 680 devices = [self.options.device] 681 yield self.model().callRemote('getDataSourceCommands', 682 devices) 683 if not devices: 684 devices = list(Set([c.deviceConfig.device 685 for c in self.schedule])) 686 self.updateConfig(driver.next(), devices) 687 688 self.rrd = RRDUtil(createCommand, 60) 689 690 self.sendEvents( 691 self.rrdStats.gauge('configTime', 692 self.configCycleInterval * 60, 693 time.time() - now)) 694 695 except Exception, ex: 696 self.log.exception(ex) 697 raise
698 699 return drive(doFetchConfig) 700 701
702 - def start(self, driver):
703 """ 704 Fetch the configuration and return a deferred for its completion. 705 Also starts the config cycle 706 """ 707 ex = None 708 try: 709 self.log.debug('Fetching configuration from zenhub') 710 yield self.fetchConfig() 711 driver.next() 712 self.log.debug('Finished config fetch') 713 except Exception, ex: 714 self.log.exception(ex) 715 driveLater(self.configCycleInterval * 60, self.start) 716 if ex: 717 raise ex
718
719 - def buildOptions(self):
720 RRDDaemon.buildOptions(self) 721 722 self.parser.add_option('--parallel', dest='parallel', 723 default=10, type='int', 724 help="Number of devices to collect at one time")
725
726 - def connected(self):
727 d = drive(self.start).addCallbacks(self.processSchedule, self.errorStop) 728 if self.options.cycle: 729 d.addCallback(self.heartbeatCycle)
if __name__ == '__main__':
    # The daemon class is imported through its package path instead of
    # using the one defined in this script module.
    # NOTE(review): presumably so that class names registered with
    # Perspective Broker match the package module - confirm.
    from Products.ZenRRD.zencommand import zencommand
    daemon = zencommand()
    daemon.run()