Package ZenRRD :: Module zencommand
[hide private]
[frames | no frames]

Source Code for Module ZenRRD.zencommand

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, 2009, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """ZenCommand 
 15   
 16  Run Command plugins periodically. 
 17   
 18  """ 
 19   
 20  import time 
 21  from pprint import pformat 
 22  import logging 
 23  log = logging.getLogger("zen.zencommand") 
 24  from sets import Set 
 25   
 26  from twisted.internet import reactor, defer, error 
 27  from twisted.internet.protocol import ProcessProtocol 
 28  from twisted.python import failure 
 29  from twisted.spread import pb 
 30   
 31  import Globals 
 32  from Products.ZenUtils.Driver import drive, driveLater 
 33   
 34  from Products.ZenRRD.RRDDaemon import RRDDaemon 
 35  from Products.ZenRRD.RRDUtil import RRDUtil 
 36  from Products.DataCollector.SshClient import SshClient 
 37   
 38  from Products.ZenRRD.CommandParser import getParser, ParsedResults 
 39   
 40  MAX_CONNECTIONS = 256 
 41   
 42   
 43   
class TimeoutError(Exception):
    """
    Error delivered when a deferred call takes too long to complete.

    Whatever positional arguments the exception is constructed with are
    stored, untouched, as C{self.args}; the Timeout() helper in this
    module passes the object that timed out as the single argument.
    """

    def __init__(self, *args):
        # The base initializer is intentionally invoked without arguments;
        # the payload is attached to the instance afterwards.
        Exception.__init__(self)
        self.args = args
52 53
def Timeout(deferred, seconds, obj):
    """
    Attach a watchdog timer to a deferred.

    If the deferred has not fired within C{seconds}, its errback is called
    with a Failure wrapping C{TimeoutError(obj)}.  When the deferred fires
    (for any reason) the timer is cancelled if it is still pending.

    @param deferred: the deferred to guard
    @param seconds: delay handed to reactor.callLater
    @param obj: payload stored in the TimeoutError's args
    @return: the same deferred that was passed in
    """

    def _expire(pending, payload):
        "the watchdog fired: fail the deferred with a TimeoutError"
        pending.errback(failure.Failure(TimeoutError(payload)))

    def _disarm(result, watchdog):
        "the deferred fired (possibly because of the watchdog): stop the timer"
        if not watchdog.called:
            watchdog.cancel()
        # hand the result/failure on unchanged to the next callback
        return result

    watchdog = reactor.callLater(seconds, _expire, deferred, obj)
    deferred.addBoth(_disarm, watchdog)
    return deferred
class ProcessRunner(ProcessProtocol):
    """
    Run a command as a local child process and report its completion
    through a deferred.
    """
    stopped = None      # deferred returned by start(); cleared once fired
    exitCode = None     # filled in by processEnded()
    output = ''         # accumulated stdout of the child

    def start(self, cmd):
        """
        Spawn C{cmd.command} under /bin/sh and return a deferred guarded by
        the device's commandTimeout.
        """
        log.debug('running %r' % cmd.command)

        shell = '/bin/sh'
        # 'exec' replaces the shell with the command itself
        self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
        self.command = ' '.join(self.cmdline)
        log.debug('cmd line: %r' % self.command)
        reactor.spawnProcess(self, shell, self.cmdline, env=None)

        self.stopped = Timeout(defer.Deferred(),
                               cmd.deviceConfig.commandTimeout,
                               cmd)
        self.stopped.addErrback(self.timeout)
        return self.stopped

    def timeout(self, value):
        "Errback: kill the child process, then pass the failure along"
        try:
            self.transport.signalProcess('KILL')
        except error.ProcessExitedAlready:
            log.debug("Command already exited: %s" % self.command)
        return value

    def outReceived(self, data):
        "Append a chunk of the child's stdout to the collected output"
        self.output += data

    def processEnded(self, reason):
        "Record the exit code and fire the start() deferred, if still pending"
        self.exitCode = reason.value.exitCode
        log.debug('Received exit code: %s' % self.exitCode)
        log.debug('Command: %r' % self.command)
        log.debug('Output: %r' % self.output)

        pending = self.stopped
        if not pending:
            return
        self.stopped = None
        # the deferred may already have been fired by the timeout watchdog
        if not pending.called:
            pending.callback(self)
122 123
class MySshClient(SshClient):
    """
    SSH connection to a remote device that hands back a deferred for each
    command submitted through addCommand().
    """

    def __init__(self, *args, **kw):
        SshClient.__init__(self, *args, **kw)
        # maps a command string to the deferred waiting for its result
        self.defers = {}

    def addCommand(self, command):
        "Queue a command on the server; returns a deferred for its result"
        waiting = defer.Deferred()
        self.defers[command] = waiting
        SshClient.addCommand(self, command)
        return waiting

    def addResult(self, command, data, code):
        "Pass a finished command's (data, code) to whoever is waiting on it"
        SshClient.addResult(self, command, data, code)
        waiting = self.defers.pop(command)
        if waiting.called:
            # already fired elsewhere (e.g. by a timeout); nothing to deliver
            return
        waiting.callback((data, code))

    def check(self, ip, timeout=2):
        "Override of SshClient's check that always succeeds without blocking"
        return True

    def clientFinished(self):
        "Discard the command/result bookkeeping once the client is finished"
        SshClient.clientFinished(self)
        self.commands = []
        self.results = []
159 160
class SshPool:
    """
    Cache of SSH connections, keyed by device name, so they can be reused
    and closed in one place.
    """

    def __init__(self):
        # device name -> MySshClient
        self.pool = {}

    def get(self, cmd):
        "Return the cached connection for cmd's device, creating it if needed"
        dc = cmd.deviceConfig
        connection = self.pool.get(dc.device, None)
        if connection is not None:
            return connection

        log.debug("Creating connection to %s", dc.device)
        options = Options(dc.username, dc.password,
                          dc.loginTimeout, dc.commandTimeout,
                          dc.keyPath, dc.concurrentSessions)
        connection = MySshClient(dc.device, dc.ipAddress, dc.port,
                                 options=options)
        connection.run()
        self.pool[dc.device] = connection
        return connection

    def _close(self, device):
        "Drop the connection to a device from the pool, if there is one"
        connection = self.pool.get(device, None)
        if not connection:
            return
        log.debug("Closing connection to %s", device)
        if connection.transport:
            connection.transport.loseConnection()
        del self.pool[device]

    def close(self, cmd):
        "Counterpart of get(): close the connection used by cmd"
        self._close(cmd.deviceConfig.device)

    def trimConnections(self, schedule):
        """
        Close connections while the pool holds more than MAX_CONNECTIONS,
        starting with the devices that appear last in the schedule.
        """
        # distinct device names, in the order they occur in the schedule
        ordered = []
        for entry in schedule:
            name = entry.deviceConfig.device
            if name not in ordered:
                ordered.append(name)
        # pop() takes from the end: the devices needed furthest in the future
        while ordered and len(self.pool) > MAX_CONNECTIONS:
            self._close(ordered.pop())
213 214
215 -class SshRunner:
216 """ 217 Run a single command across a cached SSH connection 218 """ 219 exitCode = None 220 output = None 221
222 - def __init__(self, pool):
223 self.pool = pool
224 225
226 - def start(self, cmd):
227 "Initiate a command on the remote device" 228 self.defer = defer.Deferred() 229 c = self.pool.get(cmd) 230 try: 231 d = Timeout(c.addCommand(cmd.command), 232 cmd.deviceConfig.commandTimeout, 233 cmd) 234 except Exception, ex: 235 log.warning('Error starting command: %s', ex) 236 self.pool.close(cmd) 237 return defer.fail(ex) 238 d.addErrback(self.timeout) 239 d.addBoth(self.processEnded) 240 return d
241 242
243 - def timeout(self, arg):
244 "Deal with slow executing command/connection (close it)" 245 cmd, = arg.value.args 246 # we could send a kill signal, but then we would need to track 247 # the command channel to send it to: just close the connection 248 self.pool.close(cmd) 249 return arg
250 251
252 - def processEnded(self, value):
253 "Deliver ourselves to the starter with the proper attributes" 254 if isinstance(value, failure.Failure): 255 return value 256 self.output, self.exitCode = value 257 return self
258 259
class DeviceConfig(pb.Copyable, pb.RemoteCopy):
    """
    Per-device connection settings shared by all of the device's commands.

    The class attributes below are only defaults.
    """
    # NOTE(review): SshPool.get() also reads ``concurrentSessions`` from this
    # object, but no class-level default is declared for it here --
    # presumably it is always supplied by zenhub; confirm.

    # identity
    lastChange = 0.
    device = ''

    # how to reach the device
    ipAddress = ''
    port = 0

    # credentials
    username = ''
    password = ''

    # time limits
    loginTimeout = 0.
    commandTimeout = 0.

    keyPath = ''

# allow instances sent over perspective broker to be rebuilt as this class
pb.setUnjellyableForClass(DeviceConfig, DeviceConfig)
class DataPointConfig(pb.Copyable, pb.RemoteCopy):
    """
    Settings for one data point produced by a command: where and how its
    value is stored in RRD.
    """
    id = ''
    component = ''

    # RRD storage parameters, passed to the RRD save call by the daemon
    rrdPath = ''
    rrdType = None
    rrdCreateCommand = ''
    rrdMin = None
    rrdMax = None

    def __init__(self):
        self.data = {}

    def __repr__(self):
        "Pretty-printed (data, id) pair"
        described = (self.data, self.id)
        return pformat(described)

# allow instances sent over perspective broker to be rebuilt as this class
pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)
class Cmd(pb.Copyable, pb.RemoteCopy):
    """
    Configuration and run-time state of one command on the schedule.
    """
    # configuration (refreshed by updateConfig)
    command = None
    useSsh = False
    cycleTime = None
    eventClass = None
    eventKey = None
    severity = 3

    # run-time state: time.time() stamps and the last runner/failure
    lastStart = 0
    lastStop = 0
    result = None

    def __init__(self):
        self.points = []

    def running(self):
        "True while the latest start has not been followed by a stop"
        return self.lastStart > self.lastStop

    def name(self):
        "The command line with any directory stripped from the executable"
        # the appended blank guarantees split() always yields two pieces
        executable, arguments = (self.command + ' ').split(' ', 1)
        executable = executable.split('/')[-1]
        return '%s %s' % (executable, arguments)

    def nextRun(self):
        "Time at which the command is next due: one cycle after start/stop"
        reference = self.lastStop
        if self.running():
            reference = self.lastStart
        return reference + self.cycleTime

    def start(self, pool):
        """
        Launch the command, over SSH or as a local process, and return the
        runner's deferred with processEnded() attached.
        """
        if self.useSsh:
            runner = SshRunner(pool)
        else:
            runner = ProcessRunner()
        finished = runner.start(self)
        self.lastStart = time.time()
        log.debug('Process %s started' % self.name())
        finished.addBoth(self.processEnded)
        return finished

    def processEnded(self, pr):
        """
        Record the outcome and stop time.  Returns this Cmd on success and
        the Failure itself otherwise.
        """
        self.result = pr
        self.lastStop = time.time()
        if isinstance(pr, failure.Failure):
            return pr
        log.debug('Process %s stopped (%s), %.2f seconds elapsed' % (
            self.name(),
            pr.exitCode,
            self.lastStop - self.lastStart))
        return self

    def updateConfig(self, cfg, deviceConfig):
        "Copy the settings of cfg onto this command; returns self"
        self.deviceConfig = deviceConfig
        self.command = str(cfg.command)
        self.useSsh = cfg.useSsh
        # never allow a cycle shorter than one second
        self.cycleTime = max(cfg.cycleTime, 1)
        self.eventClass = cfg.eventClass
        self.eventKey = cfg.eventKey
        self.severity = cfg.severity
        self.points = cfg.points
        return self

    def getEventKey(self, point):
        "Event key extended with the last path element of the point's rrdPath"
        # fetch datapoint name from filename path and add it to the event key
        datapoint = point.rrdPath.split('/')[-1]
        return self.eventKey + '|' + datapoint

    def commandKey(self):
        "Provide a value that establishes the uniqueness of this command"
        parts = [self.useSsh, self.cycleTime, self.severity, self.command]
        return '%'.join([str(part) for part in parts])

# allow instances sent over perspective broker to be rebuilt as this class
pb.setUnjellyableForClass(Cmd, Cmd)
class Options:
    """
    Plain holder for the option values handed to the SSH client.
    """
    # fixed values, not taken from the constructor
    loginTries = 1
    searchPath = ''
    existenceTest = None

    def __init__(self, username, password, loginTimeout, commandTimeout,
                 keyPath, concurrentSessions):
        # credentials
        self.username = username
        self.password = password
        self.keyPath = keyPath
        # limits
        self.loginTimeout = loginTimeout
        self.commandTimeout = commandTimeout
        self.concurrentSessions = concurrentSessions
383 384
def warnUsernameNotSet(device, sshCmd, sendEvent, suppressed):
    """
    Report that no username is configured for a device, so the given SSH
    command cannot be run.

    The warning log entry and the event are emitted only when the device
    is not listed in C{suppressed}; the debug entry naming the skipped
    command is written on every call.

    @param device: name of the device
    @param sshCmd: the command being skipped (its C{command} is logged)
    @param sendEvent: callable taking the event dictionary
    @param suppressed: devices that have already been warned about
    """
    if device not in suppressed:
        summary = 'zCommandUsername is not set'
        log.warning(summary + ' for %s' % device)
        event = {
            'device': device,
            'eventClass': '/Cmd/Fail',
            'eventKey': 'zCommandUsername',
            'severity': 4,
            'component': 'zencommand',
            'summary': summary,
        }
        sendEvent(event)
    msg = 'username not configured for %s. skipping %s.'
    log.debug(msg % (device, sshCmd.command))
401 402
def updateCommands(config, currentDict, sendEvent):
    """
    Generate the up-to-date Cmd objects for one device configuration.

    For every command in config.commands the matching entry is taken out
    of currentDict when there is one (so the scheduled object is reused),
    otherwise the command from config is used; either way it is refreshed
    through updateConfig() and yielded.  SSH commands are skipped, and
    dropped from currentDict, when the device has no username.

    Parameters:

    config - one of the configurations that was returned by calling
             getDataSourceCommands on zenhub.

    currentDict - a dictionary that maps (device, command) to Cmd object
                  for the commands currently on zencommand's schedule.
                  Entries that are matched are removed from it.

    sendEvent - callable used to report a missing username.
    """
    warned = []     # log warning and send event once per device
    device = config.device

    for incoming in config.commands:
        key = (device, incoming.command)

        if not config.username and incoming.useSsh:
            warnUsernameNotSet(device, incoming, sendEvent, warned)
            if device not in warned:
                warned.append(device)
            # forget any scheduled copy of the command we cannot run
            currentDict.pop(key, None)
            continue

        # prefer the object already on the schedule over the fresh copy
        target = currentDict.pop(key, incoming)
        yield target.updateConfig(incoming, config)
437 438
439 -class zencommand(RRDDaemon):
440 """ 441 Daemon code to schedule commands and run them. 442 """ 443 444 initialServices = RRDDaemon.initialServices + ['CommandConfig'] 445
446 - def __init__(self):
447 RRDDaemon.__init__(self, 'zencommand') 448 self.schedule = [] 449 self.timeout = None 450 self.pool = SshPool() 451 self.executed = 0
452
453 - def remote_deleteDevice(self, doomed):
454 self.log.debug("zenhub has asked us to delete device %s" % doomed) 455 self.schedule = [c for c in self.schedule if c.deviceConfig.device != doomed]
456
457 - def remote_updateConfig(self, config):
458 self.log.debug("Configuration update from zenhub") 459 self.updateConfig([config], [config.device])
460
461 - def remote_updateDeviceList(self, devices):
462 self.log.debug("zenhub sent updated device list %s" % devices) 463 updated = [] 464 lastChanges = dict(devices) # map device name to last change 465 keep = [] 466 for cmd in self.schedule: 467 if cmd.deviceConfig.device in lastChanges: 468 if cmd.lastChange > lastChanges[cmd.device]: 469 updated.append(cmd.deviceConfig.device) 470 keep.append(cmd) 471 else: 472 self.log.info("Removing all commands for %s", cmd.deviceConfig.device) 473 self.schedule = keep 474 if updated: 475 self.log.info("Fetching the config for %s", updated) 476 d = self.model().callRemote('getDataSourceCommands', devices) 477 d.addCallback(self.updateConfig, updated) 478 d.addErrback(self.error)
479
480 - def updateConfig(self, configs, expected):
481 expected = Set(expected) 482 current = {} 483 for c in self.schedule: 484 if c.deviceConfig.device in expected: 485 current[c.deviceConfig.device,c.command] = c 486 # keep all the commands we didn't ask for 487 update = [c for c in self.schedule if c.deviceConfig.device not in expected] 488 for cfg in configs: 489 self.thresholds.updateForDevice(cfg.device, cfg.thresholds) 490 if self.options.device and self.options.device != cfg.device: 491 continue 492 update.extend(updateCommands(cfg, current, self.sendEvent)) 493 for device, command in current.keys(): 494 self.log.info("Deleting command %s from %s", device, command) 495 self.schedule = update 496 self.processSchedule()
497
498 - def heartbeatCycle(self, *ignored):
499 "There is no master 'cycle' to send the hearbeat" 500 self.heartbeat() 501 reactor.callLater(self.heartbeatTimeout/3, self.heartbeatCycle) 502 events = [] 503 events += self.rrdStats.gauge('schedule', 504 self.heartbeatTimeout, 505 len(self.schedule)) 506 events += self.rrdStats.counter('commands', 507 self.heartbeatTimeout, 508 self.executed) 509 events += self.rrdStats.counter('dataPoints', 510 self.heartbeatTimeout, 511 self.rrd.dataPoints) 512 events += self.rrdStats.gauge('cyclePoints', 513 self.heartbeatTimeout, 514 self.rrd.endCycle()) 515 self.sendEvents(events)
516 517
518 - def processSchedule(self, *unused):
519 """ 520 Run through the schedule and start anything that needs to be done. 521 Set a timer if we have nothing to do. 522 """ 523 if not self.options.cycle: 524 for cmd in self.schedule: 525 if cmd.running() or cmd.lastStart == 0: 526 break 527 else: 528 self.stop() 529 return 530 try: 531 if self.timeout and not self.timeout.called: 532 self.timeout.cancel() 533 self.timeout = None 534 def compare(x, y): 535 return cmp(x.nextRun(), y.nextRun())
536 self.schedule.sort(compare) 537 self.pool.trimConnections(self.schedule) 538 earliest = None 539 running = 0 540 now = time.time() 541 for c in self.schedule: 542 if c.running(): 543 running += 1 544 545 for c in self.schedule: 546 if running >= self.options.parallel: 547 break 548 if c.nextRun() <= now: 549 c.start(self.pool).addBoth(self.finished) 550 running += 1 551 else: 552 earliest = c.nextRun() - now 553 break 554 555 if earliest is not None: 556 self.log.debug("Next command in %d seconds", int(earliest)) 557 self.timeout = reactor.callLater(max(1, earliest), 558 self.processSchedule) 559 except Exception, ex: 560 self.log.exception(ex)
561 562
563 - def finished(self, cmd):
564 self.executed += 1 565 if isinstance(cmd, failure.Failure): 566 self.error(cmd) 567 else: 568 self.parseResults(cmd) 569 self.processSchedule()
570 571
572 - def error(self, err):
573 if isinstance(err.value, TimeoutError): 574 cmd, = err.value.args 575 dc = cmd.deviceConfig 576 msg = "Command timed out on device %s: %r" % (dc.device, cmd.command) 577 self.log.warning(msg) 578 self.sendEvent(dict(device=dc.device, 579 component="zencommand", 580 eventClass=cmd.eventClass, 581 eventKey=cmd.eventKey, 582 severity=cmd.severity, 583 summary=msg)) 584 else: 585 self.log.exception(err.value)
586
587 - def parseResults(self, cmd):
588 """ 589 Process the results of our command-line, send events 590 and check datapoints. 591 592 @param cmd: command 593 @type: cmd object 594 """ 595 self.log.debug('The result of "%s" was "%r"', cmd.command, cmd.result.output) 596 results = ParsedResults() 597 try: 598 parser = getParser(cmd.parser) 599 except Exception, ex: 600 self.log.exception("Error loading parser %s" % cmd.parser) 601 import traceback 602 self.sendEvent(dict(device=cmd.deviceConfig.device, 603 summary="Error loading parser %s" % cmd.parser, 604 component="zencommand", 605 message=traceback.format_exc(), 606 agent="zencommand", 607 )) 608 return 609 parser.processResults(cmd, results) 610 611 for ev in results.events: 612 self.sendEvent(ev, device=cmd.deviceConfig.device) 613 614 for dp, value in results.values: 615 self.log.debug("Storing %s = %s into %s" % (dp.id, value, dp.rrdPath)) 616 value = self.rrd.save(dp.rrdPath, 617 value, 618 dp.rrdType, 619 dp.rrdCreateCommand, 620 cmd.cycleTime, 621 dp.rrdMin, 622 dp.rrdMax) 623 self.log.debug("RRD save result: %s" % value) 624 for ev in self.thresholds.check(dp.rrdPath, time.time(), value): 625 eventKey = cmd.getEventKey(dp) 626 if 'eventKey' in ev: 627 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey']) 628 else: 629 ev['eventKey'] = eventKey 630 ev['component'] = dp.component 631 self.sendEvent(ev)
632
633 - def fetchConfig(self):
634 def doFetchConfig(driver): 635 try: 636 now = time.time() 637 638 yield self.model().callRemote('propertyItems') 639 self.setPropertyItems(driver.next()) 640 641 yield self.model().callRemote('getDefaultRRDCreateCommand') 642 createCommand = driver.next() 643 644 yield self.model().callRemote('getThresholdClasses') 645 self.remote_updateThresholdClasses(driver.next()) 646 647 648 yield self.model().callRemote('getCollectorThresholds') 649 self.rrdStats.config(self.options.monitor, 650 self.name, 651 driver.next(), 652 createCommand) 653 654 devices = [] 655 if self.options.device: 656 devices = [self.options.device] 657 yield self.model().callRemote('getDataSourceCommands', 658 devices) 659 if not devices: 660 devices = list(Set([c.deviceConfig.device 661 for c in self.schedule])) 662 self.updateConfig(driver.next(), devices) 663 664 self.rrd = RRDUtil(createCommand, 60) 665 666 self.sendEvents( 667 self.rrdStats.gauge('configTime', 668 self.configCycleInterval * 60, 669 time.time() - now)) 670 671 except Exception, ex: 672 self.log.exception(ex) 673 raise
674 675 return drive(doFetchConfig) 676 677
678 - def start(self, driver):
679 """ 680 Fetch the configuration and return a deferred for its completion. 681 Also starts the config cycle 682 """ 683 ex = None 684 try: 685 self.log.debug('Fetching configuration from zenhub') 686 yield self.fetchConfig() 687 driver.next() 688 self.log.debug('Finished config fetch') 689 except Exception, ex: 690 self.log.exception(ex) 691 driveLater(self.configCycleInterval * 60, self.start) 692 if ex: 693 raise ex
694
695 - def buildOptions(self):
696 RRDDaemon.buildOptions(self) 697 698 self.parser.add_option('--parallel', dest='parallel', 699 default=10, type='int', 700 help="Number of devices to collect at one time")
701
702 - def connected(self):
703 d = drive(self.start).addCallbacks(self.processSchedule, self.errorStop) 704 if self.options.cycle: 705 d.addCallback(self.heartbeatCycle)
706 707 708 if __name__ == '__main__': 709 from Products.ZenRRD.zencommand import zencommand 710 z = zencommand() 711 z.run() 712