| Trees | Indices | Help |
|
|---|
|
|
1 ###########################################################################
2 #
3 # This program is part of Zenoss Core, an open source monitoring platform.
4 # Copyright (C) 2007, 2009, Zenoss Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU General Public License version 2 as published by
8 # the Free Software Foundation.
9 #
10 # For complete information please visit: http://www.zenoss.com/oss/
11 #
12 ###########################################################################
13
14 __doc__ = """ZenCommand
15
16 Run Command plugins periodically.
17
18 """
19
20 import time
21 from pprint import pformat
22 import logging
23 log = logging.getLogger("zen.zencommand")
24 from sets import Set
25
26 from twisted.internet import reactor, defer, error
27 from twisted.internet.protocol import ProcessProtocol
28 from twisted.python import failure
29 from twisted.spread import pb
30
31 import Globals
32 from Products.ZenUtils.Driver import drive, driveLater
33 from Products.ZenUtils.Utils import unused, getExitMessage
34 from Products.ZenRRD.RRDDaemon import RRDDaemon
35 from Products.ZenRRD.RRDUtil import RRDUtil
36 from Products.DataCollector.SshClient import SshClient
37 from Products.ZenEvents.ZenEventClasses import Clear, Error
38 from Products.ZenRRD.CommandParser import ParsedResults
39
40 from Products.DataCollector import Plugins
41 unused(Plugins)
42 MAX_CONNECTIONS = 256
43
44
45
54
55
57 "Cause an error on a deferred when it is taking too long to complete"
58
59 def _timeout(deferred, obj):
60 "took too long... call an errback"
61 deferred.errback(failure.Failure(TimeoutError(obj)))
62
63
64 def _cb(arg, timer):
65 "the command finished, possibly by timing out"
66 if not timer.called:
67 timer.cancel()
68 return arg
69
70
71 timer = reactor.callLater(seconds, _timeout, deferred, obj)
72 deferred.addBoth(_cb, timer)
73 return deferred
74
75
77 """
78 Provide deferred process execution
79 """
80 stopped = None
81 exitCode = None
82 output = ''
83
85 "Kick off the process: run it local"
86 log.debug('running %r' % cmd.command)
87
88 shell = '/bin/sh'
89 self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
90 self.command = ' '.join(self.cmdline)
91 log.debug('cmd line: %r' % self.command)
92 reactor.spawnProcess(self, shell, self.cmdline, env=None)
93
94 d = Timeout(defer.Deferred(), cmd.deviceConfig.commandTimeout, cmd)
95 self.stopped = d
96 self.stopped.addErrback(self.timeout)
97 return d
98
100 "Kill a process if it takes too long"
101 try:
102 self.transport.signalProcess('KILL')
103 except error.ProcessExitedAlready:
104 log.debug("Command already exited: %s" % self.command)
105 return value
106
107
111
112
114 "notify the starter that their process is complete"
115 self.exitCode = reason.value.exitCode
116 log.debug('Received exit code: %s' % self.exitCode)
117 log.debug('Command: %r' % self.command)
118 log.debug('Output: %r' % self.output)
119
120 if self.stopped:
121 d, self.stopped = self.stopped, None
122 if not d.called:
123 d.callback(self)
124
125
127 """
128 Connection to SSH server at the remote device
129 """
130
134
136 "Run a command against the server"
137 d = defer.Deferred()
138 self.defers[command] = d
139 SshClient.addCommand(self, command)
140 return d
141
142
144 "Forward the results of the command execution to the starter"
145 # don't call the CollectorClient.addResult which adds the result to a
146 # member variable for zenmodeler
147 d = self.defers.pop(command, None)
148 if d is None:
149 log.error("Internal error where deferred object not in dictionary." \
150 " Command = '%s' Data = '%s' Code = '%s'",
151 command, data, code)
152 elif not d.called:
153 d.callback((data, code))
154
155
159
160
162 "We don't need to track commands/results when they complete"
163 SshClient.clientFinished(self)
164 self.cmdmap = {}
165 self._commands = []
166 self.results = []
167
168
170 """
171 Cache all the Ssh connections so they can be managed
172 """
173
177
179 "Make a new SSH connection if there isn't one available"
180 dc = cmd.deviceConfig
181 result = self.pool.get(dc.device, None)
182 if result is None:
183 log.debug("Creating connection to %s", dc.device)
184 options = Options(dc.username, dc.password,
185 dc.loginTimeout, dc.commandTimeout,
186 dc.keyPath, dc.concurrentSessions)
187 # New param KeyPath
188 result = MySshClient(dc.device, dc.ipAddress, dc.port,
189 options=options)
190 if self.eventSender is not None:
191 result.sendEvent = self.eventSender.sendEvent
192 result.run()
193 self.pool[dc.device] = result
194 return result
195
199
201 "close the SSH connection to a device, if it exists"
202 c = self.pool.get(device, None)
203 if c:
204 log.debug("Closing connection to %s", device)
205 if c.transport:
206 c.transport.loseConnection()
207 del self.pool[device]
208
209
213
214
216 "reduce the number of connections using the schedule for guidance"
217 # compute device list in order of next use
218 devices = []
219 for c in schedule:
220 device = c.deviceConfig.device
221 if device not in devices:
222 devices.append(device)
223 # close as many devices as needed
224 while devices and len(self.pool) > MAX_CONNECTIONS:
225 self._close(devices.pop())
226
227
229 """
230 Run a single command across a cached SSH connection
231 """
232 exitCode = None
233 output = None
234
237
238
240 "Initiate a command on the remote device"
241 self.defer = defer.Deferred()
242 c = self.pool.get(cmd)
243 try:
244 d = Timeout(c.addCommand(cmd.command),
245 cmd.deviceConfig.commandTimeout,
246 cmd)
247 except Exception, ex:
248 log.warning('Error starting command: %s', ex)
249 self.pool.close(cmd)
250 return defer.fail(ex)
251 d.addErrback(self.timeout)
252 d.addBoth(self.processEnded)
253 return d
254
255
257 "Deal with slow executing command/connection (close it)"
258 cmd, = arg.value.args
259 # we could send a kill signal, but then we would need to track
260 # the command channel to send it to: just close the connection
261 self.pool.close(cmd)
262 return arg
263
264
271
272
274 lastChange = 0.
275 device = ''
276 ipAddress = ''
277 port = 0
278 username = ''
279 password = ''
280 loginTimeout = 0.
281 commandTimeout = 0.
282 keyPath = ''
283 pb.setUnjellyableForClass(DeviceConfig, DeviceConfig)
284
285
287 id = ''
288 component = ''
289 rrdPath = ''
290 rrdType = None
291 rrdCreateCommand = ''
292 rrdMin = None
293 rrdMax = None
294
296 self.data = {}
297
300
301 pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)
302
304 """
305 Holds the config of every command to be run
306 """
307 command = None
308 useSsh = False
309 cycleTime = None
310 eventClass = None
311 eventKey = None
312 severity = 3
313 lastStart = 0
314 lastStop = 0
315 result = None
316
317
320
323
324
326 cmd, args = (self.command + ' ').split(' ', 1)
327 cmd = cmd.split('/')[-1]
328 return '%s %s' % (cmd, args)
329
330
332 if self.running():
333 return self.lastStart + self.cycleTime
334 return self.lastStop + self.cycleTime
335
336
338 if self.useSsh:
339 pr = SshRunner(pool)
340 else:
341 pr = ProcessRunner()
342 d = pr.start(self)
343 self.lastStart = time.time()
344 log.debug('Process %s started' % self.name())
345 d.addBoth(self.processEnded)
346 return d
347
348
350 self.result = pr
351 self.lastStop = time.time()
352
353 # Check for a condition that could cause zencommand to stop cycling.
354 # http://dev.zenoss.org/trac/ticket/4936
355 if self.lastStop < self.lastStart:
356 log.debug('System clock went back?')
357 self.lastStop = self.lastStart
358
359 if not isinstance(pr, failure.Failure):
360 log.debug('Process %s stopped (%s), %.2f seconds elapsed' % (
361 self.name(),
362 pr.exitCode,
363 self.lastStop - self.lastStart))
364 return self
365 return pr
366
367
369 self.deviceConfig = deviceConfig
370 self.useSsh = cfg.useSsh
371 self.cycleTime = max(int(cfg.cycleTime), 1)
372 self.eventKey = cfg.eventKey
373 self.eventClass = cfg.eventClass
374 self.severity = cfg.severity
375 self.command = str(cfg.command)
376 self.points = cfg.points
377 if cfg.severity is None:
378 log.warning("severity is None: cfg,%r ; deviceConfig, %r",
379 cfg, deviceConfig)
380 return self
381
383 # fetch datapoint name from filename path and add it to the event key
384 return self.eventKey + '|' + point.rrdPath.split('/')[-1]
385
390
391 pb.setUnjellyableForClass(Cmd, Cmd)
392
394 loginTries=1
395 searchPath=''
396 existenceTest=None
397
def __init__(self, username, password, loginTimeout, commandTimeout,
             keyPath, concurrentSessions):
    """
    Store the supplied connection settings on the instance, unchanged.

    @param username: saved as self.username
    @param password: saved as self.password
    @param loginTimeout: saved as self.loginTimeout
    @param commandTimeout: saved as self.commandTimeout
    @param keyPath: saved as self.keyPath
    @param concurrentSessions: saved as self.concurrentSessions
    """
    self.username = username
    self.password = password
    self.loginTimeout = loginTimeout
    self.commandTimeout = commandTimeout
    self.keyPath = keyPath
    self.concurrentSessions = concurrentSessions
406
407
409
411 """
412 config - one of the configurations that was returned by calling
413 getDataSourceCommands on zenhub.
414 scheduledCmds - a dictionary that maps (device, command) to Cmd object
415 for the commands currently on zencommand's schedule.
416 sendEvent - a function that sends events to zenhub using twisted
417 perspective broker.
418 """
419 self._config = config
420 self._scheduledCmds = scheduledCmds
421 self._sendEvent = sendEvent
422 self._device = config.device
423 self._suppressed = [] # log warning and send event once per device
424 self._summary = 'zCommandUsername is not set'
425
427 "send an event (or clear it) for username not set"
428 self._sendEvent(dict(
429 device=self._device,
430 eventClass='/Cmd/Fail',
431 eventKey='zCommandUsername',
432 severity=severity,
433 component='zencommand',
434 summary=self._summary))
435
437 """
438 Warn that the username is not set for device and the SSH command cannot be
439 executed.
440 """
441 if self._device not in self._suppressed:
442 log.warning(self._summary + ' for %s' % self._device)
443 self._sendUsernameEvent(Error)
444 msg = 'username not configured for %s. skipping %s.'
445 log.debug(msg % (self._device, command))
446
448 """
449 Go through the Cmd objects in config.commands and update the config on
450 the command with config. If currentDict has the command on it then use
451 that one, otherwise use the command from config. If the device does not have a
452 username set, then don't yield commands that use SSH.
453 """
454 for cmd in self._config.commands:
455 key = (self._device, cmd.command)
456 if cmd.useSsh:
457 if self._config.username:
458 self._sendUsernameEvent(Clear)
459 else:
460 self._warnUsernameNotSet(cmd.command)
461 if self._device not in self._suppressed:
462 self._suppressed.append(self._device)
463 if key in self._scheduledCmds:
464 del self._scheduledCmds[key]
465 continue
466 if key in self._scheduledCmds:
467 newCmd = self._scheduledCmds.pop(key)
468 else:
469 newCmd = cmd
470 yield newCmd.updateConfig(cmd, self._config)
471 self._suppressed = []
472
473
475 """
476 Daemon code to schedule commands and run them.
477 """
478
479 initialServices = RRDDaemon.initialServices + ['CommandConfig']
480
482 RRDDaemon.__init__(self, 'zencommand')
483 self.schedule = []
484 self.timeout = None
485 self.pool = SshPool()
486 self.pool.eventSender = self
487 self.executed = 0
488
490 self.log.debug("zenhub has asked us to delete device %s" % doomed)
491 self.schedule = [c for c in self.schedule if c.deviceConfig.device != doomed]
492
494 self.log.debug("Configuration update from zenhub")
495 self.log.info("config: %r", config)
496 self.updateConfig([config], [config.device])
497
499 self.log.debug("zenhub sent updated device list %s" % devices)
500 updated = []
501 lastChanges = dict(devices) # map device name to last change
502 keep = []
503 for cmd in self.schedule:
504 if cmd.deviceConfig.device in lastChanges:
505 if cmd.lastChange > lastChanges[cmd.device]:
506 updated.append(cmd.deviceConfig.device)
507 keep.append(cmd)
508 else:
509 self.log.info("Removing all commands for %s", cmd.deviceConfig.device)
510 self.schedule = keep
511 if updated:
512 self.log.info("Fetching the config for %s", updated)
513 d = self.model().callRemote('getDataSourceCommands', devices)
514 d.addCallback(self.updateConfig, updated)
515 d.addErrback(self.error)
516
518 expected = Set(expected)
519 current = {}
520 for c in self.schedule:
521 if c.deviceConfig.device in expected:
522 current[c.deviceConfig.device,c.command] = c
523 # keep all the commands we didn't ask for
524 update = [c for c in self.schedule if c.deviceConfig.device not in expected]
525 for cfg in configs:
526 self.thresholds.updateForDevice(cfg.device, cfg.thresholds)
527 if self.options.device and self.options.device != cfg.device:
528 continue
529 processor = ConfigurationProcessor(cfg, current, self.sendEvent)
530 update.extend(processor.updateCommands())
531 for device, command in current.keys():
532 self.log.info("Deleting command %s from %s", device, command)
533 self.schedule = update
534 self.processSchedule()
535
537 "There is no master 'cycle' to send the heartbeat"
538 self.heartbeat()
539 reactor.callLater(self.heartbeatTimeout/3, self.heartbeatCycle)
540 events = []
541 events += self.rrdStats.gauge('schedule',
542 self.heartbeatTimeout,
543 len(self.schedule))
544 events += self.rrdStats.counter('commands',
545 self.heartbeatTimeout,
546 self.executed)
547 events += self.rrdStats.counter('dataPoints',
548 self.heartbeatTimeout,
549 self.rrd.dataPoints)
550 events += self.rrdStats.gauge('cyclePoints',
551 self.heartbeatTimeout,
552 self.rrd.endCycle())
553 self.sendEvents(events)
554
555
557 """
558 Run through the schedule and start anything that needs to be done.
559 Set a timer if we have nothing to do.
560 """
561 if not self.options.cycle:
562 for cmd in self.schedule:
563 if cmd.running() or cmd.lastStart == 0:
564 break
565 else:
566 self.stop()
567 return
568 try:
569 if self.timeout and not self.timeout.called:
570 self.timeout.cancel()
571 self.timeout = None
def compare(x, y):
    """
    Order two schedule entries by the time each is next due to run.

    @return: -1, 0 or 1 (cmp-style) so it can be handed to list.sort()
    """
    # Evaluate nextRun() once per entry; the subtraction of the two
    # comparisons yields the same -1/0/1 result as the cmp() builtin
    # without depending on that builtin being available.
    mine, theirs = x.nextRun(), y.nextRun()
    return (mine > theirs) - (mine < theirs)
574 self.schedule.sort(compare)
575 self.pool.trimConnections(self.schedule)
576 earliest = None
577 running = 0
578 now = time.time()
579 for c in self.schedule:
580 if c.running():
581 running += 1
582
583 for c in self.schedule:
584 if running >= self.options.parallel:
585 break
586 if c.nextRun() <= now:
587 c.start(self.pool).addBoth(self.finished)
588 running += 1
589 else:
590 earliest = c.nextRun() - now
591 break
592
593 if earliest is not None:
594 self.pool.reinitializeCollectorClients()
595 self.log.debug("Next command in %d seconds", int(earliest))
596 self.timeout = reactor.callLater(max(1, earliest),
597 self.processSchedule)
598 except Exception, ex:
599 self.log.exception(ex)
600
601
603 """
604 Send an event using the info in the Cmd object.
605 """
606 self.sendEvent(dict(device=cmd.deviceConfig.device,
607 component=cmd.component,
608 eventClass=cmd.eventClass,
609 eventKey=cmd.eventKey,
610 severity=severity,
611 summary=summary))
612
614 """
615 The command has finished. cmdOrErr is either a Cmd instance or a
616 twisted failure.
617 """
618 self.executed += 1
619 if isinstance(cmdOrErr, failure.Failure):
620 self.error(cmdOrErr)
621 else:
622 cmd = cmdOrErr
623 self._handleExitCode(cmd)
624 self.parseResults(cmd)
625 self.processSchedule()
626
628 """
629 zencommand handles sending clears for exit code 0, all other exit codes
630 should be handled by the parser associated with the command
631 """
632 exitCode = cmd.result.exitCode
633 msg = 'Cmd: %s - Code: %s - Msg: %s' % (
634 cmd.command, exitCode, getExitMessage(exitCode))
635 if exitCode == 0:
636 self.sendCmdEvent(cmd, Clear, msg)
637
639 """
640 The finished method indicated that there was a failure. This method
641 is also called by RRDDaemon.errorStop.
642 """
643 if isinstance(err.value, TimeoutError):
644 cmd, = err.value.args
645 msg = "Command timed out on device %s: %r" % (
646 cmd.deviceConfig.device, cmd.command)
647 self.log.warning(msg)
648 self.sendCmdEvent(cmd, cmd.severity, msg)
649 else:
650 self.log.exception(err.value)
651
653 """
654 Process the results of our command-line, send events
655 and check datapoints.
656
657 @param cmd: command
658 @type: cmd object
659 """
660 self.log.debug('The result of "%s" was "%r"', cmd.command, cmd.result.output)
661 results = ParsedResults()
662 try:
663 parser = cmd.parser.create()
664 except Exception, ex:
665 self.log.exception("Error loading parser %s" % cmd.parser)
666 import traceback
667 self.sendEvent(dict(device=cmd.deviceConfig.device,
668 summary="Error loading parser %s" % cmd.parser,
669 component="zencommand",
670 message=traceback.format_exc(),
671 agent="zencommand",
672 ))
673 return
674 parser.processResults(cmd, results)
675
676 for ev in results.events:
677 self.sendEvent(ev, device=cmd.deviceConfig.device)
678
679 for dp, value in results.values:
680 self.log.debug("Storing %s = %s into %s" % (dp.id, value, dp.rrdPath))
681 value = self.rrd.save(dp.rrdPath,
682 value,
683 dp.rrdType,
684 dp.rrdCreateCommand,
685 cmd.cycleTime,
686 dp.rrdMin,
687 dp.rrdMax)
688 self.log.debug("RRD save result: %s" % value)
689 for ev in self.thresholds.check(dp.rrdPath, time.time(), value):
690 eventKey = cmd.getEventKey(dp)
691 if 'eventKey' in ev:
692 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey'])
693 else:
694 ev['eventKey'] = eventKey
695 ev['component'] = dp.component
696 self.sendEvent(ev)
697
def doFetchConfig(driver):
    """
    Generator handed to drive(): fetch this daemon's configuration.

    Each yield passes a remote call to the driver; the following
    driver.next() is used as that call's result.

    @param driver: the object stepping this generator
    """
    try:
        # Start time of the fetch, reported below as 'configTime'
        now = time.time()

        yield self.model().callRemote('propertyItems')
        self.setPropertyItems(driver.next())

        yield self.model().callRemote('getDefaultRRDCreateCommand')
        createCommand = driver.next()

        yield self.model().callRemote('getThresholdClasses')
        self.remote_updateThresholdClasses(driver.next())

        yield self.model().callRemote('getCollectorThresholds')
        self.rrdStats.config(self.options.monitor,
                             self.name,
                             driver.next(),
                             createCommand)

        # An empty list is sent unless a single device was requested
        devices = []
        if self.options.device:
            devices = [self.options.device]
        yield self.model().callRemote('getDataSourceCommands',
                                      devices)
        if not devices:
            # No device filter: pass every device currently on the
            # schedule to updateConfig as the expected set
            devices = list(Set([c.deviceConfig.device
                                for c in self.schedule]))
        self.updateConfig(driver.next(), devices)

        self.rrd = RRDUtil(createCommand, 60)

        self.sendEvents(
            self.rrdStats.gauge('configTime',
                                self.configCycleInterval * 60,
                                time.time() - now))

    except Exception:
        # log.exception() appends the active traceback (including the
        # exception text) itself, so it is given a descriptive message
        # rather than the exception object.
        self.log.exception("Error fetching configuration from zenhub")
        raise
739
740 return drive(doFetchConfig)
741
742
744 """
745 Fetch the configuration and return a deferred for its completion.
746 Also starts the config cycle
747 """
748 ex = None
749 try:
750 self.log.debug('Fetching configuration from zenhub')
751 yield self.fetchConfig()
752 driver.next()
753 self.log.debug('Finished config fetch')
754 except Exception, ex:
755 self.log.exception(ex)
756 driveLater(self.configCycleInterval * 60, self.start)
757 if ex:
758 raise ex
759
761 RRDDaemon.buildOptions(self)
762
763 self.parser.add_option('--parallel', dest='parallel',
764 default=10, type='int',
765 help="Number of devices to collect at one time")
766
768 d = drive(self.start).addCallbacks(self.processSchedule, self.errorStop)
769 if self.options.cycle:
770 d.addCallback(self.heartbeatCycle)
771
772
if __name__ == '__main__':
    # The daemon class is imported through its package path instead of
    # being taken from this script's own namespace.
    # NOTE(review): presumably so the pb.setUnjellyableForClass
    # registrations in this file are made under the package module name
    # rather than under __main__ -- confirm before changing.
    from Products.ZenRRD.zencommand import zencommand
    z = zencommand()
    z.run()
777
| Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0.1 on Wed Jul 14 12:07:20 2010 | http://epydoc.sourceforge.net |