1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__ = """ZenCommand
15
16 Run Command plugins periodically.
17
18 """
19
20 import time
21 from pprint import pformat
22 import logging
23 log = logging.getLogger("zen.zencommand")
24 from sets import Set
25
26 from twisted.internet import reactor, defer, error
27 from twisted.internet.protocol import ProcessProtocol
28 from twisted.python import failure
29 from twisted.spread import pb
30
31 import Globals
32 from Products.ZenUtils.Driver import drive, driveLater
33 from Products.ZenUtils.Utils import unused, getExitMessage
34 from Products.ZenRRD.RRDDaemon import RRDDaemon
35 from Products.ZenRRD.RRDUtil import RRDUtil
36 from Products.DataCollector.SshClient import SshClient
37 from Products.ZenEvents.ZenEventClasses import Clear, Error
38 from Products.ZenRRD.CommandParser import ParsedResults
39
40 from Products.DataCollector import Plugins
41 unused(Plugins)
# Upper bound on cached SSH connections: the connection cache closes
# connections (picked using the command schedule) while it holds more than this.
42 MAX_CONNECTIONS = 256
43
44
45
47 """
48 Error for a defered call taking too long to complete
49 """
50
52 Exception.__init__(self)
53 self.args = args
54
55
def Timeout(deferred, seconds, obj):
    """
    Attach a watchdog timer to a deferred.

    If ``deferred`` has not fired within ``seconds``, it is errbacked with
    ``failure.Failure(TimeoutError(obj))``, so error handlers can pull
    ``obj`` back out of the failure's exception args. If the deferred fires
    first (with a result or a failure), the still-pending timer is cancelled
    and the result is passed through untouched.

    Returns the same deferred object it was given.
    """
    def expire(target, payload):
        # The timer won the race: push the deferred down its errback chain.
        target.errback(failure.Failure(TimeoutError(payload)))

    def disarm(result, watchdog):
        # Registered with addBoth, so this runs on success and failure alike,
        # including the errback fired by expire() itself.
        if not watchdog.called:
            watchdog.cancel()
        return result

    watchdog = reactor.callLater(seconds, expire, deferred, obj)
    deferred.addBoth(disarm, watchdog)
    return deferred
74
75
77 """
78 Provide deferred process execution
79 """
80 stopped = None
81 exitCode = None
82 output = ''
83
85 "Kick off the process: run it local"
86 log.debug('running %r' % cmd.command)
87
88 shell = '/bin/sh'
89 self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
90 self.command = ' '.join(self.cmdline)
91 log.debug('cmd line: %r' % self.command)
92 reactor.spawnProcess(self, shell, self.cmdline, env=None)
93
94 d = Timeout(defer.Deferred(), cmd.deviceConfig.commandTimeout, cmd)
95 self.stopped = d
96 self.stopped.addErrback(self.timeout)
97 return d
98
100 "Kill a process if it takes too long"
101 try:
102 self.transport.signalProcess('KILL')
103 except error.ProcessExitedAlready:
104 log.debug("Command already exited: %s" % self.command)
105 return value
106
107
109 "Store up the output as it arrives from the process"
110 self.output += data
111
112
124
125
127 """
128 Connection to SSH server at the remote device
129 """
130
134
141
142
149
150
def check(self, ip, timeout=2):
    """
    Skip SshClient's blocking reachability test.

    ``ip`` and ``timeout`` are accepted only to keep the parent's
    signature; the answer is always success.
    """
    reachable = True
    return reachable
154
155
157 "We don't need to track commands/results when they complete"
158 SshClient.clientFinished(self)
159 self.commands = []
160 self.results = []
161
162
164 """
165 Cache all the Ssh connections so they can be managed
166 """
167
170
171
def get(self, cmd):
    """
    Return the cached SSH client for ``cmd``'s device.

    On first use for a device, build the connection Options from the
    command's deviceConfig, create and run() a MySshClient, and cache it
    under the device name before returning it.
    """
    dc = cmd.deviceConfig
    existing = self.pool.get(dc.device, None)
    if existing is not None:
        return existing

    log.debug("Creating connection to %s", dc.device)
    opts = Options(dc.username, dc.password,
                   dc.loginTimeout, dc.commandTimeout,
                   dc.keyPath, dc.concurrentSessions)
    client = MySshClient(dc.device, dc.ipAddress, dc.port, options=opts)
    client.run()
    self.pool[dc.device] = client
    return client
187
188
197
198
200 "symetric close that matches get() method"
201 self._close(cmd.deviceConfig.device)
202
203
205 "reduce the number of connections using the schedule for guidance"
206
207 devices = []
208 for c in schedule:
209 device = c.deviceConfig.device
210 if device not in devices:
211 devices.append(device)
212
213 while devices and len(self.pool) > MAX_CONNECTIONS:
214 self._close(devices.pop())
215
216
218 """
219 Run a single command across a cached SSH connection
220 """
221 exitCode = None
222 output = None
223
226
227
243
244
246 "Deal with slow executing command/connection (close it)"
247 cmd, = arg.value.args
248
249
250 self.pool.close(cmd)
251 return arg
252
253
255 "Deliver ourselves to the starter with the proper attributes"
256 if isinstance(value, failure.Failure):
257 return value
258 self.output, self.exitCode = value
259 return self
260
261
# Register DeviceConfig with perspective broker so copies received over the
# wire are unjellied as DeviceConfig instances.
272 pb.setUnjellyableForClass(DeviceConfig, DeviceConfig)
273
274
289
# Register DataPointConfig with perspective broker so copies received over
# the wire are unjellied as DataPointConfig instances.
290 pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)
291
292 -class Cmd(pb.Copyable, pb.RemoteCopy):
293 """
294 Holds the config of every command to be run
295 """
296 command = None
297 useSsh = False
298 cycleTime = None
299 eventClass = None
300 eventKey = None
301 severity = 3
302 lastStart = 0
303 lastStop = 0
304 result = None
305
306
309
312
313
315 cmd, args = (self.command + ' ').split(' ', 1)
316 cmd = cmd.split('/')[-1]
317 return '%s %s' % (cmd, args)
318
319
324
325
336
337
355
356
367
371
373 "Provide a value that establishes the uniqueness of this command"
374 return '%'.join(map(str, [self.useSsh, self.cycleTime,
375 self.severity, self.command]))
376
# Register Cmd (a pb.Copyable / pb.RemoteCopy) with perspective broker so
# copies received over the wire are unjellied as Cmd instances.
377 pb.setUnjellyableForClass(Cmd, Cmd)
378
392
393
395
def __init__(self, config, scheduledCmds, sendEvent):
    """
    Remember the inputs needed to check one device's SSH username.

    config        -- one configuration returned by calling
                     getDataSourceCommands on zenhub; its ``device``
                     attribute names the device being checked.
    scheduledCmds -- dictionary mapping (device, command) to the Cmd object
                     currently on zencommand's schedule.
    sendEvent     -- function that sends events to zenhub using twisted
                     perspective broker.
    """
    self._config, self._scheduledCmds, self._sendEvent = (
        config, scheduledCmds, sendEvent)
    self._device = config.device
    # Devices already warned about, so the warning is not repeated.
    self._suppressed = []
    self._summary = 'zCommandUsername is not set'
411
421
423 """
424 Warn that the username is not set for device and the SSH command cannot be
425 executed.
426 """
427 if self._device not in self._suppressed:
428 log.warning(self._summary + ' for %s' % self._device)
429 self._sendUsernameEvent(Error)
430 msg = 'username not configured for %s. skipping %s.'
431 log.debug(msg % (self._device, command))
432
434 """
435 Go through the Cmd objects in config.commands and update the config on
436 the command with config. If currentDict has the command on it then use
437 that one, otherwise use the command from config. If the device does not have a
438 username set, then don't yield commands that use SSH.
439 """
440 for cmd in self._config.commands:
441 key = (self._device, cmd.command)
442 if cmd.useSsh:
443 if self._config.username:
444 self._sendUsernameEvent(Clear)
445 else:
446 self._warnUsernameNotSet(cmd.command)
447 if self._device not in self._suppressed:
448 self._suppressed.append(self._device)
449 if key in self._scheduledCmds:
450 del self._scheduledCmds[key]
451 continue
452 if key in self._scheduledCmds:
453 newCmd = self._scheduledCmds.pop(key)
454 else:
455 newCmd = cmd
456 yield newCmd.updateConfig(cmd, self._config)
457 self._suppressed = []
458
459
461 """
462 Daemon code to schedule commands and run them.
463 """
464
465 initialServices = RRDDaemon.initialServices + ['CommandConfig']
466
473
475 self.log.debug("zenhub has asked us to delete device %s" % doomed)
476 self.schedule = [c for c in self.schedule if c.deviceConfig.device != doomed]
477
481
483 self.log.debug("zenhub sent updated device list %s" % devices)
484 updated = []
485 lastChanges = dict(devices)
486 keep = []
487 for cmd in self.schedule:
488 if cmd.deviceConfig.device in lastChanges:
489 if cmd.lastChange > lastChanges[cmd.device]:
490 updated.append(cmd.deviceConfig.device)
491 keep.append(cmd)
492 else:
493 self.log.info("Removing all commands for %s", cmd.deviceConfig.device)
494 self.schedule = keep
495 if updated:
496 self.log.info("Fetching the config for %s", updated)
497 d = self.model().callRemote('getDataSourceCommands', devices)
498 d.addCallback(self.updateConfig, updated)
499 d.addErrback(self.error)
500
519
538
539
541 """
542 Run through the schedule and start anything that needs to be done.
543 Set a timer if we have nothing to do.
544 """
545 if not self.options.cycle:
546 for cmd in self.schedule:
547 if cmd.running() or cmd.lastStart == 0:
548 break
549 else:
550 self.stop()
551 return
552 try:
553 if self.timeout and not self.timeout.called:
554 self.timeout.cancel()
555 self.timeout = None
556 def compare(x, y):
557 return cmp(x.nextRun(), y.nextRun())
558 self.schedule.sort(compare)
559 self.pool.trimConnections(self.schedule)
560 earliest = None
561 running = 0
562 now = time.time()
563 for c in self.schedule:
564 if c.running():
565 running += 1
566
567 for c in self.schedule:
568 if running >= self.options.parallel:
569 break
570 if c.nextRun() <= now:
571 c.start(self.pool).addBoth(self.finished)
572 running += 1
573 else:
574 earliest = c.nextRun() - now
575 break
576
577 if earliest is not None:
578 self.log.debug("Next command in %d seconds", int(earliest))
579 self.timeout = reactor.callLater(max(1, earliest),
580 self.processSchedule)
581 except Exception, ex:
582 self.log.exception(ex)
583
584
595
597 """
598 The command has finished. cmdOrErr is either a Cmd instance or a
599 twisted failure.
600 """
601 self.executed += 1
602 if isinstance(cmdOrErr, failure.Failure):
603 self.error(cmdOrErr)
604 else:
605 cmd = cmdOrErr
606 self._handleExitCode(cmd)
607 self.parseResults(cmd)
608 self.processSchedule()
609
620
622 """
623 The finished method indicated that there was a failure. This method
624 is also called by RRDDaemon.errorStop.
625 """
626 if isinstance(err.value, TimeoutError):
627 cmd, = err.value.args
628 msg = "Command timed out on device %s: %r" % (
629 cmd.deviceConfig.device, cmd.command)
630 self.log.warning(msg)
631 self.sendCmdEvent(cmd, cmd.severity, msg)
632 else:
633 self.log.exception(err.value)
634
636 """
637 Process the results of our command-line, send events
638 and check datapoints.
639
640 @param cmd: command
641 @type: cmd object
642 """
643 self.log.debug('The result of "%s" was "%r"', cmd.command, cmd.result.output)
644 results = ParsedResults()
645 try:
646 parser = cmd.parser.create()
647 except Exception, ex:
648 self.log.exception("Error loading parser %s" % cmd.parser)
649 import traceback
650 self.sendEvent(dict(device=cmd.deviceConfig.device,
651 summary="Error loading parser %s" % cmd.parser,
652 component="zencommand",
653 message=traceback.format_exc(),
654 agent="zencommand",
655 ))
656 return
657 parser.processResults(cmd, results)
658
659 for ev in results.events:
660 self.sendEvent(ev, device=cmd.deviceConfig.device)
661
662 for dp, value in results.values:
663 self.log.debug("Storing %s = %s into %s" % (dp.id, value, dp.rrdPath))
664 value = self.rrd.save(dp.rrdPath,
665 value,
666 dp.rrdType,
667 dp.rrdCreateCommand,
668 cmd.cycleTime,
669 dp.rrdMin,
670 dp.rrdMax)
671 self.log.debug("RRD save result: %s" % value)
672 for ev in self.thresholds.check(dp.rrdPath, time.time(), value):
673 eventKey = cmd.getEventKey(dp)
674 if 'eventKey' in ev:
675 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey'])
676 else:
677 ev['eventKey'] = eventKey
678 ev['component'] = dp.component
679 self.sendEvent(ev)
680
722
723 return drive(doFetchConfig)
724
725
726 - def start(self, driver):
727 """
728 Fetch the configuration and return a deferred for its completion.
729 Also starts the config cycle
730 """
731 ex = None
732 try:
733 self.log.debug('Fetching configuration from zenhub')
734 yield self.fetchConfig()
735 driver.next()
736 self.log.debug('Finished config fetch')
737 except Exception, ex:
738 self.log.exception(ex)
739 driveLater(self.configCycleInterval * 60, self.start)
740 if ex:
741 raise ex
742
744 RRDDaemon.buildOptions(self)
745
746 self.parser.add_option('--parallel', dest='parallel',
747 default=10, type='int',
748 help="Number of devices to collect at one time")
749
754
755
if __name__ == '__main__':
    # Executed directly (not imported): build the daemon and hand control
    # to its run() method.
    from Products.ZenRRD.zencommand import zencommand
    z = zencommand()
    z.run()
760