###########################################################################
#
# This program is part of Zenoss Core, an open source monitoring platform.
# Copyright (C) 2007, 2009, 2010 Zenoss Inc.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 2 or (at your
# option) any later version as published by the Free Software Foundation.
#
# For complete information please visit: http://www.zenoss.com/oss/
#
###########################################################################

__doc__ = """ZenCommand

Run Command plugins periodically.

"""

import random
import time
from pprint import pformat
import logging
log = logging.getLogger("zen.zencommand")
import traceback
from copy import copy

from twisted.internet import reactor, defer, error
from twisted.internet.protocol import ProcessProtocol
from twisted.python.failure import Failure

from twisted.spread import pb

import Globals
import zope.component
import zope.interface

from Products.ZenUtils.Utils import unused, getExitMessage, readable_time
from Products.DataCollector.SshClient import SshClient
from Products.ZenEvents.ZenEventClasses import Clear, Error, Cmd_Fail, Cmd_Ok
from Products.ZenRRD.CommandParser import ParsedResults

from Products.ZenCollector.daemon import CollectorDaemon
from Products.ZenCollector.interfaces import ICollectorPreferences,\
                                             IDataService,\
                                             IEventService,\
                                             IScheduledTask
from Products.ZenCollector.tasks import SimpleTaskFactory,\
                                        SubConfigurationTaskSplitter,\
                                        TaskStates,\
                                        BaseTask
from Products.ZenCollector.pools import getPool
from Products.ZenEvents import Event
from Products.ZenUtils.Executor import TwistedExecutor

from Products.DataCollector import Plugins
unused(Plugins)

MAX_CONNECTIONS = 250
MAX_BACK_OFF_MINUTES = 20

# We retrieve our configuration data remotely via a Twisted PerspectiveBroker
# connection. To do so, we need to import the class that will be used by the
# configuration service to send the data over, i.e. DeviceProxy.
from Products.ZenCollector.services.config import DeviceProxy
unused(DeviceProxy)

COLLECTOR_NAME = "zencommand"
POOL_NAME = 'SshConfigs'


class SshPerformanceCollectionPreferences(object):
    zope.interface.implements(ICollectorPreferences)

    def __init__(self):
        """
        Constructs a new SshPerformanceCollectionPreferences instance and
        provides default values for needed attributes.
        """
        self.collectorName = COLLECTOR_NAME
        self.defaultRRDCreateCommand = None
        self.configCycleInterval = 20  # minutes
        self.cycleInterval = 5 * 60    # seconds

        # The configurationService attribute is the fully qualified class-name
        # of our configuration service that runs within ZenHub
        self.configurationService = 'Products.ZenHub.services.CommandPerformanceConfig'

        # Provide a reasonable default for the max number of tasks
        self.maxTasks = 50

        # Will be filled in based on buildOptions
        self.options = None

    def buildOptions(self, parser):
        parser.add_option('--showrawresults',
                          dest='showrawresults',
                          action="store_true",
                          default=False,
                          help="Show the raw RRD values. For debugging purposes only.")

        parser.add_option('--maxbackoffminutes',
                          dest='maxbackoffminutes',
                          default=MAX_BACK_OFF_MINUTES,
                          type='int',
                          help="When a device fails to respond, increase the time to"
                               " check on the device until this limit.")

        parser.add_option('--showfullcommand',
                          dest='showfullcommand',
                          action="store_true",
                          default=False,
                          help="Display the entire command and command-line arguments,"
                               " including any passwords.")

    def postStartup(self):
        pass


class SshPerCycletimeTaskSplitter(SubConfigurationTaskSplitter):
    """
    Splits a device's configuration into one task per unique combination of
    cycle time and local-versus-SSH datasource.
    """
    subconfigName = 'datasources'

    def makeConfigKey(self, config, subconfig):
        return (config.id, subconfig.cycleTime,
                'Remote' if subconfig.useSsh else 'Local')


class TimeoutError(Exception):
    """
    Error raised when a deferred call takes too long to complete.
    The offending Cmd object is carried in args.
    """
    def __init__(self, *args):
        Exception.__init__(self)
        self.args = args


def timeoutCommand(deferred, seconds, obj):
    "Cause an error on a deferred when it is taking too long to complete"

    def _timeout(deferred, obj):
        "took too long... call an errback"
        deferred.errback(Failure(TimeoutError(obj)))

    def _cb(arg, timer):
        "the command finished, possibly by timing out"
        if not timer.called:
            timer.cancel()
        return arg

    timer = reactor.callLater(seconds, _timeout, deferred, obj)
    deferred.mytimer = timer
    deferred.addBoth(_cb, timer)
    return deferred

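# Illustrative sketch (not from the original source): timeoutCommand decorates an
# existing Deferred so that it errbacks with a TimeoutError failure if nothing has
# fired within `seconds`; _cb cancels the timer as soon as the deferred fires for
# any other reason.  A hypothetical caller might do:
#
#     d = timeoutCommand(defer.Deferred(), 30, someCmd)   # someCmd is hypothetical
#     d.addCallback(handleResult)
#     d.addErrback(handleTimeoutOrError)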

class ProcessRunner(ProcessProtocol):
    """
    Provide deferred process execution for a *single* command
    """
    stopped = None
    exitCode = None
    output = ''
    stderr = ''

    def start(self, cmd):
        """
        Kick off the process: run it locally
        """
        log.debug('Running %s', cmd.command.split()[0])

        self._cmd = cmd
        shell = '/bin/sh'
        self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
        self.command = ' '.join(self.cmdline)

        reactor.spawnProcess(self, shell, self.cmdline, env=cmd.env)

        d = timeoutCommand(defer.Deferred(),
                           cmd.deviceConfig.zCommandCommandTimeout, cmd)
        self.stopped = d
        self.stopped.addErrback(self.timeout)
        return d

    def timeout(self, value):
        """
        Kill a process if it takes too long
        """
        try:
            self.transport.signalProcess('KILL')
        except error.ProcessExitedAlready:
            log.debug("Command already exited: %s", self.command.split()[0])
        return value

    def outReceived(self, data):
        """
        Accumulate the process's standard output as it arrives
        """
        self.output += data

    def errReceived(self, data):
        """
        Accumulate the process's standard error output as it arrives
        """
        self.stderr += data

    def processEnded(self, reason):
        """
        Notify the starter that their process is complete
        """
        self.exitCode = reason.value.exitCode
        if self.exitCode is not None:
            msg = """Datasource: %s Received exit code: %s Output:\n%r"""
            data = [self._cmd.ds, self.exitCode, self.output]
            if self.stderr:
                msg += "\nStandard Error:\n%r"
                data.append(self.stderr)
            log.debug(msg, *data)

        if self.stopped:
            d, self.stopped = self.stopped, None
            if not d.called:
                d.callback(self)

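# Illustrative sketch (not from the original source): a local datasource is run by
# handing its Cmd object to a ProcessRunner.  start() spawns
# "/bin/sh -c 'exec <command>'" and returns a deferred that fires with the runner
# itself (exitCode/output/stderr populated) or with a TimeoutError failure:
#
#     runner = ProcessRunner()
#     d = runner.start(cmd)                 # cmd is a Cmd configuration object
#     d.addBoth(cmd.processCompleted)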


class MySshClient(SshClient):
    """
    Connection to SSH server at the remote device
    """
    def __init__(self, *args, **kw):
        SshClient.__init__(self, *args, **kw)
        self.defers = {}
        self._taskList = set()

    def addCommand(self, command):
        """
        Run a command against the server
        """
        d = defer.Deferred()
        self.defers[command] = d
        SshClient.addCommand(self, command)
        return d

    def addResult(self, command, data, code):
        """
        Forward the results of the command execution to the starter
        """
        # Don't call CollectorClient.addResult, which adds the result to a
        # member variable for zenmodeler
        d = self.defers.pop(command, None)
        if d is None:
            log.error("Internal error: deferred object not in dictionary."
                      " Command = '%s' Data = '%s' Code = '%s'",
                      command.split()[0], data, code)
        elif not d.called:
            d.callback((data, code))

    def clientConnectionLost(self, connector, reason):
        connection_description = '%s:*****@%s:%s' % (self.username, self.ip, self.port)
        # The connection was lost, but possibly because we just closed it.
        # Not necessarily cause for concern.
        log.debug("Connection %s lost." % connection_description)
        pool = getPool('SSH Connections')
        poolkey = hash((self.username, self.password, self.ip, self.port))
        if poolkey in pool:
            # Clean it up so the next time around the task will get a new connection
            log.debug("Deleting connection %s from pool." % connection_description)
            del pool[poolkey]

    def clientFinished(self):
        """
        We don't need to track commands/results when they complete
        """
        SshClient.clientFinished(self)
        self.cmdmap = {}
        self._commands = []
        self.results = []

    def clientConnectionFailed(self, connector, reason):
        """
        If we didn't connect, let the associated tasks know why.

        @param connector: connector associated with this failure
        @type connector: object
        @param reason: failure object
        @type reason: object
        """
        self.clientFinished()
        message = reason.getErrorMessage()
        for task in list(self._taskList):
            task.connectionFailed(message)


class SshOptions:
    loginTries = 1
    searchPath = ''
    existenceTest = None

    def __init__(self, username, password, loginTimeout, commandTimeout,
                 keyPath, concurrentSessions):
        self.username = username
        self.password = password
        self.loginTimeout = loginTimeout
        self.commandTimeout = commandTimeout
        self.keyPath = keyPath
        self.concurrentSessions = concurrentSessions


class SshRunner(object):
    """
    Run a single command across a cached SSH connection
    """

    def __init__(self, connection):
        self._connection = connection
        self.exitCode = None
        self.output = None
        self.stderr = None

    def start(self, cmd):
        "Initiate a command on the remote device"
        self.defer = defer.Deferred(canceller=self._canceller)
        try:
            d = timeoutCommand(self._connection.addCommand(cmd.command),
                               self._connection.commandTimeout,
                               cmd)
        except Exception, ex:
            log.warning('Error starting command: %s', ex)
            return defer.fail(ex)
        d.addErrback(self.timeout)
        d.addBoth(self.processEnded)
        return d

    def timeout(self, arg):
        "Deal with a slow-executing command/connection (close it)"
        # We could send a kill signal, but then we would need to track
        # the command channel to send it. Just close the connection.
        return arg

    def processEnded(self, result):
        "Deliver ourselves to the starter with the proper attributes"
        if isinstance(result, Failure):
            return result
        # MySshClient.addResult fires the command deferred with an
        # (output, exitCode) tuple
        self.output, self.exitCode = result
        return self


class DataPointConfig(pb.Copyable, pb.RemoteCopy):
    id = ''
    component = ''
    rrdPath = ''
    rrdType = None
    rrdCreateCommand = ''
    rrdMin = None
    rrdMax = None

    def __init__(self):
        self.data = {}

pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)


class Cmd(pb.Copyable, pb.RemoteCopy):
    """
    Holds the config of every command to be run
    """
    device = ''
    command = None
    ds = ''
    useSsh = False
    cycleTime = None
    eventClass = None
    eventKey = None
    severity = 3
    lastStart = 0
    lastStop = 0
    result = None
    env = None

    def processCompleted(self, pr):
        """
        Return back the datasource with the ProcessRunner/SshRunner stored in
        the 'result' attribute.
        """
        self.result = pr
        self.lastStop = time.time()

        # Check for a condition that could cause zencommand to stop cycling.
        #   http://dev.zenoss.org/trac/ticket/4936
        if self.lastStop < self.lastStart:
            log.debug('System clock went back?')
            self.lastStop = self.lastStart

        if isinstance(pr, Failure):
            return pr

        log.debug('Process %s stopped (%s), %.2f seconds elapsed',
                  self.name,
                  pr.exitCode,
                  self.lastStop - self.lastStart)
        return self

    def getEventKey(self, point):
        # Fetch the datapoint name from the rrdPath and append it to the event key
        return self.eventKey + '|' + point.rrdPath.split('/')[-1]

    def commandKey(self):
        "Provide a value that establishes the uniqueness of this command"
        return '%'.join(map(str, [self.useSsh, self.cycleTime,
                                  self.severity, self.command]))


pb.setUnjellyableForClass(Cmd, Cmd)


STATUS_EVENT = { 'eventClass' : Cmd_Fail,
                 'component' : 'command',
               }


class SshPerformanceCollectionTask(BaseTask):
    """
    A task that performs periodic performance collection for devices providing
    data via SSH connections.
    """
    zope.interface.implements(IScheduledTask)

    STATE_CONNECTING = 'CONNECTING'
    STATE_FETCH_DATA = 'FETCH_DATA'
    STATE_PARSE_DATA = 'PARSING_DATA'
    STATE_STORE_PERF = 'STORE_PERF_DATA'

    def __init__(self,
                 taskName,
                 configId,
                 scheduleIntervalSeconds,
                 taskConfig):
        """
        @param taskName: the unique identifier for this task
        @type taskName: string
        @param configId: configuration to watch
        @type configId: string
        @param scheduleIntervalSeconds: the interval at which this task will be
               collected
        @type scheduleIntervalSeconds: int
        @param taskConfig: the configuration for this task
        """
        super(SshPerformanceCollectionTask, self).__init__(
            taskName, configId,
            scheduleIntervalSeconds, taskConfig
            )

        # Needed for interface
        self.name = taskName
        self.configId = configId
        self.state = TaskStates.STATE_IDLE
        self.interval = scheduleIntervalSeconds

        # The taskConfig corresponds to a DeviceProxy
        self._device = taskConfig

        self._devId = self._device.id
        self._manageIp = self._device.manageIp

        self._dataService = zope.component.queryUtility(IDataService)
        self._eventService = zope.component.queryUtility(IEventService)

        self._preferences = zope.component.queryUtility(ICollectorPreferences,
                                                        COLLECTOR_NAME)
        self._lastErrorMsg = ''

        self._maxbackoffseconds = self._preferences.options.maxbackoffminutes * 60

        self._concurrentSessions = taskConfig.zSshConcurrentSessions
        self._executor = TwistedExecutor(self._concurrentSessions)
        self._useSsh = taskConfig.datasources[0].useSsh
        self._connection = None

        self._datasources = taskConfig.datasources
        self.pool = getPool('SSH Connections')
        self.executed = 0

    def __str__(self):
        return "COMMAND schedule Name: %s configId: %s Datasources: %d" % (
               self.name, self.configId, len(self._datasources))

    def cleanup(self):
        return self._cleanUpPool()

    def _getPoolKey(self):
        """
        Get the key under which the client should be stored in the pool.
        """
        username = self._device.zCommandUsername
        password = self._device.zCommandPassword
        ip = self._manageIp
        port = self._device.zCommandPort
        return hash((username, password, ip, port))
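
    # Illustrative sketch (not from the original source): every task for the same
    # device and credentials computes the same pool key, so concurrently scheduled
    # tasks share a single MySshClient held in getPool('SSH Connections'):
    #
    #     pool = getPool('SSH Connections')
    #     connection = pool.get(self._getPoolKey())   # None on first use; see _connect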

    def _cleanUpPool(self):
        """
        Close the connection currently associated with this task.
        """
        poolkey = self._getPoolKey()
        if poolkey in self.pool:
            client = self.pool[poolkey]
            tasklist = client._taskList
            if not tasklist:
                # No other tasks, so safe to clean up
                transport = client.transport
                if transport:
                    transport.loseConnection()
                del self.pool[poolkey]

    def doTask(self):
        """
        Contact one device and return a deferred which gathers data from
        the device.

        @return: Deferred actions to run against a device configuration
        @rtype: Twisted deferred object
        """
        # See if we need to connect first before doing any collection
        d = defer.maybeDeferred(self._connect)
        d.addCallbacks(self._connectCallback, self._failure)
        d.addCallback(self._fetchPerf)

        # Call _finished for both success and error scenarios
        d.addBoth(self._finished)

        # Wait until the Deferred actually completes
        return d
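
    # Illustrative sketch (not from the original source): the deferred chain that
    # doTask() hands back to the collector framework.  Assuming `task` is an
    # already-scheduled SshPerformanceCollectionTask, the framework effectively does:
    #
    #     d = task.doTask()
    #     # success path: _connect -> _connectCallback -> _fetchPerf -> _finished
    #     # failure path: _connect -> _failure ---------------------> _finished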

    def _connect(self):
        """
        If a local datasource executor, do nothing.

        If an SSH datasource executor, create a connection object to the remote
        device.  Make a new SSH connection object if there isn't one available.
        This doesn't actually connect to the device.
        """
        if not self._useSsh:
            return defer.succeed(None)

        connection = self.pool.get(self._getPoolKey(), None)
        if connection is None:
            self.state = SshPerformanceCollectionTask.STATE_CONNECTING
            log.debug("Creating connection object to %s", self._devId)
            username = self._device.zCommandUsername
            password = self._device.zCommandPassword
            loginTimeout = self._device.zCommandLoginTimeout
            commandTimeout = self._device.zCommandCommandTimeout
            keypath = self._device.zKeyPath
            options = SshOptions(username, password,
                                 loginTimeout, commandTimeout,
                                 keypath, self._concurrentSessions)

            connection = MySshClient(self._devId, self._manageIp,
                                     self._device.zCommandPort, options=options)
            connection.sendEvent = self._eventService.sendEvent

            self.pool[self._getPoolKey()] = connection

            # Opens the SSH connection to the device
            connection.run()

        self._connection = connection
        self._connection._taskList.add(self)
        return connection

    def _close(self):
        """
        If a local datasource executor, do nothing.

        If an SSH datasource executor, relinquish the connection to the remote device.
        """
        if self._connection:
            self._connection._taskList.discard(self)
            if not self._connection._taskList:
                if self._getPoolKey() in self.pool:
                    client = self.pool[self._getPoolKey()]
                    client.clientFinished()
                    client.channelClosed()
            self._connection = None

    def connectionFailed(self, msg):
        """
        This method is called by the SSH client when the connection fails.

        @parameter msg: message indicating the cause of the problem
        @type msg: string
        """
        # Note: Raising an exception and then catching it doesn't work
        #       as it appears that the exception is discarded in PBDaemon.py
        self.state = TaskStates.STATE_PAUSED
        log.error("Pausing task %s as %s [%s] connection failure: %s",
                  self.name, self._devId, self._manageIp, msg)
        self._eventService.sendEvent(STATUS_EVENT,
                                     device=self._devId,
                                     summary=msg,
                                     component=COLLECTOR_NAME,
                                     severity=Event.Error)
        self._commandsToExecute.cancel()

    def _failure(self, reason):
        """
        Twisted errBack to log the exception for a single device.

        @parameter reason: explanation of the failure
        @type reason: Twisted error instance
        """
        # Decode the exception
        if isinstance(reason.value, TimeoutError):
            cmd, = reason.value.args
            msg = "Command timed out on device %s: %r" % (
                      self._devId, cmd.command.split()[0])
            log.warning(msg)
            ev = self._makeCmdEvent(cmd, msg, cmd.severity)
            self._eventService.sendEvent(ev)

            # Don't log a traceback by not returning a result
            reason = None

        elif isinstance(reason.value, defer.CancelledError):
            # The actual issue is logged by connectionFailed
            # Don't log a traceback by not returning a result
            msg = "Task %s paused due to connection error" % self.name
            reason = None

        else:
            msg = reason.getErrorMessage()
            if not msg:  # Sometimes we get blank error messages
                msg = reason.__class__
            msg = '%s %s' % (self._devId, msg)
            # Leave 'reason' alone to generate a traceback

        if self._lastErrorMsg != msg:
            self._lastErrorMsg = msg
            if msg:
                log.error(msg)

        if reason:
            self._eventService.sendEvent(STATUS_EVENT,
                                         device=self._devId,
                                         summary=msg,
                                         severity=Event.Error)

        if self._useSsh:
            self._delayNextCheck()

        return reason

    def _connectCallback(self, result):
        """
        Callback called after a successful connect to the remote device.
        """
        if self._useSsh:
            log.debug("Connected to %s [%s]", self._devId, self._manageIp)
        else:
            log.debug("Running command(s) locally")
        return result

    def _addDatasource(self, datasource):
        """
        Add a new instantiation of ProcessRunner or SshRunner
        for every datasource.
        """
        if self._preferences.options.showfullcommand:
            log.info("Datasource %s command: %s", datasource.name,
                     datasource.command)

        if self._useSsh:
            runner = SshRunner(self._connection)
        else:
            runner = ProcessRunner()

        d = runner.start(datasource)
        datasource.lastStart = time.time()
        d.addBoth(datasource.processCompleted)
        return d

    def _fetchPerf(self, ignored):
        """
        Get performance data for all the monitored components on a device

        @parameter ignored: required to keep Twisted's callback chain happy
        @type ignored: result of previous callback
        """
        self.state = SshPerformanceCollectionTask.STATE_FETCH_DATA

        # The keys are the datasource commands, which are by definition unique
        # to the command run.
        cacheableDS = {}

        # Bundle up the list of tasks
        deferredCmds = []
        for datasource in self._datasources:
            datasource.deviceConfig = self._device

            if datasource.command in cacheableDS:
                cacheableDS[datasource.command].append(datasource)
                continue
            cacheableDS[datasource.command] = []

            task = self._executor.submit(self._addDatasource, datasource)
            deferredCmds.append(task)

        # Run the tasks
        dl = defer.DeferredList(deferredCmds, consumeErrors=True)
        dl.addCallback(self._parseResults, cacheableDS)
        dl.addCallback(self._storeResults)
        dl.addCallback(self._updateStatus)

        # Save the DeferredList in case we need to cancel the commands
        self._commandsToExecute = dl
        return dl
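
    # Illustrative sketch (not from the original source): datasources that share an
    # identical command string are only executed once.  With hypothetical
    # datasources dsA and dsB that both run "uptime", cacheableDS ends up as
    #
    #     {'uptime': [dsB]}     # dsA actually runs; dsB reuses dsA's result
    #
    # and _parseResults later copies dsA.result onto dsB before parsing.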

    def _parseResults(self, resultList, cacheableDS):
        """
        Interpret the results retrieved from the commands and pass on
        the datapoint values and events.

        @parameter resultList: results of running the commands in a DeferredList
        @type resultList: array of (boolean, datasource)
        @parameter cacheableDS: other datasources that can use the same results
        @type cacheableDS: dictionary of arrays of datasources
        """
        self.state = SshPerformanceCollectionTask.STATE_PARSE_DATA
        parseableResults = []
        for success, datasource in resultList:
            results = ParsedResults()
            if not success:
                # In this case, our datasource is actually a defer.Failure
                reason = datasource
                datasource, = reason.value.args
                msg = "Datasource %s command timed out" % (
                          datasource.name)
                ev = self._makeCmdEvent(datasource, msg)
                results.events.append(ev)

            else:
                # Re-use our results for any similar datasources
                cachedDsList = cacheableDS.get(datasource.command)
                if cachedDsList:
                    for ds in cachedDsList:
                        ds.result = copy(datasource.result)
                        results = ParsedResults()
                        self._processDatasourceResults(ds, results)
                        parseableResults.append( (ds, results) )
                    results = ParsedResults()

                self._processDatasourceResults(datasource, results)

            parseableResults.append( (datasource, results) )
        return parseableResults

    def _makeParser(self, datasource, eventList):
        """
        Create a parser object to process data

        @parameter datasource: datasource containing information
        @type datasource: Cmd object
        @parameter eventList: list of events
        @type eventList: list of dictionaries
        """
        parser = None
        try:
            parser = datasource.parser.create()
        except Exception, ex:
            msg = "Error loading parser %s" % datasource.parser
            log.exception("%s %s %s", self.name, datasource.name, msg)
            ev = self._makeCmdEvent(datasource, msg)
            ev['message'] = traceback.format_exc()
            eventList.append(ev)
        return parser

    def _processDatasourceResults(self, datasource, results):
        """
        Process a single datasource's results

        @parameter datasource: datasource containing information
        @type datasource: Cmd object
        @parameter results: empty results object
        @type results: ParsedResults object
        """
        showcommand = self._preferences.options.showfullcommand
        if not datasource.result.output:
            msg = "No data returned for command"
            if showcommand:
                msg += ": %s" % datasource.command
            log.warn("%s %s %s", self.name, datasource.name, msg)
            ev = self._makeCmdEvent(datasource, msg)
            if showcommand:
                ev['command'] = datasource.command
            results.events.append(ev)
            return

        parser = self._makeParser(datasource, results.events)
        if not parser:
            return

        try:
            parser.preprocessResults(datasource, log)
            parser.processResults(datasource, results)
            if not results.events and parser.createDefaultEventUsingExitCode:
                # Add a failsafe event guessing at the error codes
                self._addDefaultEvent(datasource, results)
            if datasource.result.stderr:
                self._addStderrMsg(datasource.result.stderr,
                                   results.events)
        except Exception, ex:
            msg = "Error running parser %s" % datasource.parser
            log.exception("%s %s %s", self.name, datasource.name, msg)
            ev = self._makeCmdEvent(datasource, msg)
            ev['message'] = traceback.format_exc()
            ev['output'] = datasource.result.output
            results.events.append(ev)
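
    # Illustrative sketch (not from the original source): a CommandParser is
    # expected to fill in the ParsedResults container, roughly:
    #
    #     parser.processResults(datasource, results)
    #     results.values   # [(DataPointConfig, numeric value), ...]
    #     results.events   # [{'severity': ..., 'summary': ...}, ...]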

    def _addDefaultEvent(self, datasource, results):
        """
        If there is no event, send one based on the exit code.
        """
        exitCode = datasource.result.exitCode
        if exitCode == 0:
            msg = ''
            severity = 0
        else:
            msg = 'Datasource: %s - Code: %s - Msg: %s' % (
                  datasource.name, exitCode, getExitMessage(exitCode))
            severity = datasource.severity

        ev = self._makeCmdEvent(datasource, msg, severity)
        results.events.append(ev)

    def _addStderrMsg(self, stderrMsg, eventList):
        """
        Add the stderr output to error events.

        @parameter stderrMsg: stderr output from the command
        @type stderrMsg: string
        @parameter eventList: list of events
        @type eventList: list of dictionaries
        """
        for event in eventList:
            if event['severity'] not in ('Clear', 'Info', 'Debug'):
                event['stderr'] = stderrMsg

    def _storeResults(self, resultList):
        """
        Store the values in RRD files

        @parameter resultList: results of running the commands
        @type resultList: array of (datasource, ParsedResults)
        """
        self.state = SshPerformanceCollectionTask.STATE_STORE_PERF
        for datasource, results in resultList:
            for dp, value in results.values:
                threshData = {
                    'eventKey': datasource.getEventKey(dp),
                    'component': dp.component,
                }
                self._dataService.writeRRD(
                    dp.rrdPath,
                    value,
                    dp.rrdType,
                    dp.rrdCreateCommand,
                    datasource.cycleTime,
                    dp.rrdMin,
                    dp.rrdMax,
                    threshData)

        return resultList
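
    # Illustrative sketch (not from the original source): ParsedResults.values is a
    # list of (DataPointConfig, value) pairs produced by the parser, e.g.
    #
    #     results.values = [(dp_laLoadInt5, 0.17), (dp_memAvailReal, 512000.0)]
    #
    # (the dp_* names are hypothetical); each pair is written to its RRD file above,
    # with eventKey/component passed along so threshold checking can raise events.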

    def _updateStatus(self, resultList):
        """
        Send any accumulated events

        @parameter resultList: results of running the commands
        @type resultList: array of (datasource, ParsedResults)
        """
        for datasource, results in resultList:
            self._clearEvent(datasource, results.events)
            for ev in results.events:
                self._eventService.sendEvent(ev, device=self._devId)
        return resultList

    def _clearEvent(self, datasource, eventList):
        """
        Ensure that a CLEAR event is sent for any command that
        successfully completes.
        """
        # If the result is a Failure, no exitCode exists
        exitCode = getattr(datasource.result, 'exitCode', -1)
        if exitCode is None or exitCode != 0:
            return

        clearEvents = [ev for ev in eventList if ev['severity'] == Clear]
        if not clearEvents:
            msg = 'Datasource %s command completed successfully' % (
                      datasource.name)
            ev = self._makeCmdEvent(datasource, msg, severity=Clear)
            eventList.append(ev)

    def _makeCmdEvent(self, datasource, msg, severity=None):
        """
        Create an event using the info in the Cmd object.
        """
        severity = datasource.severity if severity is None else severity
        ev = dict(
                  device=self._devId,
                  component=datasource.component,
                  eventClass=datasource.eventClass,
                  eventKey=datasource.eventKey,
                  severity=severity,
                  summary=msg
        )
        return ev

    def _finished(self, result):
        """
        Callback activated when the task is complete

        @parameter result: results of the task
        @type result: deferred object
        """
        if not isinstance(result, Failure):
            self._returnToNormalSchedule()

        try:
            self._close()
        except Exception, ex:
            log.warn("Failed to close device %s: error %s" %
                     (self._devId, str(ex)))

        # Return the result so the framework can track success/failure
        return result

    def displayStatistics(self):
        """
        Called by the collector framework scheduler, and allows us to
        see how each task is doing.
        """
        display = "%s useSSH: %s\n" % (
            self.name, self._useSsh)
        if self._lastErrorMsg:
            display += "%s\n" % self._lastErrorMsg
        return display


if __name__ == '__main__':
    # Required for passing classes from zenhub to here
    from Products.ZenRRD.zencommand import Cmd, DataPointConfig

    myPreferences = SshPerformanceCollectionPreferences()
    myTaskFactory = SimpleTaskFactory(SshPerformanceCollectionTask)
    myTaskSplitter = SshPerCycletimeTaskSplitter(myTaskFactory)
    daemon = CollectorDaemon(myPreferences, myTaskSplitter)
    daemon.run()