Package Products :: Package ZenRRD :: Module zencommand
[hide private]
[frames] | [no frames]

Source Code for Module Products.ZenRRD.zencommand

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2007, 2009, 2010, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  __doc__ = """ZenCommand 
 12   
 13  Run Command plugins periodically. 
 14   
 15  """ 
 16   
 17  import time 
 18  from pprint import pformat 
 19  import logging 
 20  log = logging.getLogger("zen.zencommand") 
 21  import traceback 
 22  from copy import copy 
 23   
 24  from twisted.internet import reactor, defer, error 
 25  from twisted.internet.protocol import ProcessProtocol 
 26  from twisted.python.failure import Failure 
 27   
 28  from twisted.spread import pb 
 29   
 30  import Globals 
 31  import zope.interface 
 32   
 33  from Products.ZenUtils.Utils import unused, getExitMessage 
 34  from Products.DataCollector.SshClient import SshClient 
 35  from Products.ZenEvents.ZenEventClasses import Clear, Cmd_Fail 
 36  from Products.ZenRRD.CommandParser import ParsedResults 
 37   
 38  from Products.ZenCollector.daemon import CollectorDaemon 
 39  from Products.ZenCollector.interfaces import ICollectorPreferences,\ 
 40                                               IDataService,\ 
 41                                               IEventService,\ 
 42                                               IScheduledTask 
 43  from Products.ZenCollector.tasks import SimpleTaskFactory,\ 
 44                                          SubConfigurationTaskSplitter,\ 
 45                                          TaskStates, \ 
 46                                          BaseTask 
 47  from Products.ZenCollector.pools import getPool 
 48  from Products.ZenEvents import Event 
 49  from Products.ZenUtils.Executor import TwistedExecutor 
 50   
 51  from Products.DataCollector import Plugins 
 52  unused(Plugins) 
 53   
 54  MAX_CONNECTIONS = 250 
 55  MAX_BACK_OFF_MINUTES = 20 
 56   
 57  # We retrieve our configuration data remotely via a Twisted PerspectiveBroker 
 58  # connection. To do so, we need to import the class that will be used by the 
 59  # configuration service to send the data over, i.e. DeviceProxy. 
 60  from Products.ZenCollector.services.config import DeviceProxy 
 61  unused(DeviceProxy) 
 62   
 63  COLLECTOR_NAME = "zencommand" 
 64  POOL_NAME = 'SshConfigs' 
 65   
class SshPerformanceCollectionPreferences(object):
    """
    Preferences for the zencommand collector daemon: identifies the ZenHub
    configuration service and holds the runtime tunables used by the tasks.
    """
    zope.interface.implements(ICollectorPreferences)

    def __init__(self):
        """
        Construct a new SshPerformanceCollectionPreferences instance and
        provide default values for all needed attributes.
        """
        self.collectorName = COLLECTOR_NAME
        self.defaultRRDCreateCommand = None
        self.configCycleInterval = 20   # minutes
        self.cycleInterval = 5 * 60     # seconds

        # Fully-qualified class name of the configuration service that
        # runs within ZenHub and feeds this collector.
        self.configurationService = 'Products.ZenHub.services.CommandPerformanceConfig'

        # Reasonable default for the maximum number of concurrent tasks.
        self.maxTasks = 50

        # Populated once buildOptions' results are parsed by the framework.
        self.options = None

    def buildOptions(self, parser):
        """
        Register zencommand-specific command-line options.

        @param parser: optparse-style parser supplied by the framework
        """
        # Table-driven registration keeps each option's settings together.
        optionSpecs = (
            ('--showrawresults',
             dict(dest='showrawresults',
                  action="store_true",
                  default=False,
                  help="Show the raw RRD values. For debugging purposes only.")),
            ('--maxbackoffminutes',
             dict(dest='maxbackoffminutes',
                  default=MAX_BACK_OFF_MINUTES,
                  type='int',
                  help="When a device fails to respond, increase the time to"
                       " check on the device until this limit.")),
            ('--showfullcommand',
             dict(dest='showfullcommand',
                  action="store_true",
                  default=False,
                  help="Display the entire command and command-line arguments, "
                       " including any passwords.")),
        )
        for flag, kwargs in optionSpecs:
            parser.add_option(flag, **kwargs)

    def postStartup(self):
        """Post-startup hook required by the interface; nothing to do."""
        pass
class SshPerCycletimeTaskSplitter(SubConfigurationTaskSplitter):
    """
    Split device configurations into one task per (device, cycle time,
    local/remote) combination, keyed off the 'datasources' sub-configs.
    """
    subconfigName = 'datasources'

    def makeConfigKey(self, config, subconfig):
        """Return the unique key identifying the task for one sub-config."""
        if subconfig.useSsh:
            locality = 'Remote'
        else:
            locality = 'Local'
        return (config.id, subconfig.cycleTime, locality)
class TimeoutError(Exception):
    """
    Error raised when a deferred call takes too long to complete.
    """

    def __init__(self, *args):
        # Initialize the base class empty, then attach the arguments so
        # callers can unpack them from .args (e.g. the timed-out command).
        super(TimeoutError, self).__init__()
        self.args = args
def timeoutCommand(deferred, seconds, obj):
    """
    Attach a timeout to a deferred: if it has not fired within *seconds*,
    fire its errback with a TimeoutError wrapping *obj*.

    @param deferred: the deferred to guard
    @param seconds: timeout interval
    @param obj: context object (the command) carried by the TimeoutError
    @return: the same deferred, with a 'mytimer' attribute attached
    """
    def fireTimeout(pendingDeferred, failedObj):
        # Took too long -- push a TimeoutError down the errback chain.
        pendingDeferred.errback(Failure(TimeoutError(failedObj)))

    def stopTimer(result, pendingTimer):
        # The command finished (possibly by timing out); cancel the timer
        # if it has not fired yet and pass the result through untouched.
        if not pendingTimer.called:
            pendingTimer.cancel()
        return result

    timer = reactor.callLater(seconds, fireTimeout, deferred, obj)
    deferred.mytimer = timer
    deferred.addBoth(stopTimer, timer)
    return deferred
class ProcessRunner(ProcessProtocol):
    """
    Provide deferred process execution for a *single* command
    """
    stopped = None    # deferred fired when the process finishes
    exitCode = None   # exit status of the completed process
    output = ''       # accumulated stdout from the process
    stderr = ''       # accumulated stderr from the process

    def start(self, cmd):
        """
        Kick off the process: run it local

        @param cmd: command configuration (Cmd) to run under /bin/sh
        @return: deferred that fires with this runner when the process
                 ends, or errbacks with a TimeoutError after
                 zCommandCommandTimeout seconds
        """
        log.debug('Running %s', cmd.command.split()[0])

        self._cmd = cmd
        shell = '/bin/sh'
        # 'exec' replaces the shell so signals reach the command directly
        self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
        self.command = ' '.join(self.cmdline)

        reactor.spawnProcess(self, shell, self.cmdline, env=cmd.env)

        d = timeoutCommand(defer.Deferred(), cmd.deviceConfig.zCommandCommandTimeout, cmd)
        self.stopped = d
        self.stopped.addErrback(self.timeout)
        return d

    def timeout(self, value):
        """
        Kill a process gracefully if it takes too long

        @param value: the failure that triggered the timeout; passed through
        """
        try:
            # Ask politely with SIGINT first; _reap escalates to SIGKILL
            # two seconds later if the process is still around.
            self.transport.signalProcess('INT')
            reactor.callLater(2, self._reap)
        except error.ProcessExitedAlready:
            log.debug("Command already exited: %s", self.command.split()[0])
        return value

    def _reap(self):
        """
        Kill a process forcefully if it takes too long
        """
        try:
            self.transport.signalProcess('KILL')
        except Exception:
            # Process already exited; nothing left to kill.
            pass

    def outReceived(self, data):
        """
        Store up the stdout output as it arrives from the process
        """
        self.output += data

    def errReceived(self, data):
        """
        Store up the stderr output as it arrives from the process
        """
        self.stderr += data

    def processEnded(self, reason):
        """
        Notify the starter that their process is complete

        @param reason: Twisted failure wrapping the process exit status
        """
        self.exitCode = reason.value.exitCode
        if self.exitCode is not None:
            msg = """Datasource: %s Received exit code: %s Output:\n%r"""
            data = [self._cmd.ds, self.exitCode, self.output]
            if self.stderr:
                msg += "\nStandard Error:\n%r"
                data.append(self.stderr)
            log.debug(msg, *data)

        if self.stopped:
            # Hand ourselves back so the caller can inspect output,
            # stderr and exitCode; clear self.stopped so we fire only once.
            d, self.stopped = self.stopped, None
            if not d.called:
                d.callback(self)
class MySshClient(SshClient):
    """
    Connection to SSH server at the remote device
    """
    def __init__(self, *args, **kw):
        SshClient.__init__(self, *args, **kw)
        # Maps each command string to the deferred awaiting its result.
        self.defers = {}
        # Tasks currently sharing this pooled connection; used to decide
        # when the connection can be torn down.
        self._taskList = set()

    def addCommand(self, command):
        """
        Run a command against the server

        @param command: command line to execute remotely
        @return: deferred that fires with (data, exitCode) via addResult
        """
        d = defer.Deferred()
        self.defers[command] = d
        SshClient.addCommand(self, command)
        return d

    def addResult(self, command, data, code):
        """
        Forward the results of the command execution to the starter
        """
        # don't call the CollectorClient.addResult which adds the result to a
        # member variable for zenmodeler
        d = self.defers.pop(command, None)
        if d is None:
            log.error("Internal error where deferred object not in dictionary."
                      " Command = '%s' Data = '%s' Code = '%s'",
                      command.split()[0], data, code)
        elif not d.called:
            d.callback((data, code))

    def clientConnectionLost(self, connector, reason):
        # Mask the password when describing the connection in logs.
        connection_description = '%s:*****@%s:%s' % (self.username, self.ip, self.port)
        # Connection was lost, but could be because we just closed it. Not necessarily cause for concern.
        log.debug("Connection %s lost." % connection_description)
        pool = getPool('SSH Connections')
        # NOTE(review): this key must stay in sync with
        # SshPerformanceCollectionTask._getPoolKey -- confirm.
        poolkey = hash((self.username, self.password, self.ip, self.port))
        if poolkey in pool:
            # Clean it up so the next time around the task will get a new connection
            log.debug("Deleting connection %s from pool." % connection_description)
            del pool[poolkey]

    def check(self, ip, timeout=2):
        """
        Turn off blocking SshClient.test method
        """
        return True

    def clientFinished(self):
        """
        We don't need to track commands/results when they complete
        """
        SshClient.clientFinished(self)
        # Drop the bookkeeping the base class keeps for zenmodeler.
        self.cmdmap = {}
        self._commands = []
        self.results = []

    def clientConnectionFailed(self, connector, reason):
        """
        If we didn't connect let the modeler know

        @param connector: connector associated with this failure
        @type connector: object
        @param reason: failure object
        @type reason: object
        """
        self.clientFinished()
        message = reason.getErrorMessage()
        # Iterate over a copy: connectionFailed may mutate _taskList.
        for task in list(self._taskList):
            task.connectionFailed(message)
class SshOptions:
    """
    Container bundling the SSH connection settings handed to SshClient.
    """
    # Defaults expected by the SshClient machinery.
    loginTries = 1
    searchPath = ''
    existenceTest = None

    def __init__(self, username, password, loginTimeout, commandTimeout,
                 keyPath, concurrentSessions):
        """Store the supplied connection settings as instance attributes."""
        self.username = username
        self.password = password
        self.keyPath = keyPath
        self.loginTimeout = loginTimeout
        self.commandTimeout = commandTimeout
        self.concurrentSessions = concurrentSessions
class SshRunner(object):
    """
    Run a single command across a cached SSH connection
    """

    def __init__(self, connection):
        # connection: a MySshClient whose addCommand() yields a deferred
        self._connection = connection
        self.exitCode = None
        self.output = None
        self.stderr = None

    def start(self, cmd):
        "Initiate a command on the remote device"
        # NOTE(review): self.defer is assigned but never read within this
        # class; the deferred actually returned to the caller is 'd' from
        # addCommand() below -- confirm whether anything external relies
        # on this attribute before removing it.
        self.defer = defer.Deferred(canceller=self._canceller)
        try:
            d = timeoutCommand(self._connection.addCommand(cmd.command),
                               self._connection.commandTimeout,
                               cmd)
        except Exception, ex:
            log.warning('Error starting command: %s', ex)
            return defer.fail(ex)
        d.addErrback(self.timeout)
        d.addBoth(self.processEnded)
        return d

    def _canceller(self, deferToCancel):
        # Stop the pending timeout timer attached by timeoutCommand().
        if not deferToCancel.mytimer.called:
            deferToCancel.mytimer.cancel()
        return None

    def timeout(self, arg):
        "Deal with slow executing command/connection (close it)"
        # We could send a kill signal, but then we would need to track
        # the command channel to send it. Just close the connection.
        # NOTE(review): no close actually happens here; the failure is
        # simply passed along -- confirm whether teardown occurs elsewhere.
        return arg

    def processEnded(self, value):
        "Deliver ourselves to the starter with the proper attributes"
        if isinstance(value, Failure):
            return value
        # value is the (data, exitCode) tuple produced by MySshClient.addResult
        self.output, self.exitCode = value
        return self
class DataPointConfig(pb.Copyable, pb.RemoteCopy):
    """
    Configuration for a single RRD datapoint, sent over PerspectiveBroker
    from ZenHub to this collector.
    """
    # Class-level defaults; real values arrive with the jellied copy.
    id = ''
    component = ''
    rrdPath = ''
    rrdType = None
    rrdCreateCommand = ''
    rrdMin = None
    rrdMax = None

    def __init__(self):
        # Per-instance payload of collected values.
        self.data = {}

    def __repr__(self):
        """Render the payload and id for debugging."""
        state = (self.data, self.id)
        return pformat(state)

# Allow instances to be received over a PerspectiveBroker connection.
pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)
class Cmd(pb.Copyable, pb.RemoteCopy):
    """
    Holds the config of every command to be run
    """
    # Class-level defaults; actual values are populated by the ZenHub
    # configuration service before the proxy is sent to this collector.
    device = ''
    command = None
    ds = ''
    useSsh = False
    cycleTime = None
    eventClass = None
    eventKey = None
    severity = 3
    lastStart = 0
    lastStop = 0
    result = None
    env = None

    def __init__(self):
        # DataPointConfig objects attached to this command.
        self.points = []

    def processCompleted(self, pr):
        """
        Return back the datasource with the ProcessRunner/SshRunner stored in
        the 'result' attribute.
        """
        self.result = pr
        self.lastStop = time.time()

        # Check for a condition that could cause zencommand to stop cycling.
        # http://dev.zenoss.org/trac/ticket/4936
        if self.lastStop < self.lastStart:
            log.debug('System clock went back?')
            self.lastStop = self.lastStart

        if isinstance(pr, Failure):
            return pr

        # NOTE(review): self.name is not defined on this class; presumably
        # the configuration service sets it remotely -- confirm.
        log.debug('Process %s stopped (%s), %.2f seconds elapsed',
                  self.name,
                  pr.exitCode,
                  self.lastStop - self.lastStart)
        return self

    def getEventKey(self, point):
        # fetch datapoint name from filename path and add it to the event key
        return self.eventKey + '|' + point.rrdPath.split('/')[-1]

    def commandKey(self):
        "Provide a value that establishes the uniqueness of this command"
        return '%'.join(map(str, [self.useSsh, self.cycleTime,
                                  self.severity, self.command]))

    def __str__(self):
        return ' '.join(map(str, [
            self.ds,
            'useSSH=%s' % self.useSsh,
            self.cycleTime,
        ]))

# Allow instances to be received over a PerspectiveBroker connection.
pb.setUnjellyableForClass(Cmd, Cmd)


# Template for command-failure status events sent by the collection task.
STATUS_EVENT = { 'eventClass' : Cmd_Fail,
                 'component' : 'command',
                 }
class SshPerformanceCollectionTask(BaseTask):
    """
    A task that performs periodic performance collection for devices providing
    data via SSH connections.
    """
    zope.interface.implements(IScheduledTask)

    # Human-readable states reported while the task progresses.
    STATE_CONNECTING = 'CONNECTING'
    STATE_FETCH_DATA = 'FETCH_DATA'
    STATE_PARSE_DATA = 'PARSING_DATA'
    STATE_STORE_PERF = 'STORE_PERF_DATA'

    def __init__(self,
                 taskName,
                 configId,
                 scheduleIntervalSeconds,
                 taskConfig):
        """
        @param taskName: the unique identifier for this task
        @type taskName: string
        @param configId: configuration to watch
        @type configId: string
        @param scheduleIntervalSeconds: the interval at which this task will be
               collected
        @type scheduleIntervalSeconds: int
        @param taskConfig: the configuration for this task
        """
        super(SshPerformanceCollectionTask, self).__init__(
            taskName, configId,
            scheduleIntervalSeconds, taskConfig
        )

        # Needed for interface
        self.name = taskName
        self.configId = configId
        self.state = TaskStates.STATE_IDLE
        self.interval = scheduleIntervalSeconds

        # The taskConfig corresponds to a DeviceProxy
        self._device = taskConfig

        self._devId = self._device.id
        self._manageIp = self._device.manageIp

        # NOTE(review): zope.component is used here but only zope.interface
        # is imported at the top of this module; presumably a transitive
        # import makes it available -- confirm.
        self._dataService = zope.component.queryUtility(IDataService)
        self._eventService = zope.component.queryUtility(IEventService)

        self._preferences = zope.component.queryUtility(ICollectorPreferences,
                                                        COLLECTOR_NAME)
        self._lastErrorMsg = ''

        self._maxbackoffseconds = self._preferences.options.maxbackoffminutes * 60

        self._concurrentSessions = taskConfig.zSshConcurrentSessions
        self._executor = TwistedExecutor(self._concurrentSessions)
        # All datasources in one task share the same transport mode.
        self._useSsh = taskConfig.datasources[0].useSsh
        self._connection = None

        self._datasources = taskConfig.datasources
        self.pool = getPool('SSH Connections')
        self.executed = 0

    def __str__(self):
        return "COMMAND schedule Name: %s configId: %s Datasources: %d" % (
               self.name, self.configId, len(self._datasources))

    def cleanup(self):
        # Called by the framework when the task is discarded.
        self._cleanUpPool()
        self._close()

    def _getPoolKey(self):
        """
        Get the key under which the client should be stored in the pool.
        """
        username = self._device.zCommandUsername
        password = self._device.zCommandPassword
        ip = self._manageIp
        port = self._device.zCommandPort
        return hash((username, password, ip, port))

    def _cleanUpPool(self):
        """
        Close the connection currently associated with this task.
        """
        poolkey = self._getPoolKey()
        if poolkey in self.pool:
            client = self.pool[poolkey]
            tasklist = client._taskList
            if not tasklist:
                # No other tasks, so safe to clean up
                transport = client.transport
                if transport:
                    transport.loseConnection()
                del self.pool[poolkey]

    def doTask(self):
        """
        Contact to one device and return a deferred which gathers data from
        the device.

        @return: Deferred actions to run against a device configuration
        @rtype: Twisted deferred object
        """
        # See if we need to connect first before doing any collection
        d = defer.maybeDeferred(self._connect)
        d.addCallbacks(self._connectCallback, self._failure)
        d.addCallback(self._fetchPerf)

        # Call _finished for both success and error scenarios
        d.addBoth(self._finished)

        # Wait until the Deferred actually completes
        return d

    def _connect(self):
        """
        If a local datasource executor, do nothing.

        If an SSH datasource executor, create a connection object to the
        remote device. Make a new SSH connection object if there isn't one
        available. This doesn't actually connect to the device.
        """
        if not self._useSsh:
            return defer.succeed(None)

        connection = self.pool.get(self._getPoolKey(), None)
        if connection is None:
            self.state = SshPerformanceCollectionTask.STATE_CONNECTING
            log.debug("Creating connection object to %s", self._devId)
            username = self._device.zCommandUsername
            password = self._device.zCommandPassword
            loginTimeout = self._device.zCommandLoginTimeout
            commandTimeout = self._device.zCommandCommandTimeout
            keypath = self._device.zKeyPath
            options = SshOptions(username, password,
                                 loginTimeout, commandTimeout,
                                 keypath, self._concurrentSessions)

            connection = MySshClient(self._devId, self._manageIp,
                                     self._device.zCommandPort, options=options)
            connection.sendEvent = self._eventService.sendEvent

            self.pool[self._getPoolKey()] = connection

            # Opens SSH connection to device
            connection.run()

        self._connection = connection
        self._connection._taskList.add(self)
        return connection

    def _close(self):
        """
        If a local datasource executor, do nothing.

        If an SSH datasource executor, relinquish a connection to the remote device.
        """
        if self._connection:
            self._connection._taskList.discard(self)
            if not self._connection._taskList:
                # Last task using this connection -- tear it down.
                if self._getPoolKey() in self.pool:
                    client = self.pool[self._getPoolKey()]
                    client.clientFinished()
                    client.channelClosed()
            self._connection = None

    def connectionFailed(self, msg):
        """
        This method is called by the SSH client when the connection fails.

        @parameter msg: message indicating the cause of the problem
        @type msg: string
        """
        # Note: Raising an exception and then catching it doesn't work
        # as it appears that the exception is discarded in PBDaemon.py
        self.state = TaskStates.STATE_PAUSED
        log.error("Pausing task %s as %s [%s] connection failure: %s",
                  self.name, self._devId, self._manageIp, msg)
        self._eventService.sendEvent(STATUS_EVENT,
                                     device=self._devId,
                                     summary=msg,
                                     component=COLLECTOR_NAME,
                                     severity=Event.Error)
        # NOTE(review): _commandsToExecute is only assigned in _fetchPerf;
        # a connection failure arriving before _fetchPerf runs would raise
        # AttributeError here -- confirm callback ordering.
        self._commandsToExecute.cancel()

    def _failure(self, reason):
        """
        Twisted errBack to log the exception for a single device.

        @parameter reason: explanation of the failure
        @type reason: Twisted error instance
        """
        # Decode the exception
        if isinstance(reason.value, TimeoutError):
            cmd, = reason.value.args
            msg = "Command timed out on device %s: %r" % (
                  self._devId, cmd.command.split()[0])
            log.warning(msg)
            # NOTE(review): _makeCmdEvent's signature is
            # (datasource, msg, severity=None); here cmd.severity is passed
            # in the msg position and msg in the severity position -- the
            # arguments appear swapped; confirm intended event contents.
            ev = self._makeCmdEvent(cmd, cmd.severity, msg)
            self._eventService.sendEvent(ev)

            # Don't log a traceback by not returning a result
            reason = None

        elif isinstance(reason.value, defer.CancelledError):
            # The actual issue is logged by connectionFailed
            # Don't log a traceback by not returning a result
            msg = "Task %s paused due to connection error" % self.name
            reason = None

        else:
            msg = reason.getErrorMessage()
            if not msg:  # Sometimes we get blank error messages
                msg = reason.__class__
            msg = '%s %s' % (self._devId, msg)
            # Leave 'reason' alone to generate a traceback

        if self._lastErrorMsg != msg:
            self._lastErrorMsg = msg
            if msg:
                log.error(msg)

        if reason:
            self._eventService.sendEvent(STATUS_EVENT,
                                         device=self._devId,
                                         summary=msg,
                                         severity=Event.Error)

        if self._useSsh:
            # Back off polling of an unresponsive device.
            self._delayNextCheck()

        return reason

    def _connectCallback(self, result):
        """
        Callback called after a successful connect to the remote device.
        """
        if self._useSsh:
            log.debug("Connected to %s [%s]", self._devId, self._manageIp)
        else:
            log.debug("Running command(s) locally")
        return result

    def _addDatasource(self, datasource):
        """
        Add a new instantiation of ProcessRunner or SshRunner
        for every datasource.

        @parameter datasource: Cmd object to run
        @return: deferred firing with the datasource (via processCompleted)
        """
        if self._preferences.options.showfullcommand:
            log.info("Datasource %s command: %s", datasource.name,
                     datasource.command)

        if self._useSsh:
            runner = SshRunner(self._connection)
        else:
            runner = ProcessRunner()

        d = runner.start(datasource)
        datasource.lastStart = time.time()
        d.addBoth(datasource.processCompleted)
        return d

    def _fetchPerf(self, ignored):
        """
        Get performance data for all the monitored components on a device

        @parameter ignored: required to keep Twisted's callback chain happy
        @type ignored: result of previous callback
        """
        self.state = SshPerformanceCollectionTask.STATE_FETCH_DATA

        # The keys are the datasource commands, which are by definition unique
        # to the command run.
        cacheableDS = {}

        # Bundle up the list of tasks
        deferredCmds = []
        for datasource in self._datasources:
            datasource.deviceConfig = self._device

            # Only run each distinct command once; duplicates share results.
            if datasource.command in cacheableDS:
                cacheableDS[datasource.command].append(datasource)
                continue
            cacheableDS[datasource.command] = []

            task = self._executor.submit(self._addDatasource, datasource)
            deferredCmds.append(task)

        # Run the tasks
        dl = defer.DeferredList(deferredCmds, consumeErrors=True)
        dl.addCallback(self._parseResults, cacheableDS)
        dl.addCallback(self._storeResults)
        dl.addCallback(self._updateStatus)

        # Save the list in case we need to cancel the commands
        self._commandsToExecute = dl
        return dl

    def _parseResults(self, resultList, cacheableDS):
        """
        Interpret the results retrieved from the commands and pass on
        the datapoint values and events.

        @parameter resultList: results of running the commands in a DeferredList
        @type resultList: array of (boolean, datasource)
        @parameter cacheableDS: other datasources that can use the same results
        @type cacheableDS: dictionary of arrays of datasources
        """
        self.state = SshPerformanceCollectionTask.STATE_PARSE_DATA
        parseableResults = []
        for success, datasource in resultList:
            results = ParsedResults()
            if not success:
                # In this case, our datasource is actually a defer.Failure
                reason = datasource
                # NOTE(review): assumes the failure wraps a TimeoutError
                # whose args carry the datasource; other failure shapes
                # would break this unpacking -- confirm.
                datasource, = reason.value.args
                msg = "Datasource %s command timed out" % (
                      datasource.name)
                ev = self._makeCmdEvent(datasource, msg)
                results.events.append(ev)

            else:
                # Re-use our results for any similar datasources
                cachedDsList = cacheableDS.get(datasource.command)
                if cachedDsList:
                    for ds in cachedDsList:
                        ds.result = copy(datasource.result)
                        results = ParsedResults()
                        self._processDatasourceResults(ds, results)
                        parseableResults.append( (ds, results) )
                    # Fresh results object for the original datasource.
                    results = ParsedResults()

                self._processDatasourceResults(datasource, results)

            parseableResults.append( (datasource, results) )
        return parseableResults

    def _makeParser(self, datasource, eventList):
        """
        Create a parser object to process data

        @parameter datasource: datasource containg information
        @type datasource: Cmd object
        @parameter eventList: list of events
        @type eventList: list of dictionaries
        """
        parser = None
        try:
            parser = datasource.parser.create()
        except Exception:
            msg = "Error loading parser %s" % datasource.parser
            log.exception("%s %s %s", self.name, datasource.name, msg)
            ev = self._makeCmdEvent(datasource, msg)
            ev['message'] = traceback.format_exc()
            eventList.append(ev)
        return parser

    def _processDatasourceResults(self, datasource, results):
        """
        Process a single datasource's results

        @parameter datasource: datasource containg information
        @type datasource: Cmd object
        @parameter results: empty results object
        @type results: ParsedResults object
        """
        showcommand = self._preferences.options.showfullcommand
        result = datasource.result
        if result.exitCode == 0 and not result.output.strip():
            # Successful exit but nothing to parse -- emit a warning event.
            msg = "No data returned for command"
            if showcommand:
                msg += ": %s" % datasource.command
            log.warn("%s %s %s", self.name, datasource.name, msg)
            ev = self._makeCmdEvent(datasource, msg)
            if showcommand:
                ev['command'] = datasource.command
            results.events.append(ev)
            return

        parser = self._makeParser(datasource, results.events)
        if not parser:
            return

        try:
            parser.preprocessResults(datasource, log)
            parser.processResults(datasource, results)
            if not results.events and parser.createDefaultEventUsingExitCode:
                # Add a failsafe event guessing at the error codes
                self._addDefaultEvent(datasource, results)
            if datasource.result.stderr:
                self._addStderrMsg(datasource.result.stderr,
                                   results.events)
        except Exception:
            msg = "Error running parser %s" % datasource.parser
            log.exception("%s %s %s", self.name, datasource.name, msg)
            ev = self._makeCmdEvent(datasource, msg)
            ev['message'] = traceback.format_exc()
            ev['output'] = datasource.result.output
            results.events.append(ev)

    def _addDefaultEvent(self, datasource, results):
        """
        If there is no event, send one based on the exit code.
        """
        exitCode = datasource.result.exitCode
        if exitCode == 0:
            msg = ''
            severity = 0
        else:
            msg = 'Datasource: %s - Code: %s - Msg: %s' % (
                  datasource.name, exitCode, getExitMessage(exitCode))
            severity = datasource.severity

        ev = self._makeCmdEvent(datasource, msg, severity)
        results.events.append(ev)

    def _addStderrMsg(self, stderrMsg, eventList):
        """
        Add the stderr output to error events.

        @parameter stderrMsg: stderr output from the command
        @type stderrMsg: string
        @parameter eventList: list of events
        @type eventList: list of dictionaries
        """
        for event in eventList:
            if event['severity'] not in ('Clear', 'Info', 'Debug'):
                event['stderr'] = stderrMsg

    def _storeResults(self, resultList):
        """
        Store the values in RRD files

        @parameter resultList: results of running the commands
        @type resultList: array of (datasource, dictionary)
        """
        self.state = SshPerformanceCollectionTask.STATE_STORE_PERF
        for datasource, results in resultList:
            for dp, value in results.values:
                threshData = {
                    'eventKey': datasource.getEventKey(dp),
                    'component': dp.component,
                }
                self._dataService.writeRRD(
                    dp.rrdPath,
                    value,
                    dp.rrdType,
                    dp.rrdCreateCommand,
                    datasource.cycleTime,
                    dp.rrdMin,
                    dp.rrdMax,
                    threshData)

        # Pass the results through for _updateStatus.
        return resultList

    def _updateStatus(self, resultList):
        """
        Send any accumulated events

        @parameter resultList: results of running the commands
        @type resultList: array of (datasource, dictionary)
        """
        for datasource, results in resultList:
            self._clearEvent(datasource, results.events)
            for ev in results.events:
                self._eventService.sendEvent(ev, device=self._devId)
        return resultList

    def _clearEvent(self, datasource, eventList):
        """
        Ensure that a CLEAR event is sent for any command that
        successfully completes.
        """
        # If the result is a Failure, no exitCode exists
        exitCode = getattr(datasource.result, 'exitCode', -1)
        # Don't send if non-zero exit code or no output returned
        if exitCode is None or exitCode != 0 or not datasource.result.output.strip():
            return

        clearEvents = [ev for ev in eventList if ev['severity'] == Clear]
        if not clearEvents:
            msg = 'Datasource %s command completed successfully' % (
                  datasource.name)
            ev = self._makeCmdEvent(datasource, msg, severity=Clear)
            eventList.append(ev)

    def _makeCmdEvent(self, datasource, msg, severity=None):
        """
        Create an event using the info in the Cmd object.

        @parameter datasource: Cmd supplying component/class/key/severity
        @parameter msg: event summary text
        @parameter severity: override; defaults to the datasource's severity
        """
        severity = datasource.severity if severity is None else severity
        ev = dict(
            device=self._devId,
            component=datasource.component,
            eventClass=datasource.eventClass,
            eventKey=datasource.eventKey,
            severity=severity,
            summary=msg
        )
        return ev

    def _finished(self, result):
        """
        Callback activated when the task is complete

        @parameter result: results of the task
        @type result: deferred object
        """
        if not isinstance(result, Failure):
            # Success: undo any back-off from earlier failures.
            self._returnToNormalSchedule()

        try:
            self._close()
        except Exception, ex:
            log.warn("Failed to close device %s: error %s" %
                     (self._devId, str(ex)))

        # Return the result so the framework can track success/failure
        return result

    def displayStatistics(self):
        """
        Called by the collector framework scheduler, and allows us to
        see how each task is doing.
        """
        display = "%s useSSH: %s\n" % (
            self.name, self._useSsh)
        if self._lastErrorMsg:
            display += "%s\n" % self._lastErrorMsg
        return display


if __name__ == '__main__':
    # Required for passing classes from zenhub to here
    from Products.ZenRRD.zencommand import Cmd, DataPointConfig

    myPreferences = SshPerformanceCollectionPreferences()
    myTaskFactory = SimpleTaskFactory(SshPerformanceCollectionTask)
    myTaskSplitter = SshPerCycletimeTaskSplitter(myTaskFactory)
    daemon = CollectorDaemon(myPreferences, myTaskSplitter)
    daemon.run()